API server changes, improvements

This commit is contained in:
Ara Sadoyan
2025-06-06 19:30:51 +02:00
parent 8e05794784
commit ffc2bab79f
4 changed files with 95 additions and 45 deletions

View File

@@ -1,6 +1,13 @@
use pingora_http::Version;
use prometheus::{register_histogram, register_int_counter, register_int_counter_vec, Histogram, IntCounter, IntCounterVec};
use std::time::Duration;
pub struct MetricTypes {
pub method: String,
pub code: String,
pub latency: Duration,
pub version: Version,
}
lazy_static::lazy_static! {
pub static ref REQUEST_COUNT: IntCounter = register_int_counter!(
"gazan_requests_total",
@@ -26,12 +33,35 @@ lazy_static::lazy_static! {
"Number of requests by HTTP method",
&["method"]
).unwrap();
pub static ref REQUESTS_BY_VERSION: IntCounterVec = register_int_counter_vec!(
"gazan_requests_by_version_total",
"Number of requests by HTTP versions",
&["version"]
).unwrap();
pub static ref ERROR_COUNT: IntCounter = register_int_counter!(
"gazan_errors_total",
"Total number of errors"
).unwrap();
}
pub fn calc_metrics(metric_types: &MetricTypes) {
REQUEST_COUNT.inc();
let timer = REQUEST_LATENCY.start_timer();
timer.observe_duration();
let version_str = match &metric_types.version {
&Version::HTTP_11 => "HTTP/1.1",
&Version::HTTP_2 => "HTTP/2.0",
&Version::HTTP_3 => "HTTP/3.0",
&Version::HTTP_10 => "HTTP/1.0",
_ => "Unknown",
};
REQUESTS_BY_VERSION.with_label_values(&[&version_str]).inc();
RESPONSE_CODES.with_label_values(&[&metric_types.code.to_string()]).inc();
REQUESTS_BY_METHOD.with_label_values(&[&metric_types.method]).inc();
RESPONSE_LATENCY.observe(metric_types.latency.as_secs_f64());
}
/*
pub fn calc_metrics(method: String, code: u16, latency: Duration) {
REQUEST_COUNT.inc();
let timer = REQUEST_LATENCY.start_timer();
@@ -41,7 +71,6 @@ pub fn calc_metrics(method: String, code: u16, latency: Duration) {
RESPONSE_LATENCY.observe(latency.as_secs_f64());
}
/*
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
loop {

View File

@@ -95,7 +95,7 @@ impl ProxyHttp for LB {
if let Some(addr) = session.server_addr() {
if let Some((host, _)) = addr.to_string().split_once(':') {
let uri = session.req_header().uri.path_and_query().map_or("/", |pq| pq.as_str());
let port = self.config.proxy_port_tls.unwrap_or(443);
let port = self.config.proxy_port_tls.unwrap_or(403);
ctx.to_https = true;
ctx.redirect_to = format!("https://{}:{}{}", host, port, uri);
}
@@ -186,11 +186,13 @@ impl ProxyHttp for LB {
async fn logging(&self, session: &mut Session, _e: Option<&pingora::Error>, ctx: &mut Self::CTX) {
let response_code = session.response_written().map_or(0, |resp| resp.status.as_u16());
debug!("{}, response code: {response_code}", self.request_summary(session, ctx));
let method = session.req_header().method.to_string();
let status = session.response_written().map(|resp| resp.status.as_u16()).unwrap_or(0);
let latency = ctx.start_time.elapsed();
calc_metrics(method, status, latency);
let m = &MetricTypes {
method: session.req_header().method.to_string(),
code: session.response_written().map(|resp| resp.status.as_str().to_owned()).unwrap_or("0".to_string()),
latency: ctx.start_time.elapsed(),
version: session.req_header().version,
};
calc_metrics(m);
}
}

View File

@@ -1,6 +1,6 @@
use crate::utils::structs::Configuration;
use axum::body::Body;
use axum::extract::State;
use axum::extract::{Query, State};
use axum::http::{Response, StatusCode};
use axum::response::IntoResponse;
use axum::routing::{delete, get, head, post, put};
@@ -11,6 +11,7 @@ use jsonwebtoken::{encode, EncodingKey, Header};
use log::{error, info, warn};
use prometheus::{gather, Encoder, TextEncoder};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::net::TcpListener;
@@ -26,9 +27,18 @@ struct OutToken {
token: String,
}
#[derive(Clone)]
struct AppState {
master_key: String,
config_sender: Sender<Configuration>,
}
#[allow(unused_mut)]
pub async fn run_server(bindaddress: String, master_key: String, mut to_return: Sender<Configuration>) {
let mut tr = to_return.clone();
let app_state = AppState {
master_key: master_key.clone(),
config_sender: to_return.clone(),
};
let app = Router::new()
.route("/{*wildcard}", get(senderror))
.route("/{*wildcard}", post(senderror))
@@ -36,38 +46,30 @@ pub async fn run_server(bindaddress: String, master_key: String, mut to_return:
.route("/{*wildcard}", head(senderror))
.route("/{*wildcard}", delete(senderror))
.route("/jwt", post(jwt_gen))
.route("/conf", post(conf))
.route("/metrics", get(metrics))
.with_state(master_key.clone())
.route(
"/conf",
post(|up: String| async move {
let serverlist = crate::utils::parceyaml::load_configuration(up.as_str(), "content");
match serverlist {
Some(serverlist) => {
let _ = tr.send(serverlist).await.unwrap();
Response::builder().status(StatusCode::CREATED).body(Body::from("Config, conf file, updated!\n")).unwrap()
}
None => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from("Failed to parce config file!\n"))
.unwrap(),
}
})
.with_state("state"),
);
.with_state(app_state);
let listener = TcpListener::bind(bindaddress.clone()).await.unwrap();
info!("Starting the API server on: {}", bindaddress);
axum::serve(listener, app).await.unwrap();
}
#[allow(dead_code)]
async fn senderror() -> impl IntoResponse {
Response::builder().status(StatusCode::BAD_GATEWAY).body(Body::from("No live upstream found!\n")).unwrap()
async fn conf(State(mut st): State<AppState>, Query(params): Query<HashMap<String, String>>, content: String) -> impl IntoResponse {
if let Some(s) = params.get("key") {
if s.to_owned() == st.master_key.to_owned() {
if let Some(serverlist) = crate::utils::parceyaml::load_configuration(content.as_str(), "content") {
st.config_sender.send(serverlist).await.unwrap();
return Response::builder().status(StatusCode::OK).body(Body::from("Config, conf file, updated !\n")).unwrap();
} else {
return Response::builder().status(StatusCode::BAD_GATEWAY).body(Body::from("Failed to parse config!\n")).unwrap();
};
}
}
Response::builder().status(StatusCode::FORBIDDEN).body(Body::from("Access Denied !\n")).unwrap()
}
async fn jwt_gen(State(master_key): State<String>, Json(payload): Json<InputKey>) -> (StatusCode, Json<OutToken>) {
if payload.master_key == master_key {
async fn jwt_gen(State(state): State<AppState>, Json(payload): Json<InputKey>) -> (StatusCode, Json<OutToken>) {
if payload.master_key == state.master_key {
let now = SystemTime::now() + Duration::from_secs(payload.valid * 60);
let a = now.duration_since(UNIX_EPOCH).unwrap().as_secs();
let claim = crate::utils::jwt::Claims { user: payload.owner, exp: a };
@@ -111,3 +113,8 @@ async fn metrics() -> impl IntoResponse {
.body(Body::from(buffer))
.unwrap()
}
#[allow(dead_code)]
async fn senderror() -> impl IntoResponse {
Response::builder().status(StatusCode::BAD_GATEWAY).body(Body::from("No live upstream found!\n")).unwrap()
}