This commit is contained in:
Ara Sadoyan
2026-01-22 16:50:51 +01:00
parent 2c8b01295c
commit 703de9e909
7 changed files with 141 additions and 21 deletions

1
Cargo.lock generated
View File

@@ -142,6 +142,7 @@ dependencies = [
"reqwest",
"rustls-pemfile",
"serde",
"serde_json",
"serde_yaml 0.9.34+deprecated",
"sha2",
"tokio",

View File

@@ -49,6 +49,7 @@ once_cell = "1.21.3"
privdrop = "0.5.6"
ctrlc = "3.5.1"
port_check = "0.3.0"
serde_json = "1.0.140"
#moka = { version = "0.12.10", features = ["sync"] }
#rustls = { version = "0.23.27", features = ["ring"] }
#hickory-client = { version = "0.25.2" }

View File

@@ -1,6 +1,6 @@
use crate::utils::filewatch;
use crate::utils::kuberconsul::{ConsulDiscovery, KubernetesDiscovery, ServiceDiscovery};
use crate::utils::structs::Configuration;
use crate::utils::structs::{Configuration, UpstreamsDashMap};
use crate::web::webserver;
use async_trait::async_trait;
use futures::channel::mpsc::Sender;
@@ -15,13 +15,8 @@ pub struct APIUpstreamProvider {
pub tls_key_file: Option<String>,
pub file_server_address: Option<String>,
pub file_server_folder: Option<String>,
}
#[async_trait]
impl Discovery for APIUpstreamProvider {
async fn start(&self, toreturn: Sender<Configuration>) {
webserver::run_server(self, toreturn).await;
}
pub current_upstreams: Arc<UpstreamsDashMap>,
pub full_upstreams: Arc<UpstreamsDashMap>,
}
pub struct FromFileProvider {
@@ -41,6 +36,13 @@ pub trait Discovery {
async fn start(&self, tx: Sender<Configuration>);
}
#[async_trait]
impl Discovery for APIUpstreamProvider {
async fn start(&self, toreturn: Sender<Configuration>) {
webserver::run_server(self, toreturn, self.current_upstreams.clone(), self.full_upstreams.clone()).await;
}
}
#[async_trait]
impl Discovery for FromFileProvider {
async fn start(&self, tx: Sender<Configuration>) {

View File

@@ -1,11 +1,12 @@
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;
pub type UpstreamsDashMap = DashMap<String, DashMap<String, (Vec<Arc<InnerMap>>, AtomicUsize)>>;
use std::net::IpAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
pub type UpstreamsDashMap = DashMap<String, DashMap<String, (Vec<Arc<InnerMap>>, AtomicUsize)>>;
pub type UpstreamsIdMap = DashMap<String, Arc<InnerMap>>;
pub type Headers = DashMap<String, DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>>>;
@@ -115,7 +116,7 @@ pub struct AppConfig {
pub rungroup: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub struct InnerMap {
pub address: IpAddr,
pub port: u16,
@@ -140,3 +141,10 @@ impl InnerMap {
}
}
}
#[derive(Serialize)]
pub struct UpstreamSnapshot {
pub backends: Vec<InnerMap>,
pub requests: usize,
}
// pub type UpstreamsSnapshot = HashMap<String, HashMap<String, UpstreamSnapshot>>;

View File

@@ -1,4 +1,4 @@
use crate::utils::structs::{InnerMap, UpstreamsDashMap, UpstreamsIdMap};
use crate::utils::structs::{InnerMap, UpstreamSnapshot, UpstreamsDashMap, UpstreamsIdMap};
use crate::utils::tls;
use crate::utils::tls::CertificateConfig;
use dashmap::DashMap;
@@ -6,6 +6,7 @@ use log::{error, info};
use notify::{event::ModifyKind, Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use port_check::is_port_reachable;
use privdrop::PrivDrop;
use serde_json::{json, Value};
use sha2::{Digest, Sha256};
use std::any::type_name;
use std::collections::{HashMap, HashSet};
@@ -13,7 +14,7 @@ use std::fmt::Write;
use std::net::SocketAddr;
use std::os::unix::fs::MetadataExt;
use std::str::FromStr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -269,3 +270,75 @@ pub fn check_priv(addr: &str) {
false => {}
}
}
#[allow(dead_code)]
pub fn upstreams_to_json(upstreams: &UpstreamsDashMap) -> serde_json::Result<String> {
let mut outer = HashMap::new();
for outer_entry in upstreams.iter() {
let mut inner_map = HashMap::new();
for inner_entry in outer_entry.value().iter() {
let (backends, counter) = inner_entry.value();
inner_map.insert(
inner_entry.key().clone(),
UpstreamSnapshot {
backends: backends.iter().map(|a| (**a).clone()).collect(),
requests: counter.load(Ordering::Relaxed),
},
);
}
outer.insert(outer_entry.key().clone(), inner_map);
}
// serde_json::to_string_pretty(&outer)
serde_json::to_string(&outer)
}
pub fn upstreams_liveness_json(configured: &UpstreamsDashMap, current: &UpstreamsDashMap) -> Value {
let mut result = serde_json::Map::new();
for host_entry in configured.iter() {
let hostname = host_entry.key().clone();
let configured_paths = host_entry.value();
let mut paths_json = serde_json::Map::new();
for path_entry in configured_paths.iter() {
let path = path_entry.key().clone();
let (configured_backends, _) = path_entry.value();
let backends_json: Vec<Value> = configured_backends
.iter()
.map(|backend| {
let alive = if let Some(host_map) = current.get(&hostname) {
if let Some(path_entry) = host_map.get(&path) {
let list = &path_entry.value().0; // Vec<Arc<InnerMap>>
list.iter().any(|b| b.address == backend.address && b.port == backend.port)
} else {
false
}
} else {
false
};
json!({
"address": backend.address,
"port": backend.port,
"alive": alive
})
})
.collect();
paths_json.insert(
path,
json!({
"backends": backends_json
}),
);
}
result.insert(hostname, Value::Object(paths_json));
}
Value::Object(result)
}

View File

@@ -59,6 +59,8 @@ impl BackgroundService for LB {
tls_key_file: self.config.config_tls_key_file.clone(),
file_server_address: self.config.file_server_address.clone(),
file_server_folder: self.config.file_server_folder.clone(),
current_upstreams: self.ump_upst.clone(),
full_upstreams: self.ump_full.clone(),
};
// let tx_api = tx.clone();
let _ = tokio::spawn(async move { api_load.start(tx_api).await });

View File

@@ -1,5 +1,6 @@
use crate::utils::discovery::APIUpstreamProvider;
use crate::utils::structs::{Config, Configuration};
use crate::utils::structs::{Config, Configuration, UpstreamsDashMap};
use crate::utils::tools::{upstreams_liveness_json, upstreams_to_json};
use axum::body::Body;
use axum::extract::{Query, State};
use axum::http::{Response, StatusCode};
@@ -15,6 +16,7 @@ use prometheus::{gather, Encoder, TextEncoder};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::net::TcpListener;
use tower_http::services::ServeDir;
@@ -36,16 +38,19 @@ struct AppState {
master_key: String,
config_sender: Sender<Configuration>,
config_api_enabled: bool,
current_upstreams: Arc<UpstreamsDashMap>,
full_upstreams: Arc<UpstreamsDashMap>,
}
#[allow(unused_mut)]
pub async fn run_server(config: &APIUpstreamProvider, mut to_return: Sender<Configuration>) {
pub async fn run_server(config: &APIUpstreamProvider, mut to_return: Sender<Configuration>, upstreams_curr: Arc<UpstreamsDashMap>, upstreams_full: Arc<UpstreamsDashMap>) {
let app_state = AppState {
master_key: config.masterkey.clone(),
config_sender: to_return.clone(),
config_api_enabled: config.config_api_enabled.clone(),
current_upstreams: upstreams_curr,
full_upstreams: upstreams_full,
};
let app = Router::new()
// .route("/{*wildcard}", get(senderror))
// .route("/{*wildcard}", post(senderror))
@@ -56,6 +61,7 @@ pub async fn run_server(config: &APIUpstreamProvider, mut to_return: Sender<Conf
.route("/jwt", post(jwt_gen))
.route("/conf", post(conf))
.route("/metrics", get(metrics))
.route("/status", get(status))
.with_state(app_state);
if let Some(value) = &config.tls_address {
@@ -154,7 +160,6 @@ async fn metrics() -> impl IntoResponse {
.body(Body::from(format!("Failed to encode metrics: {}", e)))
.unwrap();
}
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", encoder.format_type())
@@ -162,7 +167,35 @@ async fn metrics() -> impl IntoResponse {
.unwrap()
}
// #[allow(dead_code)]
// async fn senderror() -> impl IntoResponse {
// Response::builder().status(StatusCode::BAD_GATEWAY).body(Body::from("No live upstream found!\n")).unwrap()
// }
async fn status(State(st): State<AppState>, Query(params): Query<HashMap<String, String>>) -> impl IntoResponse {
if let Some(_) = params.get("live") {
let r = upstreams_liveness_json(&st.full_upstreams, &st.current_upstreams);
return Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(format!("{}", r)))
.unwrap();
}
if let Some(_) = params.get("all") {
let resp = upstreams_to_json(&st.current_upstreams);
match resp {
Ok(j) => {
return Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(j))
.unwrap()
}
Err(e) => {
return Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("Failed to get status: {}", e)))
.unwrap();
}
}
}
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("Parameter mismatch")))
.unwrap()
}