Memory allocation improvements for metrics collector .

This commit is contained in:
Ara Sadoyan
2026-02-05 13:57:39 +01:00
parent 3ff262c7f4
commit a893b3c301
7 changed files with 372 additions and 259 deletions

View File

@@ -2,6 +2,7 @@ pub mod auth;
pub mod discovery;
pub mod dnsclient;
mod filewatch;
pub mod fordebug;
pub mod healthcheck;
pub mod httpclient;
pub mod jwt;

31
src/utils/fordebug.rs Normal file
View File

@@ -0,0 +1,31 @@
use std::alloc::{GlobalAlloc, Layout, System};
use std::sync::atomic::{AtomicUsize, Ordering};
pub struct CountingAllocator;
pub static ALLOC_COUNT: AtomicUsize = AtomicUsize::new(0);
pub static DEALLOC_COUNT: AtomicUsize = AtomicUsize::new(0);
pub static ALLOC_BYTES: AtomicUsize = AtomicUsize::new(0);
#[allow(dead_code)]
unsafe impl GlobalAlloc for CountingAllocator {
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
ALLOC_COUNT.fetch_add(1, Ordering::Relaxed);
ALLOC_BYTES.fetch_add(layout.size(), Ordering::Relaxed);
System.alloc(layout)
}
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
DEALLOC_COUNT.fetch_add(1, Ordering::Relaxed);
System.dealloc(ptr, layout)
}
}
// Uncomment following lines and comment allocator in main.rs
// #[global_allocator]
// pub static A: CountingAllocator = CountingAllocator;
#[allow(dead_code)]
fn for_example() {
let before = crate::utils::fordebug::ALLOC_COUNT.load(Ordering::Relaxed);
let after = crate::utils::fordebug::ALLOC_COUNT.load(Ordering::Relaxed);
println!("Allocations : {}", after - before);
}

View File

@@ -7,7 +7,7 @@ use dashmap::DashMap;
use futures::channel::mpsc::Sender;
use futures::SinkExt;
use pingora::prelude::sleep;
use rand::Rng;
use rand::RngExt;
use serde::Deserialize;
use std::collections::HashMap;
use std::env;
@@ -19,10 +19,6 @@ use std::time::Duration;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
// #[derive(Debug, Deserialize)]
// pub struct KubeEndpointsList {
// pub items: Vec<KubeEndpoints>,
// }
#[derive(Debug, serde::Deserialize)]
pub struct KubeEndpoints {
pub subsets: Option<Vec<KubeSubset>>,

View File

@@ -1,4 +1,5 @@
use http::method::Method;
use http::StatusCode;
use pingora_http::Version;
use prometheus::{register_histogram, register_int_counter, register_int_counter_vec, Histogram, IntCounter, IntCounterVec};
use std::sync::Arc;
@@ -7,7 +8,7 @@ use std::time::Duration;
pub struct MetricTypes {
pub method: Method,
pub upstream: Arc<str>,
pub code: String,
pub code: Option<StatusCode>,
pub latency: Duration,
pub version: Version,
}
@@ -65,7 +66,7 @@ pub fn calc_metrics(metric_types: &MetricTypes) {
_ => "Unknown",
};
REQUESTS_BY_VERSION.with_label_values(&[&version_str]).inc();
RESPONSE_CODES.with_label_values(&[&metric_types.code]).inc();
RESPONSE_CODES.with_label_values(&[metric_types.code.unwrap_or(http::StatusCode::GONE).as_str()]).inc();
REQUESTS_BY_METHOD.with_label_values(&[&metric_types.method]).inc();
REQUESTS_BY_UPSTREAM.with_label_values(&[metric_types.upstream.as_ref()]).inc();
RESPONSE_LATENCY.observe(metric_types.latency.as_secs_f64());

View File

@@ -14,7 +14,6 @@ use pingora_core::listeners::ALPN;
use pingora_core::prelude::HttpPeer;
use pingora_limits::rate::Rate;
use pingora_proxy::{ProxyHttp, Session};
// use std::net::{IpAddr, Ipv4Addr};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
@@ -72,8 +71,8 @@ impl ProxyHttp for LB {
};
let hostname = return_header_host_from_upstream(session, &self.ump_upst);
_ctx.hostname = hostname;
_ctx.hostname = hostname;
let mut backend_id = None;
if ep.sticky_sessions {
@@ -93,8 +92,8 @@ impl ProxyHttp for LB {
match _ctx.hostname.as_ref() {
None => return Ok(false),
Some(host) => {
// let optioninnermap = self.get_host(host.as_str(), host.as_str(), backend_id);
let optioninnermap = self.get_host(host, session.req_header().uri.path(), backend_id);
match optioninnermap {
None => return Ok(false),
Some(ref innermap) => {
@@ -118,6 +117,7 @@ impl ProxyHttp for LB {
_ctx.upstream_peer = optioninnermap;
}
}
Ok(false)
}
async fn upstream_peer(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<Box<HttpPeer>> {
@@ -230,8 +230,7 @@ impl ProxyHttp for LB {
debug!("{}, response code: {response_code}", self.request_summary(session, ctx));
let m = &MetricTypes {
method: session.req_header().method.clone(),
// method: Arc::from(session.req_header().method.as_str()),
code: session.response_written().map(|resp| resp.status.as_str().to_owned()).unwrap_or("0".to_string()),
code: session.response_written().map(|resp| resp.status),
latency: ctx.start_time.elapsed(),
version: session.req_header().version,
upstream: ctx.hostname.clone().unwrap_or(Arc::from("localhost")),
@@ -240,50 +239,6 @@ impl ProxyHttp for LB {
}
}
// use moka::sync::Cache;
// Using Moka for a high-concurrency, size-limited cache
// static HOST_CACHE: Lazy<Cache<String, Arc<str>>> = Lazy::new(|| {
// Cache::builder()
// .max_capacity(10_000) // Limits memory usage if attacked
// .build()
// });
// fn return_header_host_cached(session: &Session) -> Option<Arc<str>> {
// let host_str = if session.is_http2() {
// session.req_header().uri.host()?
// } else {
// let h = session.req_header().headers.get("host")?.to_str().ok()?;
// h.split_once(':').map_or(h, |(host, _)| host)
// };
// HOST_CACHE
// .get_with(host_str.to_string(), || {
// Arc::from(host_str)
// })
// .into()
// }
// use dashmap::DashMap;
// A simple cache to reuse Arcs for common hostnames
// static HOST_CACHE: Lazy<DashMap<String, Arc<str>>> = Lazy::new(|| DashMap::with_capacity(200));
//
// fn return_header_host_cached(session: &Session) -> Option<Arc<str>> {
// let host_str = if session.is_http2() {
// session.req_header().uri.host()?
// } else {
// let h = session.req_header().headers.get("host")?.to_str().ok()?;
// h.split_once(':').map_or(h, |(host, _)| host)
// };
//
// // Fast path: check if we already have an Arc for this host
// if let Some(arc) = HOST_CACHE.get(host_str) {
// return Some(arc.clone()); // Only an atomic increment!
// }
//
// // Slow path: create new Arc and cache it
// let new_arc: Arc<str> = Arc::from(host_str);
// HOST_CACHE.insert(host_str.to_string(), new_arc.clone());
// Some(new_arc)
// }
fn return_header_host_from_upstream(session: &Session, ump_upst: &UpstreamsDashMap) -> Option<Arc<str>> {
let host_str = if session.is_http2() {
session.req_header().uri.host()?