From 703de9e90980a5f798a3820a9ff01ba94f7a71eb Mon Sep 17 00:00:00 2001 From: Ara Sadoyan Date: Thu, 22 Jan 2026 16:50:51 +0100 Subject: [PATCH] updates on API server https://sadoyan.github.io/aralez-docs/assets/api/ --- Cargo.lock | 1 + Cargo.toml | 1 + src/utils/discovery.rs | 18 +++++----- src/utils/structs.rs | 14 ++++++-- src/utils/tools.rs | 77 ++++++++++++++++++++++++++++++++++++++++-- src/web/bgservice.rs | 2 ++ src/web/webserver.rs | 49 ++++++++++++++++++++++----- 7 files changed, 141 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5a7ed0a..12076eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -142,6 +142,7 @@ dependencies = [ "reqwest", "rustls-pemfile", "serde", + "serde_json", "serde_yaml 0.9.34+deprecated", "sha2", "tokio", diff --git a/Cargo.toml b/Cargo.toml index b3b9329..92e89f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ once_cell = "1.21.3" privdrop = "0.5.6" ctrlc = "3.5.1" port_check = "0.3.0" +serde_json = "1.0.140" #moka = { version = "0.12.10", features = ["sync"] } #rustls = { version = "0.23.27", features = ["ring"] } #hickory-client = { version = "0.25.2" } diff --git a/src/utils/discovery.rs b/src/utils/discovery.rs index 405c25e..84016fb 100644 --- a/src/utils/discovery.rs +++ b/src/utils/discovery.rs @@ -1,6 +1,6 @@ use crate::utils::filewatch; use crate::utils::kuberconsul::{ConsulDiscovery, KubernetesDiscovery, ServiceDiscovery}; -use crate::utils::structs::Configuration; +use crate::utils::structs::{Configuration, UpstreamsDashMap}; use crate::web::webserver; use async_trait::async_trait; use futures::channel::mpsc::Sender; @@ -15,13 +15,8 @@ pub struct APIUpstreamProvider { pub tls_key_file: Option, pub file_server_address: Option, pub file_server_folder: Option, -} - -#[async_trait] -impl Discovery for APIUpstreamProvider { - async fn start(&self, toreturn: Sender) { - webserver::run_server(self, toreturn).await; - } + pub current_upstreams: Arc, + pub full_upstreams: Arc, } pub struct FromFileProvider { @@ -41,6 +36,13 @@ pub trait Discovery { async fn start(&self, tx: Sender); } +#[async_trait] +impl Discovery for APIUpstreamProvider { + async fn start(&self, toreturn: Sender) { + webserver::run_server(self, toreturn, self.current_upstreams.clone(), self.full_upstreams.clone()).await; + } +} + #[async_trait] impl Discovery for FromFileProvider { async fn start(&self, tx: Sender) { diff --git a/src/utils/structs.rs b/src/utils/structs.rs index 349e908..80d30d7 100644 --- a/src/utils/structs.rs +++ b/src/utils/structs.rs @@ -1,11 +1,12 @@ use dashmap::DashMap; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use std::sync::atomic::AtomicUsize; -pub type UpstreamsDashMap = DashMap>, AtomicUsize)>>; use std::net::IpAddr; +use std::sync::atomic::AtomicUsize; use std::sync::Arc; +pub type UpstreamsDashMap = DashMap>, AtomicUsize)>>; + pub type UpstreamsIdMap = DashMap>; pub type Headers = DashMap, Vec<(Arc, Arc)>>>; @@ -115,7 +116,7 @@ pub struct AppConfig { pub rungroup: Option, } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] pub struct InnerMap { pub address: IpAddr, pub port: u16, @@ -140,3 +141,10 @@ impl InnerMap { } } } + +#[derive(Serialize)] +pub struct UpstreamSnapshot { + pub backends: Vec, + pub requests: usize, +} +// pub type UpstreamsSnapshot = HashMap>; diff --git a/src/utils/tools.rs b/src/utils/tools.rs index 8af730a..7facbb7 100644 --- a/src/utils/tools.rs +++ b/src/utils/tools.rs @@ -1,4 +1,4 @@ -use crate::utils::structs::{InnerMap, UpstreamsDashMap, UpstreamsIdMap}; +use crate::utils::structs::{InnerMap, UpstreamSnapshot, UpstreamsDashMap, UpstreamsIdMap}; use crate::utils::tls; use crate::utils::tls::CertificateConfig; use dashmap::DashMap; @@ -6,6 +6,7 @@ use log::{error, info}; use notify::{event::ModifyKind, Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use port_check::is_port_reachable; use privdrop::PrivDrop; +use serde_json::{json, Value}; use sha2::{Digest, Sha256}; use std::any::type_name; use std::collections::{HashMap, HashSet}; @@ -13,7 +14,7 @@ use std::fmt::Write; use std::net::SocketAddr; use std::os::unix::fs::MetadataExt; use std::str::FromStr; -use std::sync::atomic::AtomicUsize; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc::{channel, Sender}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -269,3 +270,75 @@ pub fn check_priv(addr: &str) { false => {} } } + +#[allow(dead_code)] +pub fn upstreams_to_json(upstreams: &UpstreamsDashMap) -> serde_json::Result { + let mut outer = HashMap::new(); + + for outer_entry in upstreams.iter() { + let mut inner_map = HashMap::new(); + + for inner_entry in outer_entry.value().iter() { + let (backends, counter) = inner_entry.value(); + + inner_map.insert( + inner_entry.key().clone(), + UpstreamSnapshot { + backends: backends.iter().map(|a| (**a).clone()).collect(), + requests: counter.load(Ordering::Relaxed), + }, + ); + } + + outer.insert(outer_entry.key().clone(), inner_map); + } + + // serde_json::to_string_pretty(&outer) + serde_json::to_string(&outer) +} + +pub fn upstreams_liveness_json(configured: &UpstreamsDashMap, current: &UpstreamsDashMap) -> Value { + let mut result = serde_json::Map::new(); + + for host_entry in configured.iter() { + let hostname = host_entry.key().clone(); + let configured_paths = host_entry.value(); + + let mut paths_json = serde_json::Map::new(); + + for path_entry in configured_paths.iter() { + let path = path_entry.key().clone(); + let (configured_backends, _) = path_entry.value(); + let backends_json: Vec = configured_backends + .iter() + .map(|backend| { + let alive = if let Some(host_map) = current.get(&hostname) { + if let Some(path_entry) = host_map.get(&path) { + let list = &path_entry.value().0; // Vec> + list.iter().any(|b| b.address == backend.address && b.port == backend.port) + } else { + false + } + } else { + false + }; + json!({ + "address": backend.address, + "port": backend.port, + "alive": alive + }) + }) + .collect(); + + paths_json.insert( + path, + json!({ + "backends": backends_json + }), + ); + } + + result.insert(hostname, Value::Object(paths_json)); + } + Value::Object(result) +} diff --git a/src/web/bgservice.rs b/src/web/bgservice.rs index a57ab04..8d58b06 100644 --- a/src/web/bgservice.rs +++ b/src/web/bgservice.rs @@ -59,6 +59,8 @@ impl BackgroundService for LB { tls_key_file: self.config.config_tls_key_file.clone(), file_server_address: self.config.file_server_address.clone(), file_server_folder: self.config.file_server_folder.clone(), + current_upstreams: self.ump_upst.clone(), + full_upstreams: self.ump_full.clone(), }; // let tx_api = tx.clone(); let _ = tokio::spawn(async move { api_load.start(tx_api).await }); diff --git a/src/web/webserver.rs b/src/web/webserver.rs index 23acebf..c3b6ee1 100644 --- a/src/web/webserver.rs +++ b/src/web/webserver.rs @@ -1,5 +1,6 @@ use crate::utils::discovery::APIUpstreamProvider; -use crate::utils::structs::{Config, Configuration}; +use crate::utils::structs::{Config, Configuration, UpstreamsDashMap}; +use crate::utils::tools::{upstreams_liveness_json, upstreams_to_json}; use axum::body::Body; use axum::extract::{Query, State}; use axum::http::{Response, StatusCode}; @@ -15,6 +16,7 @@ use prometheus::{gather, Encoder, TextEncoder}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::net::SocketAddr; +use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::net::TcpListener; use tower_http::services::ServeDir; @@ -36,16 +38,19 @@ struct AppState { master_key: String, config_sender: Sender, config_api_enabled: bool, + current_upstreams: Arc, + full_upstreams: Arc, } #[allow(unused_mut)] -pub async fn run_server(config: &APIUpstreamProvider, mut to_return: Sender) { +pub async fn run_server(config: &APIUpstreamProvider, mut to_return: Sender, upstreams_curr: Arc, upstreams_full: Arc) { let app_state = AppState { master_key: config.masterkey.clone(), config_sender: to_return.clone(), config_api_enabled: config.config_api_enabled.clone(), + current_upstreams: upstreams_curr, + full_upstreams: upstreams_full, }; - let app = Router::new() // .route("/{*wildcard}", get(senderror)) // .route("/{*wildcard}", post(senderror)) @@ -56,6 +61,7 @@ pub async fn run_server(config: &APIUpstreamProvider, mut to_return: Sender impl IntoResponse { .body(Body::from(format!("Failed to encode metrics: {}", e))) .unwrap(); } - Response::builder() .status(StatusCode::OK) .header("Content-Type", encoder.format_type()) @@ -162,7 +167,35 @@ async fn metrics() -> impl IntoResponse { .unwrap() } -// #[allow(dead_code)] -// async fn senderror() -> impl IntoResponse { -// Response::builder().status(StatusCode::BAD_GATEWAY).body(Body::from("No live upstream found!\n")).unwrap() -// } +async fn status(State(st): State, Query(params): Query>) -> impl IntoResponse { + if let Some(_) = params.get("live") { + let r = upstreams_liveness_json(&st.full_upstreams, &st.current_upstreams); + return Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(Body::from(format!("{}", r))) + .unwrap(); + } + if let Some(_) = params.get("all") { + let resp = upstreams_to_json(&st.current_upstreams); + match resp { + Ok(j) => { + return Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(Body::from(j)) + .unwrap() + } + Err(e) => { + return Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(format!("Failed to get status: {}", e))) + .unwrap(); + } + } + } + Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(format!("Parameter mismatch"))) + .unwrap() +}