diff --git a/Cargo.lock b/Cargo.lock index 1b9b1cc..6d9476a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -818,6 +818,7 @@ dependencies = [ "env_logger", "futures", "jsonwebtoken", + "lazy_static", "log", "mimalloc", "notify", @@ -825,6 +826,7 @@ dependencies = [ "pingora-core", "pingora-http", "pingora-proxy", + "prometheus 0.14.0", "rand 0.9.1", "reqwest", "serde", @@ -1874,7 +1876,7 @@ dependencies = [ "pingora-pool", "pingora-runtime", "pingora-timeout", - "prometheus", + "prometheus 0.13.4", "rand 0.8.5", "regex", "serde", @@ -2121,10 +2123,25 @@ dependencies = [ "lazy_static", "memchr", "parking_lot", - "protobuf", + "protobuf 2.28.0", "thiserror 1.0.69", ] +[[package]] +name = "prometheus" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ca5326d8d0b950a9acd87e6a3f94745394f62e4dae1b1ee22b2bc0c394af43a" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf 3.7.2", + "thiserror 2.0.12", +] + [[package]] name = "prost" version = "0.13.5" @@ -2140,6 +2157,26 @@ version = "2.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +[[package]] +name = "protobuf" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4" +dependencies = [ + "once_cell", + "protobuf-support", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf-support" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6" +dependencies = [ + "thiserror 1.0.69", +] + [[package]] name = "quote" version = "1.0.38" @@ -2352,7 +2389,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2719,7 +2756,7 @@ dependencies = [ "getrandom 0.3.1", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index ef17ad5..644f10f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,4 +40,6 @@ urlencoding = "2.1.3" arc-swap = "1.7.1" #rustls = { version = "0.23.27", features = ["ring"] } mimalloc = { version = "0.1.46", default-features = false } +prometheus = "0.14.0" +lazy_static = "1.5.0" diff --git a/src/utils.rs b/src/utils.rs index 75ef53b..2562713 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -4,6 +4,7 @@ pub mod discovery; mod filewatch; pub mod healthcheck; pub mod jwt; +pub mod metrics; pub mod parceyaml; pub mod structs; pub mod tools; diff --git a/src/utils/metrics.rs b/src/utils/metrics.rs new file mode 100644 index 0000000..00bef9c --- /dev/null +++ b/src/utils/metrics.rs @@ -0,0 +1,58 @@ +use prometheus::{register_histogram, register_int_counter, register_int_counter_vec, Histogram, IntCounter, IntCounterVec}; +use std::time::Duration; + +lazy_static::lazy_static! { + pub static ref REQUEST_COUNT: IntCounter = register_int_counter!( + "gazan_requests_total", + "Total number of requests handled by Gazan" + ).unwrap(); + pub static ref RESPONSE_CODES: IntCounterVec = register_int_counter_vec!( + "gazan_responses_total", + "Responses grouped by status code", + &["status"] + ).unwrap(); + pub static ref REQUEST_LATENCY: Histogram = register_histogram!( + "gazan_request_latency_seconds", + "Request latency in seconds", + vec![0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0] + ).unwrap(); + pub static ref RESPONSE_LATENCY: Histogram = register_histogram!( + "gazan_response_latency_seconds", + "Response latency in seconds", + vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0] + ).unwrap(); + pub static ref REQUESTS_BY_METHOD: IntCounterVec = register_int_counter_vec!( + "gazan_requests_by_method_total", + "Number of requests by HTTP method", + &["method"] + ).unwrap(); + pub static ref ERROR_COUNT: IntCounter = register_int_counter!( + "gazan_errors_total", + "Total number of errors" + ).unwrap(); +} + +pub fn calc_metrics(method: String, code: u16, latency: Duration) { + REQUEST_COUNT.inc(); + let timer = REQUEST_LATENCY.start_timer(); + timer.observe_duration(); + RESPONSE_CODES.with_label_values(&[&code.to_string()]).inc(); + REQUESTS_BY_METHOD.with_label_values(&[&method]).inc(); + RESPONSE_LATENCY.observe(latency.as_secs_f64()); +} + +/* +tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(5)); + loop { + interval.tick().await; + + // read Pingora stats + let stats = pingora.get_stats(); + + // update Prometheus metrics accordingly + REQUEST_COUNT.set(stats.requests_total); + // ... etc + } +}); +*/ diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index 1c2a8cb..7295e6d 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -1,4 +1,5 @@ use crate::utils::auth::authenticate; +use crate::utils::metrics::*; use crate::utils::structs::{AppConfig, Extraparams, Headers, UpstreamsDashMap, UpstreamsIdMap}; use crate::web::gethosts::GetHost; use arc_swap::ArcSwap; @@ -10,6 +11,7 @@ use pingora_core::listeners::ALPN; use pingora_core::prelude::HttpPeer; use pingora_proxy::{ProxyHttp, Session}; use std::sync::Arc; +use tokio::time::Instant; pub struct LB { pub ump_upst: Arc, @@ -24,6 +26,7 @@ pub struct Context { backend_id: String, to_https: bool, redirect_to: String, + start_time: Instant, } #[async_trait] @@ -36,6 +39,7 @@ impl ProxyHttp for LB { backend_id: String::new(), to_https: false, redirect_to: String::new(), + start_time: Instant::now(), } } async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result { @@ -51,7 +55,6 @@ impl ProxyHttp for LB { } async fn upstream_peer(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result> { let host_name = return_header_host(&session); - match host_name { Some(hostname) => { // session.req_header_mut().headers.insert("X-Host-Name", host.to_string().parse().unwrap()); @@ -183,6 +186,11 @@ impl ProxyHttp for LB { async fn logging(&self, session: &mut Session, _e: Option<&pingora::Error>, ctx: &mut Self::CTX) { let response_code = session.response_written().map_or(0, |resp| resp.status.as_u16()); debug!("{}, response code: {response_code}", self.request_summary(session, ctx)); + + let method = session.req_header().method.to_string(); + let status = session.response_written().map(|resp| resp.status.as_u16()).unwrap_or(0); + let latency = ctx.start_time.elapsed(); + calc_metrics(method, status, latency); } } diff --git a/src/web/start.rs b/src/web/start.rs index 09aef49..0e0eea6 100644 --- a/src/web/start.rs +++ b/src/web/start.rs @@ -1,3 +1,4 @@ +// use rustls::crypto::ring::default_provider; use crate::utils::structs::Extraparams; use crate::web::proxyhttp::LB; use arc_swap::ArcSwap; @@ -5,7 +6,6 @@ use dashmap::DashMap; use log::info; use pingora_core::prelude::{background_service, Opt}; use pingora_core::server::Server; -// use rustls::crypto::ring::default_provider; use std::env; use std::sync::Arc; @@ -92,8 +92,5 @@ pub fn run() { proxy.add_tcp(bind_address_http.as_str()); server.add_service(proxy); server.add_service(bg_srvc); - // let mut prometheus_service_http = Service::prometheus_http_service(); - // prometheus_service_http.add_tcp("0.0.0.0:1234"); - // server.add_service(prometheus_service_http); server.run_forever(); } diff --git a/src/web/webserver.rs b/src/web/webserver.rs index 8561fdb..f923ed1 100644 --- a/src/web/webserver.rs +++ b/src/web/webserver.rs @@ -9,13 +9,14 @@ use futures::channel::mpsc::Sender; use futures::SinkExt; use jsonwebtoken::{encode, EncodingKey, Header}; use log::{error, info, warn}; +use prometheus::{gather, Encoder, TextEncoder}; use serde::{Deserialize, Serialize}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::net::TcpListener; #[derive(Deserialize)] struct InputKey { - masterkey: String, + master_key: String, owner: String, valid: u64, } @@ -26,8 +27,8 @@ struct OutToken { } #[allow(unused_mut)] -pub async fn run_server(bindaddress: String, masterkey: String, mut toreturn: Sender) { - let mut tr = toreturn.clone(); +pub async fn run_server(bindaddress: String, master_key: String, mut to_return: Sender) { + let mut tr = to_return.clone(); let app = Router::new() .route("/{*wildcard}", get(senderror)) .route("/{*wildcard}", post(senderror)) @@ -35,7 +36,8 @@ pub async fn run_server(bindaddress: String, masterkey: String, mut toreturn: Se .route("/{*wildcard}", head(senderror)) .route("/{*wildcard}", delete(senderror)) .route("/jwt", post(jwt_gen)) - .with_state(masterkey.clone()) + .route("/metrics", get(metrics)) + .with_state(master_key.clone()) .route( "/conf", post(|up: String| async move { @@ -64,12 +66,12 @@ async fn senderror() -> impl IntoResponse { Response::builder().status(StatusCode::BAD_GATEWAY).body(Body::from("No live upstream found!\n")).unwrap() } -async fn jwt_gen(State(masterkey): State, Json(payload): Json) -> (StatusCode, Json) { - if payload.masterkey == masterkey { +async fn jwt_gen(State(master_key): State, Json(payload): Json) -> (StatusCode, Json) { + if payload.master_key == master_key { let now = SystemTime::now() + Duration::from_secs(payload.valid * 60); let a = now.duration_since(UNIX_EPOCH).unwrap().as_secs(); let claim = crate::utils::jwt::Claims { user: payload.owner, exp: a }; - match encode(&Header::default(), &claim, &EncodingKey::from_secret(payload.masterkey.as_ref())) { + match encode(&Header::default(), &claim, &EncodingKey::from_secret(payload.master_key.as_ref())) { Ok(t) => { let tok = OutToken { token: t }; info!("Generating token: {:?}", tok); @@ -89,3 +91,23 @@ async fn jwt_gen(State(masterkey): State, Json(payload): Json) (StatusCode::FORBIDDEN, Json(tok)) } } + +async fn metrics() -> impl IntoResponse { + let metric_families = gather(); + let encoder = TextEncoder::new(); + + let mut buffer = Vec::new(); + if let Err(e) = encoder.encode(&metric_families, &mut buffer) { + // encoding error fallback + return Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(format!("Failed to encode metrics: {}", e))) + .unwrap(); + } + + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", encoder.format_type()) + .body(Body::from(buffer)) + .unwrap() +}