mirror of
https://github.com/sadoyan/aralez.git
synced 2026-07-01 03:42:21 +08:00
Implementing full async access log inspired by #45
This commit is contained in:
@@ -15,6 +15,7 @@ pub struct MetricTypes {
|
||||
}
|
||||
|
||||
pub static OPEN_FILES: LazyLock<IntGauge> = LazyLock::new(|| register_int_gauge!("aralez_open_files", "Number of open file descriptors").unwrap());
|
||||
pub static LOGGING_ERRORS: LazyLock<IntGauge> = LazyLock::new(|| register_int_gauge!("aralez_logging_errors", "Number of log errors").unwrap());
|
||||
pub static MEMORY_USAGE: LazyLock<IntGauge> = LazyLock::new(|| register_int_gauge!("aralez_memory_bytes", "Total memory allocated in bytes").unwrap());
|
||||
pub static ACTIVE_SESSIONS: LazyLock<IntGauge> = LazyLock::new(|| register_int_gauge!("aralez_active_sessions", "Current number of active sessions").unwrap());
|
||||
pub static REQUEST_COUNT: LazyLock<IntCounter> = LazyLock::new(|| register_int_counter!("aralez_requests_total", "Total number of requests handled by Aralez").unwrap());
|
||||
|
||||
@@ -4,6 +4,7 @@ use crate::utils::parceyaml::load_configuration;
|
||||
use crate::utils::structs::Configuration;
|
||||
use crate::utils::tools::*;
|
||||
use crate::utils::*;
|
||||
use crate::web::logging::init_logging;
|
||||
use crate::web::proxyhttp::LB;
|
||||
use async_trait::async_trait;
|
||||
use dashmap::DashMap;
|
||||
@@ -77,7 +78,7 @@ impl BackgroundService for LB {
|
||||
healthcheck::hc2(uu, ff, im, (&*hc_method.to_string(), hc_interval.to_string().parse().unwrap())).await
|
||||
}));
|
||||
drop(tokio::spawn(async move { refresh_order(certdir, confdir).await }));
|
||||
|
||||
init_logging(self.config.access_log.clone());
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = shutdown.changed() => {
|
||||
|
||||
@@ -1,15 +1,29 @@
|
||||
use crate::utils::metrics::LOGGING_ERRORS;
|
||||
use log::info;
|
||||
use pingora_http::Version;
|
||||
use pingora_proxy::Session;
|
||||
use std::net::{IpAddr, Ipv4Addr};
|
||||
use std::sync::OnceLock;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
pub static ACCESS_LOG: OnceLock<LogLevel> = OnceLock::new();
|
||||
#[derive(Debug)]
|
||||
pub struct LogMessage {
|
||||
pub response_code: u16,
|
||||
pub summary: String,
|
||||
pub client_ip: IpAddr,
|
||||
pub version: Version,
|
||||
pub user_agent: String,
|
||||
}
|
||||
static LOG_SENDER: OnceLock<mpsc::Sender<LogMessage>> = OnceLock::new();
|
||||
static ACCESS_LOG: OnceLock<LogLevel> = OnceLock::new();
|
||||
const LOG_BUFFER: usize = 16384;
|
||||
|
||||
pub fn init_access_log(level_str: &str) {
|
||||
let level = LogLevel::from_str(level_str);
|
||||
let _ = ACCESS_LOG.set(level);
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum LogLevel {
|
||||
Access,
|
||||
Error,
|
||||
@@ -47,11 +61,37 @@ pub fn access_log(response_code: u16, summary: &str, session: &Session) {
|
||||
|
||||
let user_agent = session.req_header().headers.get("user-agent").and_then(|v| v.to_str().ok()).unwrap_or("-");
|
||||
|
||||
info!(
|
||||
"{}, response code: {response_code}, client: {}, version: {:?}, useragent: {}",
|
||||
summary,
|
||||
ip,
|
||||
session.req_header().version,
|
||||
user_agent,
|
||||
);
|
||||
let log = LogMessage {
|
||||
response_code,
|
||||
summary: summary.to_owned(),
|
||||
client_ip: ip,
|
||||
version: session.req_header().version,
|
||||
user_agent: user_agent.to_owned(),
|
||||
};
|
||||
|
||||
if let Some(sender) = LOG_SENDER.get() {
|
||||
let sender = sender;
|
||||
if let Err(_) = sender.try_send(log) {
|
||||
LOGGING_ERRORS.inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init_logging(enabled: Option<String>) {
|
||||
if let Some(_) = enabled {
|
||||
LOGGING_ERRORS.set(0);
|
||||
info!("Enabling {:?} log, with buffer of {} messages", ACCESS_LOG.get().unwrap_or(&LogLevel::None), LOG_BUFFER);
|
||||
let (ltx, lrx) = mpsc::channel(LOG_BUFFER);
|
||||
LOG_SENDER.set(ltx).unwrap();
|
||||
std::thread::spawn(move || log_receiver(lrx));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn log_receiver(mut receiver: mpsc::Receiver<LogMessage>) {
|
||||
while let Some(msg) = receiver.blocking_recv() {
|
||||
info!(
|
||||
"{}, {}, client: {}, version: {:?}, useragent: {}",
|
||||
msg.response_code, msg.summary, msg.client_ip, msg.version, msg.user_agent,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -95,9 +95,6 @@ impl ProxyHttp for LB {
|
||||
let header = ResponseHeader::build(429, None)?;
|
||||
session.set_keepalive(None);
|
||||
session.write_response_header(Box::new(header), true).await?;
|
||||
// if let (Some(oi), Some(oa)) = (&_ctx.hostname, rate_key) {
|
||||
// warn!("Limit 4XX: {}-rps exceed on {} from {} path {}", rate, oi, oa, session.req_header().uri.path());
|
||||
// }
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
@@ -109,9 +106,6 @@ impl ProxyHttp for LB {
|
||||
let header = ResponseHeader::build(429, None)?;
|
||||
session.set_keepalive(None);
|
||||
session.write_response_header(Box::new(header), true).await?;
|
||||
// if let (Some(oi), Some(oa)) = (&_ctx.hostname, rate_key) {
|
||||
// warn!("Limit: {}-rps exceed on {} from {}", rate, oi, oa);
|
||||
// }
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user