mirror of
https://github.com/sadoyan/aralez.git
synced 2026-04-30 23:08:40 +08:00
perf: optimize header handling and concurrent access patterns
This commit is contained in:
@@ -115,11 +115,21 @@ impl ServiceDiscovery for KubernetesDiscovery {
|
|||||||
if let Some(kuber) = config.kubernetes.clone() {
|
if let Some(kuber) = config.kubernetes.clone() {
|
||||||
if let Some(svc) = kuber.services {
|
if let Some(svc) = kuber.services {
|
||||||
for i in svc {
|
for i in svc {
|
||||||
let header_list = DashMap::new();
|
let header_list: DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>> = DashMap::new();
|
||||||
let mut hl = Vec::new();
|
let mut hl = Vec::new();
|
||||||
build_headers(&i.client_headers, config.as_ref(), &mut hl);
|
build_headers(&i.client_headers, config.as_ref(), &mut hl);
|
||||||
if !hl.is_empty() {
|
if !hl.is_empty() {
|
||||||
header_list.insert(i.path.clone().unwrap_or("/".to_string()), hl);
|
match i.path.clone() {
|
||||||
|
Some(path) => {
|
||||||
|
header_list.insert(Arc::from(path.as_str()), hl);
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
header_list.insert(Arc::from("/"), hl);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// header_list.insert(Arc::from(path.as_str()), hl);
|
||||||
|
// header_list.insert(Arc::from(i.path).unwrap_or(Arc::from("/")).as_str(), hl);
|
||||||
config.client_headers.insert(i.hostname.clone(), header_list);
|
config.client_headers.insert(i.hostname.clone(), header_list);
|
||||||
}
|
}
|
||||||
let url = format!("https://{}/api/v1/namespaces/{}/endpoints/{}", server, namespace, i.hostname);
|
let url = format!("https://{}/api/v1/namespaces/{}/endpoints/{}", server, namespace, i.hostname);
|
||||||
@@ -171,7 +181,15 @@ impl ServiceDiscovery for ConsulDiscovery {
|
|||||||
let mut hl = Vec::new();
|
let mut hl = Vec::new();
|
||||||
build_headers(&i.client_headers, config.as_ref(), &mut hl);
|
build_headers(&i.client_headers, config.as_ref(), &mut hl);
|
||||||
if !hl.is_empty() {
|
if !hl.is_empty() {
|
||||||
header_list.insert(i.path.clone().unwrap_or("/".to_string()), hl);
|
match i.path.clone() {
|
||||||
|
Some(path) => {
|
||||||
|
header_list.insert(Arc::from(path.as_str()), hl);
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
header_list.insert(Arc::from("/"), hl);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// header_list.insert(i.path.clone().unwrap_or("/".to_string()), hl);
|
||||||
config.client_headers.insert(i.hostname.clone(), header_list);
|
config.client_headers.insert(i.hostname.clone(), header_list);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ use dashmap::DashMap;
|
|||||||
use log::{error, info, warn};
|
use log::{error, info, warn};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::atomic::AtomicUsize;
|
use std::sync::atomic::AtomicUsize;
|
||||||
// use std::sync::mpsc::{channel, Receiver, Sender};
|
use std::sync::Arc;
|
||||||
use std::{env, fs};
|
use std::{env, fs};
|
||||||
// use tokio::sync::oneshot::{Receiver, Sender};
|
// use tokio::sync::oneshot::{Receiver, Sender};
|
||||||
|
|
||||||
@@ -67,31 +67,31 @@ pub async fn load_configuration(d: &str, kind: &str) -> Option<Configuration> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn populate_headers_and_auth(config: &mut Configuration, parsed: &Config) {
|
async fn populate_headers_and_auth(config: &mut Configuration, parsed: &Config) {
|
||||||
let mut ch = Vec::new();
|
let mut ch: Vec<(Arc<str>, Arc<str>)> = Vec::new();
|
||||||
ch.push(("Server".to_string(), "Aralez".to_string()));
|
ch.push((Arc::from("Server"), Arc::from("Aralez")));
|
||||||
// println!("{:?}", &parsed.client_headers);
|
// println!("{:?}", &parsed.client_headers);
|
||||||
if let Some(headers) = &parsed.client_headers {
|
if let Some(headers) = &parsed.client_headers {
|
||||||
for header in headers {
|
for header in headers {
|
||||||
if let Some((key, val)) = header.split_once(':') {
|
if let Some((key, val)) = header.split_once(':') {
|
||||||
ch.push((key.trim().to_string(), val.trim().to_string()));
|
ch.push((Arc::from(key), Arc::from(val)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let global_headers = DashMap::new();
|
let global_headers: DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>> = DashMap::new();
|
||||||
global_headers.insert("/".to_string(), ch);
|
global_headers.insert(Arc::from("/"), ch);
|
||||||
config.client_headers.insert("GLOBAL_CLIENT_HEADERS".to_string(), global_headers);
|
config.client_headers.insert("GLOBAL_CLIENT_HEADERS".to_string(), global_headers);
|
||||||
|
|
||||||
let mut sh = Vec::new();
|
let mut sh: Vec<(Arc<str>, Arc<str>)> = Vec::new();
|
||||||
sh.push(("X-Proxy-Server".to_string(), "Aralez".to_string()));
|
sh.push((Arc::from("X-Proxy-Server"), Arc::from("Aralez")));
|
||||||
if let Some(headers) = &parsed.server_headers {
|
if let Some(headers) = &parsed.server_headers {
|
||||||
for header in headers {
|
for header in headers {
|
||||||
if let Some((key, val)) = header.split_once(':') {
|
if let Some((key, val)) = header.split_once(':') {
|
||||||
sh.push((key.trim().to_string(), val.trim().to_string()));
|
sh.push((Arc::from(key.trim()), Arc::from(val.trim())));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let server_global_headers = DashMap::new();
|
let server_global_headers: DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>> = DashMap::new();
|
||||||
server_global_headers.insert("/".to_string(), sh);
|
server_global_headers.insert(Arc::from("/"), sh);
|
||||||
config.server_headers.insert("GLOBAL_SERVER_HEADERS".to_string(), server_global_headers);
|
config.server_headers.insert("GLOBAL_SERVER_HEADERS".to_string(), server_global_headers);
|
||||||
|
|
||||||
config.extraparams.sticky_sessions = parsed.sticky_sessions;
|
config.extraparams.sticky_sessions = parsed.sticky_sessions;
|
||||||
@@ -123,12 +123,12 @@ async fn populate_file_upstreams(config: &mut Configuration, parsed: &Config) {
|
|||||||
info!("Applied Rate Limit for {} : {} request per second", hostname, rate);
|
info!("Applied Rate Limit for {} : {} request per second", hostname, rate);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut hl: Vec<(String, String)> = Vec::new();
|
let mut hl: Vec<(Arc<str>, Arc<str>)> = Vec::new();
|
||||||
let mut sl: Vec<(String, String)> = Vec::new();
|
let mut sl: Vec<(Arc<str>, Arc<str>)> = Vec::new();
|
||||||
build_headers(&path_config.client_headers, config, &mut hl);
|
build_headers(&path_config.client_headers, config, &mut hl);
|
||||||
build_headers(&path_config.server_headers, config, &mut sl);
|
build_headers(&path_config.server_headers, config, &mut sl);
|
||||||
client_header_list.insert(path.clone(), hl);
|
client_header_list.insert(Arc::from(path.as_str()), hl);
|
||||||
server_header_list.insert(path.clone(), sl);
|
server_header_list.insert(Arc::from(path.as_str()), sl);
|
||||||
|
|
||||||
let mut server_list = Vec::new();
|
let mut server_list = Vec::new();
|
||||||
for server in &path_config.servers {
|
for server in &path_config.servers {
|
||||||
@@ -237,11 +237,11 @@ fn log_builder(conf: &AppConfig) {
|
|||||||
env_logger::builder().init();
|
env_logger::builder().init();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn build_headers(path_config: &Option<Vec<String>>, _config: &Configuration, hl: &mut Vec<(String, String)>) {
|
pub fn build_headers(path_config: &Option<Vec<String>>, _config: &Configuration, hl: &mut Vec<(Arc<str>, Arc<str>)>) {
|
||||||
if let Some(headers) = &path_config {
|
if let Some(headers) = &path_config {
|
||||||
for header in headers {
|
for header in headers {
|
||||||
if let Some((key, val)) = header.split_once(':') {
|
if let Some((key, val)) = header.split_once(':') {
|
||||||
hl.push((key.trim().to_string(), val.trim().to_string()));
|
hl.push((Arc::from(key.trim()), Arc::from(val.trim())));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// if let Some(push) = config.client_headers.get("GLOBAL_HEADERS") {
|
// if let Some(push) = config.client_headers.get("GLOBAL_HEADERS") {
|
||||||
|
|||||||
@@ -4,9 +4,10 @@ use std::collections::HashMap;
|
|||||||
use std::sync::atomic::AtomicUsize;
|
use std::sync::atomic::AtomicUsize;
|
||||||
pub type UpstreamsDashMap = DashMap<String, DashMap<String, (Vec<InnerMap>, AtomicUsize)>>;
|
pub type UpstreamsDashMap = DashMap<String, DashMap<String, (Vec<InnerMap>, AtomicUsize)>>;
|
||||||
use std::net::IpAddr;
|
use std::net::IpAddr;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
pub type UpstreamsIdMap = DashMap<String, InnerMap>;
|
pub type UpstreamsIdMap = DashMap<String, InnerMap>;
|
||||||
pub type Headers = DashMap<String, DashMap<String, Vec<(String, String)>>>;
|
pub type Headers = DashMap<String, DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>>>;
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
|
||||||
pub struct ServiceMapping {
|
pub struct ServiceMapping {
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ use std::os::unix::fs::MetadataExt;
|
|||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::atomic::AtomicUsize;
|
use std::sync::atomic::AtomicUsize;
|
||||||
use std::sync::mpsc::{channel, Sender};
|
use std::sync::mpsc::{channel, Sender};
|
||||||
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use std::{fs, process, thread, time};
|
use std::{fs, process, thread, time};
|
||||||
|
|
||||||
@@ -132,7 +133,7 @@ pub fn compare_dashmaps(map1: &UpstreamsDashMap, map2: &UpstreamsDashMap) -> boo
|
|||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn merge_headers(target: &DashMap<String, Vec<(String, String)>>, source: &DashMap<String, Vec<(String, String)>>) {
|
pub fn merge_headers(target: &DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>>, source: &DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>>) {
|
||||||
for entry in source.iter() {
|
for entry in source.iter() {
|
||||||
let global_key = entry.key().clone();
|
let global_key = entry.key().clone();
|
||||||
let global_values = entry.value().clone();
|
let global_values = entry.value().clone();
|
||||||
|
|||||||
@@ -2,11 +2,12 @@ use crate::utils::structs::InnerMap;
|
|||||||
use crate::web::proxyhttp::LB;
|
use crate::web::proxyhttp::LB;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct GetHostsReturHeaders {
|
pub struct GetHostsReturHeaders {
|
||||||
pub client_headers: Option<Vec<(String, String)>>,
|
pub client_headers: Option<Vec<(Arc<str>, Arc<str>)>>,
|
||||||
pub server_headers: Option<Vec<(String, String)>>,
|
pub server_headers: Option<Vec<(Arc<str>, Arc<str>)>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
|||||||
@@ -191,7 +191,7 @@ impl ProxyHttp for LB {
|
|||||||
if let Some(headers) = self.get_header(ctx.hostname.as_ref().unwrap_or(&Arc::from("localhost")), session.req_header().uri.path()) {
|
if let Some(headers) = self.get_header(ctx.hostname.as_ref().unwrap_or(&Arc::from("localhost")), session.req_header().uri.path()) {
|
||||||
if let Some(server_headers) = headers.server_headers {
|
if let Some(server_headers) = headers.server_headers {
|
||||||
for k in server_headers {
|
for k in server_headers {
|
||||||
upstream_request.insert_header(k.0, k.1)?;
|
upstream_request.insert_header(k.0.to_string(), k.1.as_ref())?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if let Some(client_headers) = headers.client_headers {
|
if let Some(client_headers) = headers.client_headers {
|
||||||
|
|||||||
Reference in New Issue
Block a user