5 Commits

Author SHA1 Message Date
Ara Sadoyan
e07791e1f0 Removed futures dependency, migrated to tokio::sync::mpsc .
closed #45
2026-06-30 13:34:47 +02:00
Ara Sadoyan
7b952b467c Removed futures dependency, migrated to tokio::sync::mpsc .
closed #45
2026-06-30 13:34:08 +02:00
Ara Sadoyan
9a1e8fe149 Removed futures dependency, migrated to tokio::sync::mpsc .
closed #45
2026-06-30 12:27:49 +02:00
Ara Sadoyan
874ce4fddb Implementing full async access log inspired by #45 2026-06-29 21:35:10 +02:00
Ara Sadoyan
99c7a811da Header parsing broken with rsplit_once in populate_headers_and_auth and build_headers
closed #48
2026-06-27 21:34:13 +02:00
11 changed files with 69 additions and 49 deletions

3
Cargo.lock generated
View File

@@ -127,7 +127,7 @@ checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c"
[[package]] [[package]]
name = "aralez" name = "aralez"
version = "0.92.12" version = "0.92.13"
dependencies = [ dependencies = [
"ahash", "ahash",
"arc-swap", "arc-swap",
@@ -136,7 +136,6 @@ dependencies = [
"base16ct 1.0.0", "base16ct 1.0.0",
"base64", "base64",
"dashmap", "dashmap",
"futures",
"instant-acme", "instant-acme",
"jsonwebtoken", "jsonwebtoken",
"libc", "libc",

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "aralez" name = "aralez"
version = "0.92.12" version = "0.92.13"
edition = "2021" edition = "2021"
license = "Apache-2.0" license = "Apache-2.0"
description = "Reverse proxy built on top of Cloudflare's Pingora" description = "Reverse proxy built on top of Cloudflare's Pingora"
@@ -24,7 +24,6 @@ pingora-limits = "0.8.1"
dashmap = "7.0.0-rc2" dashmap = "7.0.0-rc2"
async-trait = "0.1.89" async-trait = "0.1.89"
log = "0.4.30" log = "0.4.30"
futures = "0.3.32"
notify = "9.0.0-rc.4" notify = "9.0.0-rc.4"
axum = { version = "0.8.9" } axum = { version = "0.8.9" }
reqwest = { version = "0.13.4", features = ["json", "stream", "blocking"] } reqwest = { version = "0.13.4", features = ["json", "stream", "blocking"] }

View File

@@ -3,8 +3,8 @@ use crate::utils::kuberconsul::{ConsulDiscovery, KubernetesDiscovery, ServiceDis
use crate::utils::structs::{Configuration, UpstreamsDashMap}; use crate::utils::structs::{Configuration, UpstreamsDashMap};
use crate::web::webserver; use crate::web::webserver;
use async_trait::async_trait; use async_trait::async_trait;
use futures::channel::mpsc::Sender;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::Sender;
pub struct APIUpstreamProvider { pub struct APIUpstreamProvider {
pub config_api_enabled: bool, pub config_api_enabled: bool,
@@ -46,7 +46,7 @@ impl Discovery for APIUpstreamProvider {
#[async_trait] #[async_trait]
impl Discovery for FromFileProvider { impl Discovery for FromFileProvider {
async fn start(&self, tx: Sender<Configuration>) { async fn start(&self, tx: Sender<Configuration>) {
tokio::spawn(filewatch::start(self.path.clone(), tx.clone())); tokio::spawn(filewatch::start(self.path.clone(), tx));
} }
} }

View File

@@ -1,16 +1,15 @@
use crate::utils::parceyaml::load_configuration; use crate::utils::parceyaml::load_configuration;
use crate::utils::structs::Configuration; use crate::utils::structs::Configuration;
use futures::channel::mpsc::Sender;
use futures::SinkExt;
use log::error; use log::error;
use notify::event::ModifyKind; use notify::event::ModifyKind;
use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use pingora::prelude::sleep; use pingora::prelude::sleep;
use std::path::Path; use std::path::Path;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::sync::mpsc::Sender;
use tokio::task; use tokio::task;
pub async fn start(fp: String, mut toreturn: Sender<Configuration>) { pub async fn start(fp: String, toreturn: Sender<Configuration>) {
sleep(Duration::from_millis(50)).await; // For having nice logs :-) sleep(Duration::from_millis(50)).await; // For having nice logs :-)
let file_path = fp.as_str(); let file_path = fp.as_str();
let parent_dir = Path::new(file_path).parent().unwrap(); let parent_dir = Path::new(file_path).parent().unwrap();

View File

@@ -4,8 +4,6 @@ use crate::utils::structs::{Configuration, GlobalServiceMapping, InnerMap, Upstr
use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams}; use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams};
use async_trait::async_trait; use async_trait::async_trait;
use dashmap::DashMap; use dashmap::DashMap;
use futures::channel::mpsc::Sender;
use futures::SinkExt;
use pingora::prelude::sleep; use pingora::prelude::sleep;
use rand::RngExt; use rand::RngExt;
use serde::Deserialize; use serde::Deserialize;
@@ -18,6 +16,7 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::fs::File; use tokio::fs::File;
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tokio::sync::mpsc::Sender;
#[derive(Debug, serde::Deserialize)] #[derive(Debug, serde::Deserialize)]
pub struct KubeEndpoints { pub struct KubeEndpoints {
@@ -95,7 +94,7 @@ pub struct ConsulDiscovery;
#[async_trait] #[async_trait]
impl ServiceDiscovery for KubernetesDiscovery { impl ServiceDiscovery for KubernetesDiscovery {
async fn fetch_upstreams(&self, config: Arc<Configuration>, mut toreturn: Sender<Configuration>) { async fn fetch_upstreams(&self, config: Arc<Configuration>, toreturn: Sender<Configuration>) {
let prev_upstreams = UpstreamsDashMap::new(); let prev_upstreams = UpstreamsDashMap::new();
if let Some(kuber) = config.kubernetes.clone() { if let Some(kuber) = config.kubernetes.clone() {
@@ -162,7 +161,7 @@ fn get_current_namespace() -> Option<String> {
#[async_trait] #[async_trait]
impl ServiceDiscovery for ConsulDiscovery { impl ServiceDiscovery for ConsulDiscovery {
async fn fetch_upstreams(&self, config: Arc<Configuration>, mut toreturn: Sender<Configuration>) { async fn fetch_upstreams(&self, config: Arc<Configuration>, toreturn: Sender<Configuration>) {
let prev_upstreams = UpstreamsDashMap::new(); let prev_upstreams = UpstreamsDashMap::new();
loop { loop {
let upstreams = UpstreamsDashMap::new(); let upstreams = UpstreamsDashMap::new();

View File

@@ -15,6 +15,7 @@ pub struct MetricTypes {
} }
pub static OPEN_FILES: LazyLock<IntGauge> = LazyLock::new(|| register_int_gauge!("aralez_open_files", "Number of open file descriptors").unwrap()); pub static OPEN_FILES: LazyLock<IntGauge> = LazyLock::new(|| register_int_gauge!("aralez_open_files", "Number of open file descriptors").unwrap());
pub static LOGGING_ERRORS: LazyLock<IntGauge> = LazyLock::new(|| register_int_gauge!("aralez_logging_errors", "Number of log errors").unwrap());
pub static MEMORY_USAGE: LazyLock<IntGauge> = LazyLock::new(|| register_int_gauge!("aralez_memory_bytes", "Total memory allocated in bytes").unwrap()); pub static MEMORY_USAGE: LazyLock<IntGauge> = LazyLock::new(|| register_int_gauge!("aralez_memory_bytes", "Total memory allocated in bytes").unwrap());
pub static ACTIVE_SESSIONS: LazyLock<IntGauge> = LazyLock::new(|| register_int_gauge!("aralez_active_sessions", "Current number of active sessions").unwrap()); pub static ACTIVE_SESSIONS: LazyLock<IntGauge> = LazyLock::new(|| register_int_gauge!("aralez_active_sessions", "Current number of active sessions").unwrap());
pub static REQUEST_COUNT: LazyLock<IntCounter> = LazyLock::new(|| register_int_counter!("aralez_requests_total", "Total number of requests handled by Aralez").unwrap()); pub static REQUEST_COUNT: LazyLock<IntCounter> = LazyLock::new(|| register_int_counter!("aralez_requests_total", "Total number of requests handled by Aralez").unwrap());
@@ -22,15 +23,6 @@ pub static REQUEST_COUNT: LazyLock<IntCounter> = LazyLock::new(|| register_int_c
pub static RESPONSE_CODES: LazyLock<IntCounterVec> = pub static RESPONSE_CODES: LazyLock<IntCounterVec> =
LazyLock::new(|| register_int_counter_vec!("aralez_responses_total", "Responses grouped by status code", &["status"]).unwrap()); LazyLock::new(|| register_int_counter_vec!("aralez_responses_total", "Responses grouped by status code", &["status"]).unwrap());
// pub static RESPONSE_LATENCY: LazyLock<Histogram> = LazyLock::new(|| {
// register_histogram!(
// "aralez_response_latency_seconds",
// "Response latency in seconds",
// vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0]
// )
// .unwrap()
// });
pub static RESPONSE_LATENCY: LazyLock<Histogram> = LazyLock::new(|| { pub static RESPONSE_LATENCY: LazyLock<Histogram> = LazyLock::new(|| {
register_histogram!( register_histogram!(
"aralez_response_latency_seconds", "aralez_response_latency_seconds",

View File

@@ -157,7 +157,7 @@ async fn populate_headers_and_auth(config: &mut Configuration, parsed: &Config)
let mut ch: Vec<(String, Arc<str>)> = Vec::new(); let mut ch: Vec<(String, Arc<str>)> = Vec::new();
if let Some(headers) = &parsed.client_headers { if let Some(headers) = &parsed.client_headers {
for header in headers { for header in headers {
if let Some((key, val)) = header.rsplit_once(':') { if let Some((key, val)) = header.split_once(':') {
ch.push((key.to_string(), Arc::from(val))); ch.push((key.to_string(), Arc::from(val)));
} }
} }
@@ -170,7 +170,7 @@ async fn populate_headers_and_auth(config: &mut Configuration, parsed: &Config)
let mut sh: Vec<(String, Arc<str>)> = Vec::new(); let mut sh: Vec<(String, Arc<str>)> = Vec::new();
if let Some(headers) = &parsed.server_headers { if let Some(headers) = &parsed.server_headers {
for header in headers { for header in headers {
if let Some((key, val)) = header.rsplit_once(':') { if let Some((key, val)) = header.split_once(':') {
sh.push((key.to_string(), Arc::from(val.trim()))); sh.push((key.to_string(), Arc::from(val.trim())));
} }
} }
@@ -321,7 +321,7 @@ fn parce_tls_grades(what: Option<String>) -> Option<String> {
pub fn build_headers(path_config: &Option<Vec<String>>, _config: &Configuration, hl: &mut Vec<(String, Arc<str>)>) { pub fn build_headers(path_config: &Option<Vec<String>>, _config: &Configuration, hl: &mut Vec<(String, Arc<str>)>) {
if let Some(headers) = &path_config { if let Some(headers) = &path_config {
for header in headers { for header in headers {
if let Some((key, val)) = header.rsplit_once(':') { if let Some((key, val)) = header.split_once(':') {
hl.push((key.trim().to_string(), Arc::from(val.trim()))); hl.push((key.trim().to_string(), Arc::from(val.trim())));
} }
} }

View File

@@ -4,21 +4,21 @@ use crate::utils::parceyaml::load_configuration;
use crate::utils::structs::Configuration; use crate::utils::structs::Configuration;
use crate::utils::tools::*; use crate::utils::tools::*;
use crate::utils::*; use crate::utils::*;
use crate::web::logging::init_logging;
use crate::web::proxyhttp::LB; use crate::web::proxyhttp::LB;
use async_trait::async_trait; use async_trait::async_trait;
use dashmap::DashMap; use dashmap::DashMap;
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};
use log::{error, info}; use log::{error, info};
use pingora_core::server::ShutdownWatch; use pingora_core::server::ShutdownWatch;
use pingora_core::services::background::BackgroundService; use pingora_core::services::background::BackgroundService;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc;
#[async_trait] #[async_trait]
impl BackgroundService for LB { impl BackgroundService for LB {
async fn start(&self, mut shutdown: ShutdownWatch) { async fn start(&self, mut shutdown: ShutdownWatch) {
info!("Starting background service"); // tx: Sender<Configuration> info!("Starting background service"); // tx: Sender<Configuration>
let (mut tx, mut rx) = mpsc::channel::<Configuration>(1); let (tx, mut rx) = mpsc::channel::<Configuration>(1);
let tx_api = tx.clone(); let tx_api = tx.clone();
let config = load_configuration(self.config.upstreams_conf.clone().as_str(), "filepath") let config = load_configuration(self.config.upstreams_conf.clone().as_str(), "filepath")
.await .await
@@ -77,13 +77,13 @@ impl BackgroundService for LB {
healthcheck::hc2(uu, ff, im, (&*hc_method.to_string(), hc_interval.to_string().parse().unwrap())).await healthcheck::hc2(uu, ff, im, (&*hc_method.to_string(), hc_interval.to_string().parse().unwrap())).await
})); }));
drop(tokio::spawn(async move { refresh_order(certdir, confdir).await })); drop(tokio::spawn(async move { refresh_order(certdir, confdir).await }));
init_logging(self.config.access_log.clone());
loop { loop {
tokio::select! { tokio::select! {
_ = shutdown.changed() => { _ = shutdown.changed() => {
break; break;
} }
val = rx.next() => { val = rx.recv() => {
if let Some(ss) = val { if let Some(ss) = val {
clone_dashmap_into(&ss.upstreams, &self.ump_full); clone_dashmap_into(&ss.upstreams, &self.ump_full);
clone_dashmap_into(&ss.upstreams, &self.ump_upst); clone_dashmap_into(&ss.upstreams, &self.ump_upst);

View File

@@ -1,15 +1,29 @@
use crate::utils::metrics::LOGGING_ERRORS;
use log::info; use log::info;
use pingora_http::Version;
use pingora_proxy::Session; use pingora_proxy::Session;
use std::net::{IpAddr, Ipv4Addr}; use std::net::{IpAddr, Ipv4Addr};
use std::sync::OnceLock; use std::sync::OnceLock;
use tokio::sync::mpsc;
pub static ACCESS_LOG: OnceLock<LogLevel> = OnceLock::new(); #[derive(Debug)]
pub struct LogMessage {
pub response_code: u16,
pub summary: String,
pub client_ip: IpAddr,
pub version: Version,
pub user_agent: String,
}
static LOG_SENDER: OnceLock<mpsc::Sender<LogMessage>> = OnceLock::new();
static ACCESS_LOG: OnceLock<LogLevel> = OnceLock::new();
const LOG_BUFFER: usize = 16384;
pub fn init_access_log(level_str: &str) { pub fn init_access_log(level_str: &str) {
let level = LogLevel::from_str(level_str); let level = LogLevel::from_str(level_str);
let _ = ACCESS_LOG.set(level); let _ = ACCESS_LOG.set(level);
} }
#[derive(Debug)]
pub enum LogLevel { pub enum LogLevel {
Access, Access,
Error, Error,
@@ -47,11 +61,37 @@ pub fn access_log(response_code: u16, summary: &str, session: &Session) {
let user_agent = session.req_header().headers.get("user-agent").and_then(|v| v.to_str().ok()).unwrap_or("-"); let user_agent = session.req_header().headers.get("user-agent").and_then(|v| v.to_str().ok()).unwrap_or("-");
let log = LogMessage {
response_code,
summary: summary.to_owned(),
client_ip: ip,
version: session.req_header().version,
user_agent: user_agent.to_owned(),
};
if let Some(sender) = LOG_SENDER.get() {
let sender = sender;
if let Err(_) = sender.try_send(log) {
LOGGING_ERRORS.inc();
}
}
}
pub fn init_logging(enabled: Option<String>) {
if let Some(_) = enabled {
LOGGING_ERRORS.set(0);
info!("Enabling {:?} log, with buffer of {} messages", ACCESS_LOG.get().unwrap_or(&LogLevel::None), LOG_BUFFER);
let (ltx, lrx) = mpsc::channel(LOG_BUFFER);
LOG_SENDER.set(ltx).unwrap();
std::thread::spawn(move || log_receiver(lrx));
}
}
pub fn log_receiver(mut receiver: mpsc::Receiver<LogMessage>) {
while let Some(msg) = receiver.blocking_recv() {
info!( info!(
"{}, response code: {response_code}, client: {}, version: {:?}, useragent: {}", "{}, {}, client: {}, version: {:?}, useragent: {}",
summary, msg.response_code, msg.summary, msg.client_ip, msg.version, msg.user_agent,
ip,
session.req_header().version,
user_agent,
); );
} }
}

View File

@@ -95,9 +95,6 @@ impl ProxyHttp for LB {
let header = ResponseHeader::build(429, None)?; let header = ResponseHeader::build(429, None)?;
session.set_keepalive(None); session.set_keepalive(None);
session.write_response_header(Box::new(header), true).await?; session.write_response_header(Box::new(header), true).await?;
// if let (Some(oi), Some(oa)) = (&_ctx.hostname, rate_key) {
// warn!("Limit 4XX: {}-rps exceed on {} from {} path {}", rate, oi, oa, session.req_header().uri.path());
// }
return Ok(true); return Ok(true);
} }
} }
@@ -109,9 +106,6 @@ impl ProxyHttp for LB {
let header = ResponseHeader::build(429, None)?; let header = ResponseHeader::build(429, None)?;
session.set_keepalive(None); session.set_keepalive(None);
session.write_response_header(Box::new(header), true).await?; session.write_response_header(Box::new(header), true).await?;
// if let (Some(oi), Some(oa)) = (&_ctx.hostname, rate_key) {
// warn!("Limit: {}-rps exceed on {} from {}", rate, oi, oa);
// }
return Ok(true); return Ok(true);
} }
} }

View File

@@ -10,8 +10,6 @@ use axum::http::{Response, StatusCode};
use axum::response::IntoResponse; use axum::response::IntoResponse;
use axum::routing::{any, get, post}; use axum::routing::{any, get, post};
use axum::{Json, Router}; use axum::{Json, Router};
use futures::channel::mpsc::Sender;
use futures::SinkExt;
use jsonwebtoken::{encode, EncodingKey, Header}; use jsonwebtoken::{encode, EncodingKey, Header};
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
use prometheus::{gather, Encoder, TextEncoder}; use prometheus::{gather, Encoder, TextEncoder};
@@ -37,14 +35,14 @@ pub(crate) struct AppState {
pub(crate) cert_creds: String, pub(crate) cert_creds: String,
pub(crate) certs_dir: String, pub(crate) certs_dir: String,
upstreams_file: String, upstreams_file: String,
config_sender: Sender<Configuration>, config_sender: mpsc::Sender<Configuration>,
config_api_enabled: bool, config_api_enabled: bool,
current_upstreams: Arc<UpstreamsDashMap>, current_upstreams: Arc<UpstreamsDashMap>,
full_upstreams: Arc<UpstreamsDashMap>, full_upstreams: Arc<UpstreamsDashMap>,
} }
#[allow(unused_mut)] #[allow(unused_mut)]
pub async fn run_server(config: &APIUpstreamProvider, mut to_return: Sender<Configuration>, upstreams_curr: Arc<UpstreamsDashMap>, upstreams_full: Arc<UpstreamsDashMap>) { pub async fn run_server(config: &APIUpstreamProvider, mut to_return: mpsc::Sender<Configuration>, upstreams_curr: Arc<UpstreamsDashMap>, upstreams_full: Arc<UpstreamsDashMap>) {
let credsfile = config.config_dir.clone() + "/acme_credentials.json"; let credsfile = config.config_dir.clone() + "/acme_credentials.json";
let app_state = AppState { let app_state = AppState {
master_key: config.masterkey.clone(), master_key: config.masterkey.clone(),
@@ -119,7 +117,7 @@ async fn conf(State(st): State<AppState>, Query(params): Query<HashMap<String, S
} }
} }
async fn apply_config(content: &str, mut st: AppState, save: bool) { async fn apply_config(content: &str, st: AppState, save: bool) {
let sl = crate::utils::parceyaml::load_configuration(content, "content").await; let sl = crate::utils::parceyaml::load_configuration(content, "content").await;
if let Some(serverlist) = sl.0 { if let Some(serverlist) = sl.0 {
if save { if save {