Added Kubernetes API support, fo ingress controller.

This commit is contained in:
Ara Sadoyan
2025-09-01 16:32:30 +02:00
parent e304482667
commit d3602fa578
14 changed files with 434 additions and 228 deletions

View File

@@ -1,6 +1,5 @@
use crate::utils::parceyaml::load_configuration;
use crate::utils::structs::{Configuration, InnerMap, ServiceMapping, UpstreamsDashMap};
use crate::utils::tools::{clone_dashmap_into, compare_dashmaps};
use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams};
use dashmap::DashMap;
use futures::channel::mpsc::Sender;
use futures::SinkExt;
@@ -11,6 +10,7 @@ use reqwest::header::{HeaderMap, HeaderValue};
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;
#[derive(Debug, Deserialize)]
@@ -27,59 +27,53 @@ struct TaggedAddress {
port: u16,
}
pub async fn start(fp: String, mut toreturn: Sender<Configuration>) {
let config = load_configuration(fp.as_str(), "filepath").await;
pub async fn start(mut toreturn: Sender<Configuration>, config: Arc<Configuration>) {
let headers = DashMap::new();
match config {
Some(config) => {
if config.typecfg.to_string() != "consul" {
info!("Not running Consul discovery, requested type is: {}", config.typecfg);
return;
}
info!("Consul Discovery is enabled : {}", config.typecfg);
let consul = config.consul.clone();
let prev_upstreams = UpstreamsDashMap::new();
match consul {
Some(consul) => {
let servers = consul.servers.unwrap();
info!("Consul Servers => {:?}", servers);
let end = servers.len() - 1;
info!("Consul Discovery is enabled : {}", config.typecfg);
let consul = config.consul.clone();
let prev_upstreams = UpstreamsDashMap::new();
match consul {
Some(consul) => {
let servers = consul.servers.unwrap();
info!("Consul Servers => {:?}", servers);
let end = servers.len();
loop {
let num = rand::rng().random_range(1..end);
headers.clear();
for (k, v) in config.headers.clone() {
headers.insert(k.to_string(), v);
}
let consul_data = servers.get(num).unwrap().to_string();
let upstreams = consul_request(consul_data, consul.services.clone(), consul.token.clone());
match upstreams.await {
Some(upstreams) => {
if !compare_dashmaps(&upstreams, &prev_upstreams) {
let mut tosend: Configuration = Configuration {
upstreams: Default::default(),
headers: Default::default(),
consul: None,
typecfg: "".to_string(),
extraparams: config.extraparams.clone(),
};
clone_dashmap_into(&upstreams, &prev_upstreams);
clone_dashmap_into(&upstreams, &tosend.upstreams);
tosend.headers = headers.clone();
tosend.extraparams.authentication = config.extraparams.authentication.clone();
tosend.typecfg = config.typecfg.clone();
tosend.consul = config.consul.clone();
toreturn.send(tosend).await.unwrap();
}
}
None => {}
}
sleep(Duration::from_secs(5)).await;
}
loop {
let mut num = 0;
if end > 0 {
num = rand::rng().random_range(0..end);
}
None => {}
headers.clear();
for (k, v) in config.headers.clone() {
headers.insert(k.to_string(), v);
}
let consul_data = servers.get(num).unwrap().to_string();
let upstreams = consul_request(consul_data, consul.services.clone(), consul.token.clone());
match upstreams.await {
Some(upstreams) => {
if !compare_dashmaps(&upstreams, &prev_upstreams) {
let mut tosend: Configuration = Configuration {
upstreams: Default::default(),
headers: Default::default(),
consul: None,
kubernetes: None,
typecfg: "".to_string(),
extraparams: config.extraparams.clone(),
};
clone_dashmap_into(&upstreams, &prev_upstreams);
clone_dashmap_into(&upstreams, &tosend.upstreams);
tosend.headers = headers.clone();
tosend.extraparams.authentication = config.extraparams.authentication.clone();
tosend.typecfg = config.typecfg.clone();
tosend.consul = config.consul.clone();
print_upstreams(&tosend.upstreams);
toreturn.send(tosend).await.unwrap();
}
}
None => {}
}
sleep(Duration::from_secs(5)).await;
}
}
None => {}

View File

@@ -1,13 +1,11 @@
use crate::utils::consul;
use crate::utils::filewatch;
use crate::utils::structs::Configuration;
use crate::utils::{consul, kuber};
use crate::web::webserver;
use async_trait::async_trait;
use futures::channel::mpsc::Sender;
use std::sync::Arc;
pub struct FromFileProvider {
pub path: String,
}
pub struct APIUpstreamProvider {
pub config_api_enabled: bool,
pub address: String,
@@ -19,15 +17,6 @@ pub struct APIUpstreamProvider {
pub file_server_folder: Option<String>,
}
pub struct ConsulProvider {
pub path: String,
}
#[async_trait]
pub trait Discovery {
async fn start(&self, tx: Sender<Configuration>);
}
#[async_trait]
impl Discovery for APIUpstreamProvider {
async fn start(&self, toreturn: Sender<Configuration>) {
@@ -35,6 +24,23 @@ impl Discovery for APIUpstreamProvider {
}
}
pub struct FromFileProvider {
pub path: String,
}
pub struct ConsulProvider {
pub config: Arc<Configuration>,
}
pub struct KubernetesProvider {
pub config: Arc<Configuration>,
}
#[async_trait]
pub trait Discovery {
async fn start(&self, tx: Sender<Configuration>);
}
#[async_trait]
impl Discovery for FromFileProvider {
async fn start(&self, tx: Sender<Configuration>) {
@@ -45,6 +51,13 @@ impl Discovery for FromFileProvider {
#[async_trait]
impl Discovery for ConsulProvider {
async fn start(&self, tx: Sender<Configuration>) {
tokio::spawn(consul::start(self.path.clone(), tx.clone()));
tokio::spawn(consul::start(tx.clone(), self.config.clone()));
}
}
#[async_trait]
impl Discovery for KubernetesProvider {
async fn start(&self, tx: Sender<Configuration>) {
tokio::spawn(kuber::start(tx.clone(), self.config.clone()));
}
}

158
src/utils/dnsclient.rs Normal file
View File

@@ -0,0 +1,158 @@
/*
use crate::utils::structs::InnerMap;
use dashmap::DashMap;
use hickory_client::client::{Client, ClientHandle};
use hickory_client::proto::rr::{DNSClass, Name, RecordType};
use hickory_client::proto::runtime::TokioRuntimeProvider;
use hickory_client::proto::udp::UdpClientStream;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::atomic::AtomicUsize;
use std::time::Duration;
use tokio::sync::Mutex;
type DnsError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub struct DnsClientPool {
clients: Vec<Mutex<DnsClient>>,
}
struct DnsClient {
client: Client,
}
pub async fn start2(mut toreturn: Sender<Configuration>, config: Arc<Configuration>) {
let k8s = config.kubernetes.clone();
match k8s {
Some(k8s) => {
let dnserver = k8s.servers.unwrap_or(vec!["127.0.0.1:53".to_string()]);
let headers = DashMap::new();
let end = dnserver.len() - 1;
let mut num = 0;
if end > 0 {
num = rand::rng().random_range(0..end);
}
let srv = dnserver.get(num).unwrap().to_string();
let pool = DnsClientPool::new(5, srv.clone()).await;
let u = UpstreamsDashMap::new();
if let Some(whitelist) = k8s.services {
loop {
let upstreams = UpstreamsDashMap::new();
for service in whitelist.iter() {
let ret = pool.query_srv(service.real.as_str(), srv.clone()).await;
match ret {
Ok(r) => {
upstreams.insert(service.proxy.clone(), r);
}
Err(e) => eprintln!("DNS query failed for {:?}: {:?}", service, e),
}
}
if !compare_dashmaps(&u, &upstreams) {
headers.clear();
for (k, v) in config.headers.clone() {
headers.insert(k.to_string(), v);
}
let mut tosend: Configuration = Configuration {
upstreams: Default::default(),
headers: Default::default(),
consul: None,
kubernetes: None,
typecfg: "".to_string(),
extraparams: config.extraparams.clone(),
};
clone_dashmap_into(&upstreams, &u);
clone_dashmap_into(&upstreams, &tosend.upstreams);
tosend.headers = headers.clone();
tosend.extraparams.authentication = config.extraparams.authentication.clone();
tosend.typecfg = config.typecfg.clone();
tosend.consul = config.consul.clone();
print_upstreams(&tosend.upstreams);
toreturn.send(tosend).await.unwrap();
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
None => {}
}
}
impl DnsClient {
pub async fn new(server: String) -> Result<Self, DnsError> {
let server_details = server;
let server: SocketAddr = server_details.parse().expect("Unable to parse socket address");
let conn = UdpClientStream::builder(server, TokioRuntimeProvider::default()).build();
let (client, bg) = Client::connect(conn).await.unwrap();
tokio::spawn(bg);
Ok(Self { client })
}
pub async fn query_srv(&mut self, name: &str) -> Result<DashMap<String, (Vec<InnerMap>, AtomicUsize)>, DnsError> {
let upstreams: DashMap<String, (Vec<InnerMap>, AtomicUsize)> = DashMap::new();
let mut values = Vec::new();
match tokio::time::timeout(Duration::from_secs(5), self.client.query(Name::from_str(name)?, DNSClass::IN, RecordType::SRV)).await {
Ok(Ok(response)) => {
for answer in response.answers() {
if let hickory_client::proto::rr::RData::SRV(srv) = answer.data() {
let to_add = InnerMap {
address: srv.target().to_string(),
port: srv.port(),
is_ssl: false,
is_http2: false,
to_https: false,
rate_limit: None,
};
values.push(to_add);
}
}
upstreams.insert("/".to_string(), (values, AtomicUsize::new(0)));
Ok(upstreams)
}
Ok(Err(e)) => Err(Box::new(e)),
Err(_) => Err("DNS query timed out".into()),
}
}
}
impl DnsClientPool {
pub async fn new(pool_size: usize, server: String) -> Self {
let mut clients = Vec::with_capacity(pool_size);
for _ in 0..pool_size {
if let Ok(client) = DnsClient::new(server.clone()).await {
clients.push(Mutex::new(client));
}
}
Self { clients }
}
pub async fn query_srv(&self, name: &str, server: String) -> Result<DashMap<String, (Vec<InnerMap>, AtomicUsize)>, DnsError> {
// Try to get an available client
for client_mutex in &self.clients {
if let Ok(mut client) = client_mutex.try_lock() {
let vay = client.query_srv(name).await;
match vay {
Ok(_) => return vay,
Err(_) => {
// If query fails, drop this client and create a new one
*client = match DnsClient::new(server).await {
Ok(c) => c,
Err(e) => return Err(e),
};
// Retry with the new client
return client.query_srv(name).await;
}
}
}
}
// If all clients are busy, wait for the first one with a timeout
match tokio::time::timeout(Duration::from_secs(2), self.clients[0].lock()).await {
Ok(mut client) => client.query_srv(name).await,
Err(_) => Err("All DNS clients are busy and timeout reached".into()),
}
}
}
*/

View File

@@ -2,7 +2,7 @@ use crate::utils::parceyaml::load_configuration;
use crate::utils::structs::Configuration;
use futures::channel::mpsc::Sender;
use futures::SinkExt;
use log::{error, info, warn};
use log::error;
use notify::event::ModifyKind;
use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use pingora::prelude::sleep;
@@ -15,19 +15,7 @@ pub async fn start(fp: String, mut toreturn: Sender<Configuration>) {
let file_path = fp.as_str();
let parent_dir = Path::new(file_path).parent().unwrap();
let (local_tx, mut local_rx) = tokio::sync::mpsc::channel::<notify::Result<Event>>(1);
let snd = load_configuration(file_path, "filepath").await;
match snd {
Some(snd) => {
if snd.typecfg != "file" {
warn!("Disabling file watcher, requested discovery type is: {}", snd.typecfg);
return;
}
info!("Watching for changes in {:?}", parent_dir);
toreturn.send(snd).await.unwrap();
}
None => {}
}
let _watcher_handle = task::spawn_blocking({
let parent_dir = parent_dir.to_path_buf(); // Move directory path into the closure
move || {

133
src/utils/kuber.rs Normal file
View File

@@ -0,0 +1,133 @@
// use crate::utils::dnsclient::DnsClientPool;
use crate::utils::structs::{Configuration, InnerMap, UpstreamsDashMap};
use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams};
use dashmap::DashMap;
use futures::channel::mpsc::Sender;
use futures::SinkExt;
use pingora::prelude::sleep;
use rand::Rng;
use reqwest::Client;
use std::env;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
// static KUBERNETES_SERVICE_HOST: &str = "140.238.122.18:6443";
// static TOKEN: &str = "eyJhbGciOiJSUzI1NiIsImtpZCI6InJXMzl2VFlHTDM4V0tBbWxNQnRhbnRVcDlvcS10MjRDVHNwc2p3d1ZXeDAifQ.eyJhdWQiOlsiaHR0cHM6Ly9rdWJlcm5ldGVzLmRlZmF1bHQuc3ZjLmNsdXN0ZXIubG9jYWwiXSwiZXhwIjoxNzg4MjY3OTgxLCJpYXQiOjE3NTY3MzE5ODEsImlzcyI6Imh0dHBzOi8va3ViZXJuZXRlcy5kZWZhdWx0LnN2Yy5jbHVzdGVyLmxvY2FsIiwianRpIjoiMDM4ZWVlODAtNGViZi00MjU5LWI0OTctYmYwNDgxMmYyNWJhIiwia3ViZXJuZXRlcy5pbyI6eyJuYW1lc3BhY2UiOiJzdGFnaW5nIiwibm9kZSI6eyJuYW1lIjoiMTcyLjE2LjAuMTU1IiwidWlkIjoiOGY2ZTk2ZjYtNDdjNS00YjZhLTkwMmUtYmZhZTUwNzcxMWFjIn0sInBvZCI6eyJuYW1lIjoiYXJhbGV6LTY3Njg5OGY5ZDUtaHpienEiLCJ1aWQiOiI2NjAyZDFjNy00ZWM2LTRiZDEtYTc3NS00NzI5OGYyMTc0N2QifSwic2VydmljZWFjY291bnQiOnsibmFtZSI6ImFyYWxlei1zYSIsInVpZCI6IjVjYmYxZTU2LTJjY2YtNDFlMS05OGU2LTc3NmY1ZWY1NGRkOCJ9LCJ3YXJuYWZ0ZXIiOjE3NTY3MzU1ODh9LCJuYmYiOjE3NTY3MzE5ODEsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDpzdGFnaW5nOmFyYWxlei1zYSJ9.fnlQoOMEztWx6aE4JNPCMVDVc4s8ygs1tvuU_4FoNTWnTipFivzV3duHAg5sHFnLexYEbeBzWqr1betk__ATfy5RB5UaDdg0kk2AdVjYhvUfW4FcIqPNzXBUd74BOUG8vhN6bhZTQC8Fh0eCLCn9XXwVGIO94LKi9LmLbFiDAhpQDGwbXYnI8kV4nNGRE3kf0fsb6SyHs_8bOGc-t2U6OPdAFqBsk4JliaqiXhJKsoc8JCfUkcxYkT7GqIZxFYpvgisbOdZL7_iVyLU1BiSPMHb0jFa4O60aZ8EzCR7Mio0F5A8eZjSCf_f90ecUCGFuW3eCTCDd02RutXeSyPqxhQ";
#[derive(Debug, serde::Deserialize)]
struct Endpoints {
subsets: Option<Vec<Subset>>,
}
#[derive(Debug, serde::Deserialize)]
struct Subset {
addresses: Option<Vec<Address>>,
ports: Option<Vec<Port>>,
}
#[derive(Debug, serde::Deserialize)]
struct Address {
ip: String,
}
#[derive(Debug, serde::Deserialize)]
struct Port {
// name: String,
port: u16,
}
pub async fn start(mut toreturn: Sender<Configuration>, config: Arc<Configuration>) {
let upstreams = UpstreamsDashMap::new();
let prev_upstreams = UpstreamsDashMap::new();
loop {
if let Some(kuber) = config.kubernetes.clone() {
let path = kuber.tokenpath.unwrap_or("/var/run/secrets/kubernetes.io/serviceaccount/token".to_string());
let token = read_token(path.as_str()).await;
let servers = kuber.servers.unwrap_or(vec![format!(
"{}:{}",
env::var("KUBERNETES_SERVICE_HOST").unwrap_or("0.0.0.0".to_string()),
env::var("KUBERNETES_SERVICE_PORT_HTTPS").unwrap_or("0".to_string())
)]);
let end = servers.len() - 1;
let mut num = 0;
if end > 0 {
num = rand::rng().random_range(0..end);
}
let server = servers.get(num).unwrap().to_string();
if let Some(svc) = kuber.services {
for i in svc {
let url = format!("https://{}/api/v1/namespaces/staging/endpoints/{}", server, i.real);
let list = get_by_http(&*url, &*token).await;
if let Some(list) = list {
upstreams.insert(i.proxy.clone(), list);
}
}
}
}
if !compare_dashmaps(&upstreams, &prev_upstreams) {
let tosend: Configuration = Configuration {
upstreams: Default::default(),
headers: config.headers.clone(),
consul: config.consul.clone(),
kubernetes: config.kubernetes.clone(),
typecfg: config.typecfg.clone(),
extraparams: config.extraparams.clone(),
};
clone_dashmap_into(&upstreams, &prev_upstreams);
clone_dashmap_into(&upstreams, &tosend.upstreams);
print_upstreams(&tosend.upstreams);
toreturn.send(tosend).await.unwrap();
}
sleep(Duration::from_secs(5)).await;
}
}
pub async fn get_by_http(url: &str, token: &str) -> Option<DashMap<String, (Vec<InnerMap>, AtomicUsize)>> {
let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().ok()?;
let resp = client.get(url).bearer_auth(token).send().await.ok()?;
if !resp.status().is_success() {
eprintln!("Kubernetes API returned status: {}", resp.status());
return None;
}
let endpoints: Endpoints = resp.json().await.ok()?;
let upstreams: DashMap<String, (Vec<InnerMap>, AtomicUsize)> = DashMap::new();
if let Some(subsets) = endpoints.subsets {
for subset in subsets {
if let (Some(addresses), Some(ports)) = (subset.addresses, subset.ports) {
for addr in addresses {
let mut inner_vec = Vec::new();
for port in &ports {
let to_add = InnerMap {
address: addr.ip.clone(),
port: port.port.clone(),
is_ssl: false,
is_http2: false,
to_https: false,
rate_limit: None,
};
inner_vec.push(to_add);
}
upstreams.insert("/".to_string(), (inner_vec, AtomicUsize::new(0)));
}
}
}
}
Some(upstreams)
}
async fn read_token(path: &str) -> String {
let mut file = File::open(path).await.unwrap();
let mut contents = String::new();
file.read_to_string(&mut contents).await.unwrap();
contents.trim().to_string()
}

View File

@@ -52,13 +52,12 @@ pub async fn load_configuration(d: &str, kind: &str) -> Option<Configuration> {
}
"consul" => {
toreturn.consul = parsed.consul;
if toreturn.consul.is_some() {
Some(toreturn)
} else {
None
}
toreturn.consul.is_some().then_some(toreturn)
}
"kubernetes" => {
toreturn.kubernetes = parsed.kubernetes;
toreturn.kubernetes.is_some().then_some(toreturn)
}
"kubernetes" => None,
_ => {
warn!("Unknown provider {}", parsed.provider);
None

View File

@@ -21,6 +21,12 @@ pub struct Extraparams {
pub authentication: DashMap<String, Vec<String>>,
pub rate_limit: Option<isize>,
}
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct Kubernetes {
pub servers: Option<Vec<String>>,
pub services: Option<Vec<ServiceMapping>>,
pub tokenpath: Option<String>,
}
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct Consul {
@@ -44,6 +50,8 @@ pub struct Config {
#[serde(default)]
pub consul: Option<Consul>,
#[serde(default)]
pub kubernetes: Option<Kubernetes>,
#[serde(default)]
pub rate_limit: Option<isize>,
}
@@ -65,6 +73,7 @@ pub struct Configuration {
pub upstreams: UpstreamsDashMap,
pub headers: Headers,
pub consul: Option<Consul>,
pub kubernetes: Option<Kubernetes>,
pub typecfg: String,
pub extraparams: Extraparams,
}