Metrics exporter for Prometheus

This commit is contained in:
Ara Sadoyan
2025-05-28 21:23:10 +02:00
parent 78a084380a
commit 423c7afa90
7 changed files with 141 additions and 16 deletions

45
Cargo.lock generated
View File

@@ -818,6 +818,7 @@ dependencies = [
"env_logger", "env_logger",
"futures", "futures",
"jsonwebtoken", "jsonwebtoken",
"lazy_static",
"log", "log",
"mimalloc", "mimalloc",
"notify", "notify",
@@ -825,6 +826,7 @@ dependencies = [
"pingora-core", "pingora-core",
"pingora-http", "pingora-http",
"pingora-proxy", "pingora-proxy",
"prometheus 0.14.0",
"rand 0.9.1", "rand 0.9.1",
"reqwest", "reqwest",
"serde", "serde",
@@ -1874,7 +1876,7 @@ dependencies = [
"pingora-pool", "pingora-pool",
"pingora-runtime", "pingora-runtime",
"pingora-timeout", "pingora-timeout",
"prometheus", "prometheus 0.13.4",
"rand 0.8.5", "rand 0.8.5",
"regex", "regex",
"serde", "serde",
@@ -2121,10 +2123,25 @@ dependencies = [
"lazy_static", "lazy_static",
"memchr", "memchr",
"parking_lot", "parking_lot",
"protobuf", "protobuf 2.28.0",
"thiserror 1.0.69", "thiserror 1.0.69",
] ]
[[package]]
name = "prometheus"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ca5326d8d0b950a9acd87e6a3f94745394f62e4dae1b1ee22b2bc0c394af43a"
dependencies = [
"cfg-if",
"fnv",
"lazy_static",
"memchr",
"parking_lot",
"protobuf 3.7.2",
"thiserror 2.0.12",
]
[[package]] [[package]]
name = "prost" name = "prost"
version = "0.13.5" version = "0.13.5"
@@ -2140,6 +2157,26 @@ version = "2.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
[[package]]
name = "protobuf"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4"
dependencies = [
"once_cell",
"protobuf-support",
"thiserror 1.0.69",
]
[[package]]
name = "protobuf-support"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6"
dependencies = [
"thiserror 1.0.69",
]
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.38" version = "1.0.38"
@@ -2352,7 +2389,7 @@ dependencies = [
"errno", "errno",
"libc", "libc",
"linux-raw-sys", "linux-raw-sys",
"windows-sys 0.52.0", "windows-sys 0.59.0",
] ]
[[package]] [[package]]
@@ -2719,7 +2756,7 @@ dependencies = [
"getrandom 0.3.1", "getrandom 0.3.1",
"once_cell", "once_cell",
"rustix", "rustix",
"windows-sys 0.52.0", "windows-sys 0.59.0",
] ]
[[package]] [[package]]

View File

@@ -40,4 +40,6 @@ urlencoding = "2.1.3"
arc-swap = "1.7.1" arc-swap = "1.7.1"
#rustls = { version = "0.23.27", features = ["ring"] } #rustls = { version = "0.23.27", features = ["ring"] }
mimalloc = { version = "0.1.46", default-features = false } mimalloc = { version = "0.1.46", default-features = false }
prometheus = "0.14.0"
lazy_static = "1.5.0"

View File

@@ -4,6 +4,7 @@ pub mod discovery;
mod filewatch; mod filewatch;
pub mod healthcheck; pub mod healthcheck;
pub mod jwt; pub mod jwt;
pub mod metrics;
pub mod parceyaml; pub mod parceyaml;
pub mod structs; pub mod structs;
pub mod tools; pub mod tools;

58
src/utils/metrics.rs Normal file
View File

@@ -0,0 +1,58 @@
use prometheus::{register_histogram, register_int_counter, register_int_counter_vec, Histogram, IntCounter, IntCounterVec};
use std::time::Duration;
lazy_static::lazy_static! {
pub static ref REQUEST_COUNT: IntCounter = register_int_counter!(
"gazan_requests_total",
"Total number of requests handled by Gazan"
).unwrap();
pub static ref RESPONSE_CODES: IntCounterVec = register_int_counter_vec!(
"gazan_responses_total",
"Responses grouped by status code",
&["status"]
).unwrap();
pub static ref REQUEST_LATENCY: Histogram = register_histogram!(
"gazan_request_latency_seconds",
"Request latency in seconds",
vec![0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
).unwrap();
pub static ref RESPONSE_LATENCY: Histogram = register_histogram!(
"gazan_response_latency_seconds",
"Response latency in seconds",
vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0]
).unwrap();
pub static ref REQUESTS_BY_METHOD: IntCounterVec = register_int_counter_vec!(
"gazan_requests_by_method_total",
"Number of requests by HTTP method",
&["method"]
).unwrap();
pub static ref ERROR_COUNT: IntCounter = register_int_counter!(
"gazan_errors_total",
"Total number of errors"
).unwrap();
}
pub fn calc_metrics(method: String, code: u16, latency: Duration) {
REQUEST_COUNT.inc();
let timer = REQUEST_LATENCY.start_timer();
timer.observe_duration();
RESPONSE_CODES.with_label_values(&[&code.to_string()]).inc();
REQUESTS_BY_METHOD.with_label_values(&[&method]).inc();
RESPONSE_LATENCY.observe(latency.as_secs_f64());
}
/*
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
loop {
interval.tick().await;
// read Pingora stats
let stats = pingora.get_stats();
// update Prometheus metrics accordingly
REQUEST_COUNT.set(stats.requests_total);
// ... etc
}
});
*/

View File

@@ -1,4 +1,5 @@
use crate::utils::auth::authenticate; use crate::utils::auth::authenticate;
use crate::utils::metrics::*;
use crate::utils::structs::{AppConfig, Extraparams, Headers, UpstreamsDashMap, UpstreamsIdMap}; use crate::utils::structs::{AppConfig, Extraparams, Headers, UpstreamsDashMap, UpstreamsIdMap};
use crate::web::gethosts::GetHost; use crate::web::gethosts::GetHost;
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
@@ -10,6 +11,7 @@ use pingora_core::listeners::ALPN;
use pingora_core::prelude::HttpPeer; use pingora_core::prelude::HttpPeer;
use pingora_proxy::{ProxyHttp, Session}; use pingora_proxy::{ProxyHttp, Session};
use std::sync::Arc; use std::sync::Arc;
use tokio::time::Instant;
pub struct LB { pub struct LB {
pub ump_upst: Arc<UpstreamsDashMap>, pub ump_upst: Arc<UpstreamsDashMap>,
@@ -24,6 +26,7 @@ pub struct Context {
backend_id: String, backend_id: String,
to_https: bool, to_https: bool,
redirect_to: String, redirect_to: String,
start_time: Instant,
} }
#[async_trait] #[async_trait]
@@ -36,6 +39,7 @@ impl ProxyHttp for LB {
backend_id: String::new(), backend_id: String::new(),
to_https: false, to_https: false,
redirect_to: String::new(), redirect_to: String::new(),
start_time: Instant::now(),
} }
} }
async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> { async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
@@ -51,7 +55,6 @@ impl ProxyHttp for LB {
} }
async fn upstream_peer(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<Box<HttpPeer>> { async fn upstream_peer(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<Box<HttpPeer>> {
let host_name = return_header_host(&session); let host_name = return_header_host(&session);
match host_name { match host_name {
Some(hostname) => { Some(hostname) => {
// session.req_header_mut().headers.insert("X-Host-Name", host.to_string().parse().unwrap()); // session.req_header_mut().headers.insert("X-Host-Name", host.to_string().parse().unwrap());
@@ -183,6 +186,11 @@ impl ProxyHttp for LB {
async fn logging(&self, session: &mut Session, _e: Option<&pingora::Error>, ctx: &mut Self::CTX) { async fn logging(&self, session: &mut Session, _e: Option<&pingora::Error>, ctx: &mut Self::CTX) {
let response_code = session.response_written().map_or(0, |resp| resp.status.as_u16()); let response_code = session.response_written().map_or(0, |resp| resp.status.as_u16());
debug!("{}, response code: {response_code}", self.request_summary(session, ctx)); debug!("{}, response code: {response_code}", self.request_summary(session, ctx));
let method = session.req_header().method.to_string();
let status = session.response_written().map(|resp| resp.status.as_u16()).unwrap_or(0);
let latency = ctx.start_time.elapsed();
calc_metrics(method, status, latency);
} }
} }

View File

@@ -1,3 +1,4 @@
// use rustls::crypto::ring::default_provider;
use crate::utils::structs::Extraparams; use crate::utils::structs::Extraparams;
use crate::web::proxyhttp::LB; use crate::web::proxyhttp::LB;
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
@@ -5,7 +6,6 @@ use dashmap::DashMap;
use log::info; use log::info;
use pingora_core::prelude::{background_service, Opt}; use pingora_core::prelude::{background_service, Opt};
use pingora_core::server::Server; use pingora_core::server::Server;
// use rustls::crypto::ring::default_provider;
use std::env; use std::env;
use std::sync::Arc; use std::sync::Arc;
@@ -92,8 +92,5 @@ pub fn run() {
proxy.add_tcp(bind_address_http.as_str()); proxy.add_tcp(bind_address_http.as_str());
server.add_service(proxy); server.add_service(proxy);
server.add_service(bg_srvc); server.add_service(bg_srvc);
// let mut prometheus_service_http = Service::prometheus_http_service();
// prometheus_service_http.add_tcp("0.0.0.0:1234");
// server.add_service(prometheus_service_http);
server.run_forever(); server.run_forever();
} }

View File

@@ -9,13 +9,14 @@ use futures::channel::mpsc::Sender;
use futures::SinkExt; use futures::SinkExt;
use jsonwebtoken::{encode, EncodingKey, Header}; use jsonwebtoken::{encode, EncodingKey, Header};
use log::{error, info, warn}; use log::{error, info, warn};
use prometheus::{gather, Encoder, TextEncoder};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::net::TcpListener; use tokio::net::TcpListener;
#[derive(Deserialize)] #[derive(Deserialize)]
struct InputKey { struct InputKey {
masterkey: String, master_key: String,
owner: String, owner: String,
valid: u64, valid: u64,
} }
@@ -26,8 +27,8 @@ struct OutToken {
} }
#[allow(unused_mut)] #[allow(unused_mut)]
pub async fn run_server(bindaddress: String, masterkey: String, mut toreturn: Sender<Configuration>) { pub async fn run_server(bindaddress: String, master_key: String, mut to_return: Sender<Configuration>) {
let mut tr = toreturn.clone(); let mut tr = to_return.clone();
let app = Router::new() let app = Router::new()
.route("/{*wildcard}", get(senderror)) .route("/{*wildcard}", get(senderror))
.route("/{*wildcard}", post(senderror)) .route("/{*wildcard}", post(senderror))
@@ -35,7 +36,8 @@ pub async fn run_server(bindaddress: String, masterkey: String, mut toreturn: Se
.route("/{*wildcard}", head(senderror)) .route("/{*wildcard}", head(senderror))
.route("/{*wildcard}", delete(senderror)) .route("/{*wildcard}", delete(senderror))
.route("/jwt", post(jwt_gen)) .route("/jwt", post(jwt_gen))
.with_state(masterkey.clone()) .route("/metrics", get(metrics))
.with_state(master_key.clone())
.route( .route(
"/conf", "/conf",
post(|up: String| async move { post(|up: String| async move {
@@ -64,12 +66,12 @@ async fn senderror() -> impl IntoResponse {
Response::builder().status(StatusCode::BAD_GATEWAY).body(Body::from("No live upstream found!\n")).unwrap() Response::builder().status(StatusCode::BAD_GATEWAY).body(Body::from("No live upstream found!\n")).unwrap()
} }
async fn jwt_gen(State(masterkey): State<String>, Json(payload): Json<InputKey>) -> (StatusCode, Json<OutToken>) { async fn jwt_gen(State(master_key): State<String>, Json(payload): Json<InputKey>) -> (StatusCode, Json<OutToken>) {
if payload.masterkey == masterkey { if payload.master_key == master_key {
let now = SystemTime::now() + Duration::from_secs(payload.valid * 60); let now = SystemTime::now() + Duration::from_secs(payload.valid * 60);
let a = now.duration_since(UNIX_EPOCH).unwrap().as_secs(); let a = now.duration_since(UNIX_EPOCH).unwrap().as_secs();
let claim = crate::utils::jwt::Claims { user: payload.owner, exp: a }; let claim = crate::utils::jwt::Claims { user: payload.owner, exp: a };
match encode(&Header::default(), &claim, &EncodingKey::from_secret(payload.masterkey.as_ref())) { match encode(&Header::default(), &claim, &EncodingKey::from_secret(payload.master_key.as_ref())) {
Ok(t) => { Ok(t) => {
let tok = OutToken { token: t }; let tok = OutToken { token: t };
info!("Generating token: {:?}", tok); info!("Generating token: {:?}", tok);
@@ -89,3 +91,23 @@ async fn jwt_gen(State(masterkey): State<String>, Json(payload): Json<InputKey>)
(StatusCode::FORBIDDEN, Json(tok)) (StatusCode::FORBIDDEN, Json(tok))
} }
} }
async fn metrics() -> impl IntoResponse {
let metric_families = gather();
let encoder = TextEncoder::new();
let mut buffer = Vec::new();
if let Err(e) = encoder.encode(&metric_families, &mut buffer) {
// encoding error fallback
return Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("Failed to encode metrics: {}", e)))
.unwrap();
}
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", encoder.format_type())
.body(Body::from(buffer))
.unwrap()
}