mirror of
https://github.com/sadoyan/aralez.git
synced 2026-07-01 03:42:21 +08:00
Compare commits
13 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e07791e1f0 | ||
|
|
7b952b467c | ||
|
|
9a1e8fe149 | ||
|
|
874ce4fddb | ||
|
|
99c7a811da | ||
|
|
edadda803c | ||
|
|
c5d0fe60a0 | ||
|
|
d7fe476b24 | ||
|
|
aa22749393 | ||
|
|
4b6a558b4b | ||
|
|
6c9c7acf65 | ||
|
|
2845dc5923 | ||
|
|
f2e07a8d70 |
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -127,7 +127,7 @@ checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "aralez"
|
name = "aralez"
|
||||||
version = "0.92.11"
|
version = "0.92.13"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"ahash",
|
"ahash",
|
||||||
"arc-swap",
|
"arc-swap",
|
||||||
@@ -136,7 +136,6 @@ dependencies = [
|
|||||||
"base16ct 1.0.0",
|
"base16ct 1.0.0",
|
||||||
"base64",
|
"base64",
|
||||||
"dashmap",
|
"dashmap",
|
||||||
"futures",
|
|
||||||
"instant-acme",
|
"instant-acme",
|
||||||
"jsonwebtoken",
|
"jsonwebtoken",
|
||||||
"libc",
|
"libc",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "aralez"
|
name = "aralez"
|
||||||
version = "0.92.11"
|
version = "0.92.13"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
license = "Apache-2.0"
|
license = "Apache-2.0"
|
||||||
description = "Reverse proxy built on top of Cloudflare's Pingora"
|
description = "Reverse proxy built on top of Cloudflare's Pingora"
|
||||||
@@ -24,7 +24,6 @@ pingora-limits = "0.8.1"
|
|||||||
dashmap = "7.0.0-rc2"
|
dashmap = "7.0.0-rc2"
|
||||||
async-trait = "0.1.89"
|
async-trait = "0.1.89"
|
||||||
log = "0.4.30"
|
log = "0.4.30"
|
||||||
futures = "0.3.32"
|
|
||||||
notify = "9.0.0-rc.4"
|
notify = "9.0.0-rc.4"
|
||||||
axum = { version = "0.8.9" }
|
axum = { version = "0.8.9" }
|
||||||
reqwest = { version = "0.13.4", features = ["json", "stream", "blocking"] }
|
reqwest = { version = "0.13.4", features = ["json", "stream", "blocking"] }
|
||||||
|
|||||||
@@ -3,8 +3,8 @@ use crate::utils::kuberconsul::{ConsulDiscovery, KubernetesDiscovery, ServiceDis
|
|||||||
use crate::utils::structs::{Configuration, UpstreamsDashMap};
|
use crate::utils::structs::{Configuration, UpstreamsDashMap};
|
||||||
use crate::web::webserver;
|
use crate::web::webserver;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use futures::channel::mpsc::Sender;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::mpsc::Sender;
|
||||||
|
|
||||||
pub struct APIUpstreamProvider {
|
pub struct APIUpstreamProvider {
|
||||||
pub config_api_enabled: bool,
|
pub config_api_enabled: bool,
|
||||||
@@ -46,7 +46,7 @@ impl Discovery for APIUpstreamProvider {
|
|||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl Discovery for FromFileProvider {
|
impl Discovery for FromFileProvider {
|
||||||
async fn start(&self, tx: Sender<Configuration>) {
|
async fn start(&self, tx: Sender<Configuration>) {
|
||||||
tokio::spawn(filewatch::start(self.path.clone(), tx.clone()));
|
tokio::spawn(filewatch::start(self.path.clone(), tx));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,16 +1,15 @@
|
|||||||
use crate::utils::parceyaml::load_configuration;
|
use crate::utils::parceyaml::load_configuration;
|
||||||
use crate::utils::structs::Configuration;
|
use crate::utils::structs::Configuration;
|
||||||
use futures::channel::mpsc::Sender;
|
|
||||||
use futures::SinkExt;
|
|
||||||
use log::error;
|
use log::error;
|
||||||
use notify::event::ModifyKind;
|
use notify::event::ModifyKind;
|
||||||
use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
|
use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
|
||||||
use pingora::prelude::sleep;
|
use pingora::prelude::sleep;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
use tokio::sync::mpsc::Sender;
|
||||||
use tokio::task;
|
use tokio::task;
|
||||||
|
|
||||||
pub async fn start(fp: String, mut toreturn: Sender<Configuration>) {
|
pub async fn start(fp: String, toreturn: Sender<Configuration>) {
|
||||||
sleep(Duration::from_millis(50)).await; // For having nice logs :-)
|
sleep(Duration::from_millis(50)).await; // For having nice logs :-)
|
||||||
let file_path = fp.as_str();
|
let file_path = fp.as_str();
|
||||||
let parent_dir = Path::new(file_path).parent().unwrap();
|
let parent_dir = Path::new(file_path).parent().unwrap();
|
||||||
|
|||||||
@@ -4,8 +4,6 @@ use crate::utils::structs::{Configuration, GlobalServiceMapping, InnerMap, Upstr
|
|||||||
use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams};
|
use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use futures::channel::mpsc::Sender;
|
|
||||||
use futures::SinkExt;
|
|
||||||
use pingora::prelude::sleep;
|
use pingora::prelude::sleep;
|
||||||
use rand::RngExt;
|
use rand::RngExt;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
@@ -18,6 +16,7 @@ use std::sync::Arc;
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::fs::File;
|
use tokio::fs::File;
|
||||||
use tokio::io::AsyncReadExt;
|
use tokio::io::AsyncReadExt;
|
||||||
|
use tokio::sync::mpsc::Sender;
|
||||||
|
|
||||||
#[derive(Debug, serde::Deserialize)]
|
#[derive(Debug, serde::Deserialize)]
|
||||||
pub struct KubeEndpoints {
|
pub struct KubeEndpoints {
|
||||||
@@ -95,7 +94,7 @@ pub struct ConsulDiscovery;
|
|||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl ServiceDiscovery for KubernetesDiscovery {
|
impl ServiceDiscovery for KubernetesDiscovery {
|
||||||
async fn fetch_upstreams(&self, config: Arc<Configuration>, mut toreturn: Sender<Configuration>) {
|
async fn fetch_upstreams(&self, config: Arc<Configuration>, toreturn: Sender<Configuration>) {
|
||||||
let prev_upstreams = UpstreamsDashMap::new();
|
let prev_upstreams = UpstreamsDashMap::new();
|
||||||
|
|
||||||
if let Some(kuber) = config.kubernetes.clone() {
|
if let Some(kuber) = config.kubernetes.clone() {
|
||||||
@@ -162,7 +161,7 @@ fn get_current_namespace() -> Option<String> {
|
|||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl ServiceDiscovery for ConsulDiscovery {
|
impl ServiceDiscovery for ConsulDiscovery {
|
||||||
async fn fetch_upstreams(&self, config: Arc<Configuration>, mut toreturn: Sender<Configuration>) {
|
async fn fetch_upstreams(&self, config: Arc<Configuration>, toreturn: Sender<Configuration>) {
|
||||||
let prev_upstreams = UpstreamsDashMap::new();
|
let prev_upstreams = UpstreamsDashMap::new();
|
||||||
loop {
|
loop {
|
||||||
let upstreams = UpstreamsDashMap::new();
|
let upstreams = UpstreamsDashMap::new();
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ pub struct MetricTypes {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub static OPEN_FILES: LazyLock<IntGauge> = LazyLock::new(|| register_int_gauge!("aralez_open_files", "Number of open file descriptors").unwrap());
|
pub static OPEN_FILES: LazyLock<IntGauge> = LazyLock::new(|| register_int_gauge!("aralez_open_files", "Number of open file descriptors").unwrap());
|
||||||
|
pub static LOGGING_ERRORS: LazyLock<IntGauge> = LazyLock::new(|| register_int_gauge!("aralez_logging_errors", "Number of log errors").unwrap());
|
||||||
pub static MEMORY_USAGE: LazyLock<IntGauge> = LazyLock::new(|| register_int_gauge!("aralez_memory_bytes", "Total memory allocated in bytes").unwrap());
|
pub static MEMORY_USAGE: LazyLock<IntGauge> = LazyLock::new(|| register_int_gauge!("aralez_memory_bytes", "Total memory allocated in bytes").unwrap());
|
||||||
pub static ACTIVE_SESSIONS: LazyLock<IntGauge> = LazyLock::new(|| register_int_gauge!("aralez_active_sessions", "Current number of active sessions").unwrap());
|
pub static ACTIVE_SESSIONS: LazyLock<IntGauge> = LazyLock::new(|| register_int_gauge!("aralez_active_sessions", "Current number of active sessions").unwrap());
|
||||||
pub static REQUEST_COUNT: LazyLock<IntCounter> = LazyLock::new(|| register_int_counter!("aralez_requests_total", "Total number of requests handled by Aralez").unwrap());
|
pub static REQUEST_COUNT: LazyLock<IntCounter> = LazyLock::new(|| register_int_counter!("aralez_requests_total", "Total number of requests handled by Aralez").unwrap());
|
||||||
@@ -22,15 +23,6 @@ pub static REQUEST_COUNT: LazyLock<IntCounter> = LazyLock::new(|| register_int_c
|
|||||||
pub static RESPONSE_CODES: LazyLock<IntCounterVec> =
|
pub static RESPONSE_CODES: LazyLock<IntCounterVec> =
|
||||||
LazyLock::new(|| register_int_counter_vec!("aralez_responses_total", "Responses grouped by status code", &["status"]).unwrap());
|
LazyLock::new(|| register_int_counter_vec!("aralez_responses_total", "Responses grouped by status code", &["status"]).unwrap());
|
||||||
|
|
||||||
// pub static RESPONSE_LATENCY: LazyLock<Histogram> = LazyLock::new(|| {
|
|
||||||
// register_histogram!(
|
|
||||||
// "aralez_response_latency_seconds",
|
|
||||||
// "Response latency in seconds",
|
|
||||||
// vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0]
|
|
||||||
// )
|
|
||||||
// .unwrap()
|
|
||||||
// });
|
|
||||||
|
|
||||||
pub static RESPONSE_LATENCY: LazyLock<Histogram> = LazyLock::new(|| {
|
pub static RESPONSE_LATENCY: LazyLock<Histogram> = LazyLock::new(|| {
|
||||||
register_histogram!(
|
register_histogram!(
|
||||||
"aralez_response_latency_seconds",
|
"aralez_response_latency_seconds",
|
||||||
|
|||||||
@@ -96,7 +96,7 @@ pub async fn load_configuration(d: &str, kind: &str) -> (Option<Configuration>,
|
|||||||
}
|
}
|
||||||
|
|
||||||
info!("Reading upstreams from {}", d);
|
info!("Reading upstreams from {}", d);
|
||||||
data
|
data // [2606:4700:2ff9::1]:443
|
||||||
}
|
}
|
||||||
"content" => {
|
"content" => {
|
||||||
info!("Reading upstreams from API post body");
|
info!("Reading upstreams from API post body");
|
||||||
@@ -225,7 +225,8 @@ async fn populate_file_upstreams(config: &mut Configuration, parsed: &Config) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let redirect_link = path_config.redirect_to.as_ref().map(|www| Arc::from(www.as_str()));
|
let redirect_link = path_config.redirect_to.as_ref().map(|www| Arc::from(www.as_str()));
|
||||||
if let Some((ip, port_str)) = server.split_once(':') {
|
|
||||||
|
if let Some((ip, port_str)) = server.rsplit_once(':') {
|
||||||
if let Ok(port) = port_str.parse::<u16>() {
|
if let Ok(port) = port_str.parse::<u16>() {
|
||||||
server_list.push(Arc::from(InnerMap {
|
server_list.push(Arc::from(InnerMap {
|
||||||
address: Arc::from(ip),
|
address: Arc::from(ip),
|
||||||
@@ -271,18 +272,18 @@ pub fn parce_main_config(path: &str) -> AppConfig {
|
|||||||
|
|
||||||
log_builder(&cfo, &cfo.log_file);
|
log_builder(&cfo, &cfo.log_file);
|
||||||
cfo.hc_method = cfo.hc_method.to_uppercase();
|
cfo.hc_method = cfo.hc_method.to_uppercase();
|
||||||
if let Some((ip, port_str)) = cfo.config_address.split_once(':') {
|
if let Some((ip, port_str)) = cfo.config_address.rsplit_once(':') {
|
||||||
if let Ok(port) = port_str.parse::<u16>() {
|
if let Ok(port) = port_str.parse::<u16>() {
|
||||||
cfo.local_server = Option::from((ip.to_string(), port));
|
cfo.local_server = Option::from((ip.to_string(), port));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if let Some(tlsport_cfg) = cfo.proxy_address_tls.clone() {
|
if let Some(tlsport_cfg) = cfo.proxy_address_tls.clone() {
|
||||||
if let Some((_, port_str)) = tlsport_cfg.split_once(':') {
|
if let Some((_, port_str)) = tlsport_cfg.rsplit_once(':') {
|
||||||
cfo.proxy_port_tls = Some(port_str.to_string());
|
cfo.proxy_port_tls = Some(port_str.to_string());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some((_, port_str)) = cfo.proxy_address_http.split_once(':') {
|
if let Some((_, port_str)) = cfo.proxy_address_http.rsplit_once(':') {
|
||||||
cfo.proxy_port = Some(port_str.to_string());
|
cfo.proxy_port = Some(port_str.to_string());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -8,8 +8,6 @@ pub type UpstreamsDashMap = DashMap<Arc<str>, DashMap<Arc<str>, (Vec<Arc<InnerMa
|
|||||||
|
|
||||||
pub type UpstreamsIdMap = DashMap<String, Arc<InnerMap>>;
|
pub type UpstreamsIdMap = DashMap<String, Arc<InnerMap>>;
|
||||||
pub type Headers = DashMap<Arc<str>, DashMap<Arc<str>, Vec<(String, Arc<str>)>>>;
|
pub type Headers = DashMap<Arc<str>, DashMap<Arc<str>, Vec<(String, Arc<str>)>>>;
|
||||||
// pub type UpstreamsSerDde = Option<HashMap<String, HostConfig>>;
|
|
||||||
// pub type UpstreamsSerDe = HashMap<String, HostConfig>;
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, Default)]
|
#[derive(Clone, Debug, Default)]
|
||||||
pub struct Extraparams {
|
pub struct Extraparams {
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
use crate::tls::load;
|
use crate::tls::load;
|
||||||
use crate::tls::load::CertificateConfig;
|
use crate::tls::load::CertificateConfig;
|
||||||
use crate::utils::structs::{Extraparams, InnerMap, InnerMapForJson, UpstreamSnapshotForJson, UpstreamsDashMap, UpstreamsIdMap};
|
use crate::utils::structs::{Extraparams, InnerMapForJson, UpstreamSnapshotForJson, UpstreamsDashMap, UpstreamsIdMap};
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
use notify::{event::ModifyKind, Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
|
use notify::{event::ModifyKind, Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
|
||||||
@@ -147,11 +147,8 @@ pub fn clone_idmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsIdMap) {
|
|||||||
cloned.clear();
|
cloned.clear();
|
||||||
for outer_entry in original.iter() {
|
for outer_entry in original.iter() {
|
||||||
let inner_map = outer_entry.value();
|
let inner_map = outer_entry.value();
|
||||||
let new_inner_map = DashMap::new();
|
|
||||||
for inner_entry in inner_map.iter() {
|
for inner_entry in inner_map.iter() {
|
||||||
let path = inner_entry.key();
|
|
||||||
let (vec, _) = inner_entry.value();
|
let (vec, _) = inner_entry.value();
|
||||||
let new_vec = vec.clone();
|
|
||||||
for x in vec.iter() {
|
for x in vec.iter() {
|
||||||
let mut id = String::new();
|
let mut id = String::new();
|
||||||
write!(
|
write!(
|
||||||
@@ -175,22 +172,8 @@ pub fn clone_idmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsIdMap) {
|
|||||||
let hash = hasher.finalize();
|
let hash = hasher.finalize();
|
||||||
let hex_hash = base16ct::lower::encode_string(&hash);
|
let hex_hash = base16ct::lower::encode_string(&hash);
|
||||||
let hh = hex_hash[0..50].to_string();
|
let hh = hex_hash[0..50].to_string();
|
||||||
let to_add = InnerMap {
|
|
||||||
address: Arc::from("127.0.0.1"),
|
|
||||||
port: 0,
|
|
||||||
is_ssl: false,
|
|
||||||
is_http2: false,
|
|
||||||
to_https: false,
|
|
||||||
rate_limit: None,
|
|
||||||
x4xx_limit: None,
|
|
||||||
healthcheck: None,
|
|
||||||
redirect_to: None,
|
|
||||||
authorization: None,
|
|
||||||
};
|
|
||||||
cloned.insert(id, Arc::from(to_add));
|
|
||||||
cloned.insert(hh, x.to_owned());
|
cloned.insert(hh, x.to_owned());
|
||||||
}
|
}
|
||||||
new_inner_map.insert(path.clone(), new_vec);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
info!("Upstreams are fully populated. Ready to server requests");
|
info!("Upstreams are fully populated. Ready to server requests");
|
||||||
|
|||||||
@@ -4,21 +4,21 @@ use crate::utils::parceyaml::load_configuration;
|
|||||||
use crate::utils::structs::Configuration;
|
use crate::utils::structs::Configuration;
|
||||||
use crate::utils::tools::*;
|
use crate::utils::tools::*;
|
||||||
use crate::utils::*;
|
use crate::utils::*;
|
||||||
|
use crate::web::logging::init_logging;
|
||||||
use crate::web::proxyhttp::LB;
|
use crate::web::proxyhttp::LB;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use futures::channel::mpsc;
|
|
||||||
use futures::{SinkExt, StreamExt};
|
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
use pingora_core::server::ShutdownWatch;
|
use pingora_core::server::ShutdownWatch;
|
||||||
use pingora_core::services::background::BackgroundService;
|
use pingora_core::services::background::BackgroundService;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl BackgroundService for LB {
|
impl BackgroundService for LB {
|
||||||
async fn start(&self, mut shutdown: ShutdownWatch) {
|
async fn start(&self, mut shutdown: ShutdownWatch) {
|
||||||
info!("Starting background service"); // tx: Sender<Configuration>
|
info!("Starting background service"); // tx: Sender<Configuration>
|
||||||
let (mut tx, mut rx) = mpsc::channel::<Configuration>(1);
|
let (tx, mut rx) = mpsc::channel::<Configuration>(1);
|
||||||
let tx_api = tx.clone();
|
let tx_api = tx.clone();
|
||||||
let config = load_configuration(self.config.upstreams_conf.clone().as_str(), "filepath")
|
let config = load_configuration(self.config.upstreams_conf.clone().as_str(), "filepath")
|
||||||
.await
|
.await
|
||||||
@@ -77,13 +77,13 @@ impl BackgroundService for LB {
|
|||||||
healthcheck::hc2(uu, ff, im, (&*hc_method.to_string(), hc_interval.to_string().parse().unwrap())).await
|
healthcheck::hc2(uu, ff, im, (&*hc_method.to_string(), hc_interval.to_string().parse().unwrap())).await
|
||||||
}));
|
}));
|
||||||
drop(tokio::spawn(async move { refresh_order(certdir, confdir).await }));
|
drop(tokio::spawn(async move { refresh_order(certdir, confdir).await }));
|
||||||
|
init_logging(self.config.access_log.clone());
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = shutdown.changed() => {
|
_ = shutdown.changed() => {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
val = rx.next() => {
|
val = rx.recv() => {
|
||||||
if let Some(ss) = val {
|
if let Some(ss) = val {
|
||||||
clone_dashmap_into(&ss.upstreams, &self.ump_full);
|
clone_dashmap_into(&ss.upstreams, &self.ump_full);
|
||||||
clone_dashmap_into(&ss.upstreams, &self.ump_upst);
|
clone_dashmap_into(&ss.upstreams, &self.ump_upst);
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
use crate::utils::structs::InnerMap;
|
use crate::utils::structs::InnerMap;
|
||||||
use crate::web::proxyhttp::LB;
|
use crate::web::proxyhttp::LB;
|
||||||
use async_trait::async_trait;
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
use std::sync::atomic::Ordering;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
@@ -10,30 +9,38 @@ pub struct GetHostsReturHeaders {
|
|||||||
pub server_headers: Option<Vec<(String, Arc<str>)>>,
|
pub server_headers: Option<Vec<(String, Arc<str>)>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
pub trait GetHost {
|
pub trait GetHost {
|
||||||
|
fn find_sticky_backend(&self, servers: &[Arc<InnerMap>], backend_id: Option<&str>) -> Option<Arc<InnerMap>>;
|
||||||
|
fn pick_backend(&self, servers: &[Arc<InnerMap>], index: &AtomicUsize, backend_id: Option<&str>) -> Option<Arc<InnerMap>>;
|
||||||
fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<Arc<InnerMap>>;
|
fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<Arc<InnerMap>>;
|
||||||
|
|
||||||
fn get_header(&self, peer: &str, path: &str) -> Option<GetHostsReturHeaders>;
|
fn get_header(&self, peer: &str, path: &str) -> Option<GetHostsReturHeaders>;
|
||||||
// fn get_upstreams(&self) -> Arc<UpstreamsDashMap>;
|
|
||||||
}
|
}
|
||||||
#[async_trait]
|
|
||||||
impl GetHost for LB {
|
impl GetHost for LB {
|
||||||
fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<Arc<InnerMap>> {
|
fn find_sticky_backend(&self, servers: &[Arc<InnerMap>], backend_id: Option<&str>) -> Option<Arc<InnerMap>> {
|
||||||
if let Some(b) = backend_id {
|
let b = backend_id?;
|
||||||
if let Some(bb) = self.ump_byid.get(b) {
|
let bb = self.ump_byid.get(b)?;
|
||||||
return Some(bb.value().clone());
|
let target = bb.value();
|
||||||
}
|
servers.iter().any(|s| s.address == target.address && s.port == target.port).then(|| target.clone())
|
||||||
|
}
|
||||||
|
fn pick_backend(&self, servers: &[Arc<InnerMap>], index: &AtomicUsize, backend_id: Option<&str>) -> Option<Arc<InnerMap>> {
|
||||||
|
if servers.is_empty() {
|
||||||
|
return None;
|
||||||
}
|
}
|
||||||
|
if let Some(target) = self.find_sticky_backend(servers, backend_id) {
|
||||||
|
return Some(target);
|
||||||
|
}
|
||||||
|
let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len();
|
||||||
|
Some(servers[idx].clone())
|
||||||
|
}
|
||||||
|
fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<Arc<InnerMap>> {
|
||||||
let host_entry = self.ump_upst.get(peer)?;
|
let host_entry = self.ump_upst.get(peer)?;
|
||||||
let mut end = path.len();
|
let mut end = path.len();
|
||||||
loop {
|
loop {
|
||||||
let slice = &path[..end];
|
let slice = &path[..end];
|
||||||
if let Some(entry) = host_entry.get(slice) {
|
if let Some(entry) = host_entry.get(slice) {
|
||||||
let (servers, index) = entry.value();
|
let (servers, index) = entry.value();
|
||||||
if !servers.is_empty() {
|
if let Some(backend) = self.pick_backend(servers, index, backend_id) {
|
||||||
let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len();
|
return Some(backend);
|
||||||
return Some(servers[idx].clone());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if let Some(pos) = slice.rfind('/') {
|
if let Some(pos) = slice.rfind('/') {
|
||||||
@@ -44,9 +51,8 @@ impl GetHost for LB {
|
|||||||
}
|
}
|
||||||
if let Some(entry) = host_entry.get("/") {
|
if let Some(entry) = host_entry.get("/") {
|
||||||
let (servers, index) = entry.value();
|
let (servers, index) = entry.value();
|
||||||
if !servers.is_empty() {
|
if let Some(backend) = self.pick_backend(servers, index, backend_id) {
|
||||||
let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len();
|
return Some(backend);
|
||||||
return Some(servers[idx].clone());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
None
|
None
|
||||||
|
|||||||
@@ -1,15 +1,29 @@
|
|||||||
|
use crate::utils::metrics::LOGGING_ERRORS;
|
||||||
use log::info;
|
use log::info;
|
||||||
|
use pingora_http::Version;
|
||||||
use pingora_proxy::Session;
|
use pingora_proxy::Session;
|
||||||
use std::net::{IpAddr, Ipv4Addr};
|
use std::net::{IpAddr, Ipv4Addr};
|
||||||
use std::sync::OnceLock;
|
use std::sync::OnceLock;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
pub static ACCESS_LOG: OnceLock<LogLevel> = OnceLock::new();
|
#[derive(Debug)]
|
||||||
|
pub struct LogMessage {
|
||||||
|
pub response_code: u16,
|
||||||
|
pub summary: String,
|
||||||
|
pub client_ip: IpAddr,
|
||||||
|
pub version: Version,
|
||||||
|
pub user_agent: String,
|
||||||
|
}
|
||||||
|
static LOG_SENDER: OnceLock<mpsc::Sender<LogMessage>> = OnceLock::new();
|
||||||
|
static ACCESS_LOG: OnceLock<LogLevel> = OnceLock::new();
|
||||||
|
const LOG_BUFFER: usize = 16384;
|
||||||
|
|
||||||
pub fn init_access_log(level_str: &str) {
|
pub fn init_access_log(level_str: &str) {
|
||||||
let level = LogLevel::from_str(level_str);
|
let level = LogLevel::from_str(level_str);
|
||||||
let _ = ACCESS_LOG.set(level);
|
let _ = ACCESS_LOG.set(level);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
pub enum LogLevel {
|
pub enum LogLevel {
|
||||||
Access,
|
Access,
|
||||||
Error,
|
Error,
|
||||||
@@ -47,11 +61,37 @@ pub fn access_log(response_code: u16, summary: &str, session: &Session) {
|
|||||||
|
|
||||||
let user_agent = session.req_header().headers.get("user-agent").and_then(|v| v.to_str().ok()).unwrap_or("-");
|
let user_agent = session.req_header().headers.get("user-agent").and_then(|v| v.to_str().ok()).unwrap_or("-");
|
||||||
|
|
||||||
info!(
|
let log = LogMessage {
|
||||||
"{}, response code: {response_code}, client: {}, version: {:?}, useragent: {}",
|
response_code,
|
||||||
summary,
|
summary: summary.to_owned(),
|
||||||
ip,
|
client_ip: ip,
|
||||||
session.req_header().version,
|
version: session.req_header().version,
|
||||||
user_agent,
|
user_agent: user_agent.to_owned(),
|
||||||
);
|
};
|
||||||
|
|
||||||
|
if let Some(sender) = LOG_SENDER.get() {
|
||||||
|
let sender = sender;
|
||||||
|
if let Err(_) = sender.try_send(log) {
|
||||||
|
LOGGING_ERRORS.inc();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn init_logging(enabled: Option<String>) {
|
||||||
|
if let Some(_) = enabled {
|
||||||
|
LOGGING_ERRORS.set(0);
|
||||||
|
info!("Enabling {:?} log, with buffer of {} messages", ACCESS_LOG.get().unwrap_or(&LogLevel::None), LOG_BUFFER);
|
||||||
|
let (ltx, lrx) = mpsc::channel(LOG_BUFFER);
|
||||||
|
LOG_SENDER.set(ltx).unwrap();
|
||||||
|
std::thread::spawn(move || log_receiver(lrx));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn log_receiver(mut receiver: mpsc::Receiver<LogMessage>) {
|
||||||
|
while let Some(msg) = receiver.blocking_recv() {
|
||||||
|
info!(
|
||||||
|
"{}, {}, client: {}, version: {:?}, useragent: {}",
|
||||||
|
msg.response_code, msg.summary, msg.client_ip, msg.version, msg.user_agent,
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -95,9 +95,6 @@ impl ProxyHttp for LB {
|
|||||||
let header = ResponseHeader::build(429, None)?;
|
let header = ResponseHeader::build(429, None)?;
|
||||||
session.set_keepalive(None);
|
session.set_keepalive(None);
|
||||||
session.write_response_header(Box::new(header), true).await?;
|
session.write_response_header(Box::new(header), true).await?;
|
||||||
// if let (Some(oi), Some(oa)) = (&_ctx.hostname, rate_key) {
|
|
||||||
// warn!("Limit 4XX: {}-rps exceed on {} from {} path {}", rate, oi, oa, session.req_header().uri.path());
|
|
||||||
// }
|
|
||||||
return Ok(true);
|
return Ok(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -109,9 +106,6 @@ impl ProxyHttp for LB {
|
|||||||
let header = ResponseHeader::build(429, None)?;
|
let header = ResponseHeader::build(429, None)?;
|
||||||
session.set_keepalive(None);
|
session.set_keepalive(None);
|
||||||
session.write_response_header(Box::new(header), true).await?;
|
session.write_response_header(Box::new(header), true).await?;
|
||||||
// if let (Some(oi), Some(oa)) = (&_ctx.hostname, rate_key) {
|
|
||||||
// warn!("Limit: {}-rps exceed on {} from {}", rate, oi, oa);
|
|
||||||
// }
|
|
||||||
return Ok(true);
|
return Ok(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -283,7 +277,7 @@ impl ProxyHttp for LB {
|
|||||||
buf.push_str("; Path=/; Max-Age=");
|
buf.push_str("; Path=/; Max-Age=");
|
||||||
buf.push_str(&val.to_string());
|
buf.push_str(&val.to_string());
|
||||||
buf.push_str("; HttpOnly; SameSite=Lax");
|
buf.push_str("; HttpOnly; SameSite=Lax");
|
||||||
let _ = _upstream_response.insert_header("set-cookie", buf.as_str());
|
let _ = _upstream_response.append_header("set-cookie", buf.as_str());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -10,8 +10,6 @@ use axum::http::{Response, StatusCode};
|
|||||||
use axum::response::IntoResponse;
|
use axum::response::IntoResponse;
|
||||||
use axum::routing::{any, get, post};
|
use axum::routing::{any, get, post};
|
||||||
use axum::{Json, Router};
|
use axum::{Json, Router};
|
||||||
use futures::channel::mpsc::Sender;
|
|
||||||
use futures::SinkExt;
|
|
||||||
use jsonwebtoken::{encode, EncodingKey, Header};
|
use jsonwebtoken::{encode, EncodingKey, Header};
|
||||||
use log::{debug, error, info, warn};
|
use log::{debug, error, info, warn};
|
||||||
use prometheus::{gather, Encoder, TextEncoder};
|
use prometheus::{gather, Encoder, TextEncoder};
|
||||||
@@ -37,14 +35,14 @@ pub(crate) struct AppState {
|
|||||||
pub(crate) cert_creds: String,
|
pub(crate) cert_creds: String,
|
||||||
pub(crate) certs_dir: String,
|
pub(crate) certs_dir: String,
|
||||||
upstreams_file: String,
|
upstreams_file: String,
|
||||||
config_sender: Sender<Configuration>,
|
config_sender: mpsc::Sender<Configuration>,
|
||||||
config_api_enabled: bool,
|
config_api_enabled: bool,
|
||||||
current_upstreams: Arc<UpstreamsDashMap>,
|
current_upstreams: Arc<UpstreamsDashMap>,
|
||||||
full_upstreams: Arc<UpstreamsDashMap>,
|
full_upstreams: Arc<UpstreamsDashMap>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(unused_mut)]
|
#[allow(unused_mut)]
|
||||||
pub async fn run_server(config: &APIUpstreamProvider, mut to_return: Sender<Configuration>, upstreams_curr: Arc<UpstreamsDashMap>, upstreams_full: Arc<UpstreamsDashMap>) {
|
pub async fn run_server(config: &APIUpstreamProvider, mut to_return: mpsc::Sender<Configuration>, upstreams_curr: Arc<UpstreamsDashMap>, upstreams_full: Arc<UpstreamsDashMap>) {
|
||||||
let credsfile = config.config_dir.clone() + "/acme_credentials.json";
|
let credsfile = config.config_dir.clone() + "/acme_credentials.json";
|
||||||
let app_state = AppState {
|
let app_state = AppState {
|
||||||
master_key: config.masterkey.clone(),
|
master_key: config.masterkey.clone(),
|
||||||
@@ -119,7 +117,7 @@ async fn conf(State(st): State<AppState>, Query(params): Query<HashMap<String, S
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn apply_config(content: &str, mut st: AppState, save: bool) {
|
async fn apply_config(content: &str, st: AppState, save: bool) {
|
||||||
let sl = crate::utils::parceyaml::load_configuration(content, "content").await;
|
let sl = crate::utils::parceyaml::load_configuration(content, "content").await;
|
||||||
if let Some(serverlist) = sl.0 {
|
if let Some(serverlist) = sl.0 {
|
||||||
if save {
|
if save {
|
||||||
|
|||||||
Reference in New Issue
Block a user