mirror of
https://github.com/sadoyan/aralez.git
synced 2026-04-29 22:38:36 +08:00
Performance improvements on data types .
This commit is contained in:
@@ -71,23 +71,13 @@ async fn build_upstreams(fullist: &UpstreamsDashMap, method: &str, client: &Clie
|
|||||||
if resp.1 {
|
if resp.1 {
|
||||||
scheme.is_http2 = is_h2; // could be adjusted further
|
scheme.is_http2 = is_h2; // could be adjusted further
|
||||||
}
|
}
|
||||||
innervec.push(scheme);
|
innervec.push(Arc::from(scheme));
|
||||||
} else {
|
} else {
|
||||||
warn!("Dead Upstream : {}", link);
|
warn!("Dead Upstream : {}", link);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
innervec.push(scheme);
|
innervec.push(Arc::from(scheme));
|
||||||
}
|
}
|
||||||
|
|
||||||
// let resp = http_request(&link, method, "", &client).await;
|
|
||||||
// if resp.0 {
|
|
||||||
// if resp.1 {
|
|
||||||
// scheme.is_http2 = is_h2; // could be adjusted further
|
|
||||||
// }
|
|
||||||
// innervec.push(scheme);
|
|
||||||
// } else {
|
|
||||||
// warn!("Dead Upstream : {}", link);
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
inner.insert(path.clone(), (innervec, AtomicUsize::new(0)));
|
inner.insert(path.clone(), (innervec, AtomicUsize::new(0)));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,9 +4,10 @@ use axum::http::{HeaderMap, HeaderValue};
|
|||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use reqwest::Client;
|
use reqwest::Client;
|
||||||
use std::sync::atomic::AtomicUsize;
|
use std::sync::atomic::AtomicUsize;
|
||||||
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
pub async fn for_consul(url: String, token: Option<String>, conf: &ServiceMapping) -> Option<DashMap<String, (Vec<InnerMap>, AtomicUsize)>> {
|
pub async fn for_consul(url: String, token: Option<String>, conf: &ServiceMapping) -> Option<DashMap<String, (Vec<Arc<InnerMap>>, AtomicUsize)>> {
|
||||||
let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().ok()?;
|
let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().ok()?;
|
||||||
let mut headers = HeaderMap::new();
|
let mut headers = HeaderMap::new();
|
||||||
if let Some(token) = token {
|
if let Some(token) = token {
|
||||||
@@ -19,14 +20,14 @@ pub async fn for_consul(url: String, token: Option<String>, conf: &ServiceMappin
|
|||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
let mut inner_vec = Vec::new();
|
let mut inner_vec = Vec::new();
|
||||||
let upstreams: DashMap<String, (Vec<InnerMap>, AtomicUsize)> = DashMap::new();
|
let upstreams: DashMap<String, (Vec<Arc<InnerMap>>, AtomicUsize)> = DashMap::new();
|
||||||
let endpoints: Vec<ConsulService> = resp.json().await.ok()?;
|
let endpoints: Vec<ConsulService> = resp.json().await.ok()?;
|
||||||
for subsets in endpoints {
|
for subsets in endpoints {
|
||||||
// let addr = subsets.tagged_addresses.get("lan_ipv4").unwrap().address.clone();
|
// let addr = subsets.tagged_addresses.get("lan_ipv4").unwrap().address.clone();
|
||||||
// let prt = subsets.tagged_addresses.get("lan_ipv4").unwrap().port.clone();
|
// let prt = subsets.tagged_addresses.get("lan_ipv4").unwrap().port.clone();
|
||||||
let addr = subsets.tagged_addresses.get("lan_ipv4").unwrap().address.clone().parse().unwrap();
|
let addr = subsets.tagged_addresses.get("lan_ipv4").unwrap().address.clone().parse().unwrap();
|
||||||
let prt = subsets.tagged_addresses.get("lan_ipv4").unwrap().port.clone();
|
let prt = subsets.tagged_addresses.get("lan_ipv4").unwrap().port.clone();
|
||||||
let to_add = InnerMap {
|
let to_add = Arc::from(InnerMap {
|
||||||
address: addr,
|
address: addr,
|
||||||
port: prt,
|
port: prt,
|
||||||
is_ssl: false,
|
is_ssl: false,
|
||||||
@@ -34,14 +35,14 @@ pub async fn for_consul(url: String, token: Option<String>, conf: &ServiceMappin
|
|||||||
to_https: conf.to_https.unwrap_or(false),
|
to_https: conf.to_https.unwrap_or(false),
|
||||||
rate_limit: conf.rate_limit,
|
rate_limit: conf.rate_limit,
|
||||||
healthcheck: None,
|
healthcheck: None,
|
||||||
};
|
});
|
||||||
inner_vec.push(to_add);
|
inner_vec.push(to_add);
|
||||||
}
|
}
|
||||||
match_path(&conf, &upstreams, inner_vec.clone());
|
match_path(&conf, &upstreams, inner_vec.clone());
|
||||||
Some(upstreams)
|
Some(upstreams)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn for_kuber(url: &str, token: &str, conf: &ServiceMapping) -> Option<DashMap<String, (Vec<InnerMap>, AtomicUsize)>> {
|
pub async fn for_kuber(url: &str, token: &str, conf: &ServiceMapping) -> Option<DashMap<String, (Vec<Arc<InnerMap>>, AtomicUsize)>> {
|
||||||
let to = Duration::from_secs(10);
|
let to = Duration::from_secs(10);
|
||||||
let client = Client::builder().timeout(Duration::from_secs(10)).danger_accept_invalid_certs(true).build().ok()?;
|
let client = Client::builder().timeout(Duration::from_secs(10)).danger_accept_invalid_certs(true).build().ok()?;
|
||||||
let resp = client.get(url).timeout(to).bearer_auth(token).send().await.ok()?;
|
let resp = client.get(url).timeout(to).bearer_auth(token).send().await.ok()?;
|
||||||
@@ -50,14 +51,14 @@ pub async fn for_kuber(url: &str, token: &str, conf: &ServiceMapping) -> Option<
|
|||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
let endpoints: KubeEndpoints = resp.json().await.ok()?;
|
let endpoints: KubeEndpoints = resp.json().await.ok()?;
|
||||||
let upstreams: DashMap<String, (Vec<InnerMap>, AtomicUsize)> = DashMap::new();
|
let upstreams: DashMap<String, (Vec<Arc<InnerMap>>, AtomicUsize)> = DashMap::new();
|
||||||
if let Some(subsets) = endpoints.subsets {
|
if let Some(subsets) = endpoints.subsets {
|
||||||
for subset in subsets {
|
for subset in subsets {
|
||||||
if let (Some(addresses), Some(ports)) = (subset.addresses, subset.ports) {
|
if let (Some(addresses), Some(ports)) = (subset.addresses, subset.ports) {
|
||||||
let mut inner_vec = Vec::new();
|
let mut inner_vec = Vec::new();
|
||||||
for addr in addresses {
|
for addr in addresses {
|
||||||
for port in &ports {
|
for port in &ports {
|
||||||
let to_add = InnerMap {
|
let to_add = Arc::from(InnerMap {
|
||||||
address: addr.ip.parse().unwrap(),
|
address: addr.ip.parse().unwrap(),
|
||||||
port: port.port.clone(),
|
port: port.port.clone(),
|
||||||
is_ssl: false,
|
is_ssl: false,
|
||||||
@@ -65,7 +66,7 @@ pub async fn for_kuber(url: &str, token: &str, conf: &ServiceMapping) -> Option<
|
|||||||
to_https: conf.to_https.unwrap_or(false),
|
to_https: conf.to_https.unwrap_or(false),
|
||||||
rate_limit: conf.rate_limit,
|
rate_limit: conf.rate_limit,
|
||||||
healthcheck: None,
|
healthcheck: None,
|
||||||
};
|
});
|
||||||
inner_vec.push(to_add);
|
inner_vec.push(to_add);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -52,7 +52,7 @@ pub struct ConsulTaggedAddress {
|
|||||||
#[serde(rename = "Port")]
|
#[serde(rename = "Port")]
|
||||||
pub port: u16,
|
pub port: u16,
|
||||||
}
|
}
|
||||||
pub fn list_to_upstreams(lt: Option<DashMap<String, (Vec<InnerMap>, AtomicUsize)>>, upstreams: &UpstreamsDashMap, i: &ServiceMapping) {
|
pub fn list_to_upstreams(lt: Option<DashMap<String, (Vec<Arc<InnerMap>>, AtomicUsize)>>, upstreams: &UpstreamsDashMap, i: &ServiceMapping) {
|
||||||
if let Some(list) = lt {
|
if let Some(list) = lt {
|
||||||
match upstreams.get(&i.hostname.clone()) {
|
match upstreams.get(&i.hostname.clone()) {
|
||||||
Some(upstr) => {
|
Some(upstr) => {
|
||||||
@@ -67,7 +67,7 @@ pub fn list_to_upstreams(lt: Option<DashMap<String, (Vec<InnerMap>, AtomicUsize)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn match_path(conf: &ServiceMapping, upstreams: &DashMap<String, (Vec<InnerMap>, AtomicUsize)>, values: Vec<InnerMap>) {
|
pub fn match_path(conf: &ServiceMapping, upstreams: &DashMap<String, (Vec<Arc<InnerMap>>, AtomicUsize)>, values: Vec<Arc<InnerMap>>) {
|
||||||
match conf.path {
|
match conf.path {
|
||||||
Some(ref p) => {
|
Some(ref p) => {
|
||||||
upstreams.insert(p.to_string(), (values, AtomicUsize::new(0)));
|
upstreams.insert(p.to_string(), (values, AtomicUsize::new(0)));
|
||||||
|
|||||||
@@ -134,7 +134,7 @@ async fn populate_file_upstreams(config: &mut Configuration, parsed: &Config) {
|
|||||||
for server in &path_config.servers {
|
for server in &path_config.servers {
|
||||||
if let Some((ip, port_str)) = server.split_once(':') {
|
if let Some((ip, port_str)) = server.split_once(':') {
|
||||||
if let Ok(port) = port_str.parse::<u16>() {
|
if let Ok(port) = port_str.parse::<u16>() {
|
||||||
server_list.push(InnerMap {
|
server_list.push(Arc::from(InnerMap {
|
||||||
address: ip.trim().parse().unwrap(),
|
address: ip.trim().parse().unwrap(),
|
||||||
port,
|
port,
|
||||||
is_ssl: true,
|
is_ssl: true,
|
||||||
@@ -142,7 +142,7 @@ async fn populate_file_upstreams(config: &mut Configuration, parsed: &Config) {
|
|||||||
to_https: path_config.to_https.unwrap_or(false),
|
to_https: path_config.to_https.unwrap_or(false),
|
||||||
rate_limit: path_config.rate_limit,
|
rate_limit: path_config.rate_limit,
|
||||||
healthcheck: path_config.healthcheck,
|
healthcheck: path_config.healthcheck,
|
||||||
});
|
}));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,11 +2,11 @@ use dashmap::DashMap;
|
|||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::atomic::AtomicUsize;
|
use std::sync::atomic::AtomicUsize;
|
||||||
pub type UpstreamsDashMap = DashMap<String, DashMap<String, (Vec<InnerMap>, AtomicUsize)>>;
|
pub type UpstreamsDashMap = DashMap<String, DashMap<String, (Vec<Arc<InnerMap>>, AtomicUsize)>>;
|
||||||
use std::net::IpAddr;
|
use std::net::IpAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
pub type UpstreamsIdMap = DashMap<String, InnerMap>;
|
pub type UpstreamsIdMap = DashMap<String, Arc<InnerMap>>;
|
||||||
pub type Headers = DashMap<String, DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>>>;
|
pub type Headers = DashMap<String, DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>>>;
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
|
||||||
|
|||||||
@@ -168,8 +168,8 @@ pub fn clone_idmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsIdMap) {
|
|||||||
rate_limit: None,
|
rate_limit: None,
|
||||||
healthcheck: None,
|
healthcheck: None,
|
||||||
};
|
};
|
||||||
cloned.insert(id, to_add);
|
cloned.insert(id, Arc::from(to_add));
|
||||||
cloned.insert(hh, x.to_owned());
|
cloned.insert(hh, Arc::from(x.to_owned()));
|
||||||
}
|
}
|
||||||
new_inner_map.insert(path.clone(), new_vec);
|
new_inner_map.insert(path.clone(), new_vec);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,13 +14,13 @@ pub struct GetHostsReturHeaders {
|
|||||||
pub trait GetHost {
|
pub trait GetHost {
|
||||||
// fn get_host<'a>(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<&'a InnerMap>;
|
// fn get_host<'a>(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<&'a InnerMap>;
|
||||||
|
|
||||||
fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<InnerMap>;
|
fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<Arc<InnerMap>>;
|
||||||
|
|
||||||
fn get_header(&self, peer: &str, path: &str) -> Option<GetHostsReturHeaders>;
|
fn get_header(&self, peer: &str, path: &str) -> Option<GetHostsReturHeaders>;
|
||||||
}
|
}
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl GetHost for LB {
|
impl GetHost for LB {
|
||||||
fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<InnerMap> {
|
fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<Arc<InnerMap>> {
|
||||||
if let Some(b) = backend_id {
|
if let Some(b) = backend_id {
|
||||||
if let Some(bb) = self.ump_byid.get(b) {
|
if let Some(bb) = self.ump_byid.get(b) {
|
||||||
return Some(bb.value().clone());
|
return Some(bb.value().clone());
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ pub struct Context {
|
|||||||
redirect_to: Arc<str>,
|
redirect_to: Arc<str>,
|
||||||
start_time: Instant,
|
start_time: Instant,
|
||||||
hostname: Option<Arc<str>>,
|
hostname: Option<Arc<str>>,
|
||||||
upstream_peer: Option<InnerMap>,
|
upstream_peer: Option<Arc<InnerMap>>,
|
||||||
extraparams: arc_swap::Guard<Arc<Extraparams>>,
|
extraparams: arc_swap::Guard<Arc<Extraparams>>,
|
||||||
client_headers: Arc<Vec<(Arc<str>, Arc<str>)>>,
|
client_headers: Arc<Vec<(Arc<str>, Arc<str>)>>,
|
||||||
}
|
}
|
||||||
@@ -61,7 +61,7 @@ impl ProxyHttp for LB {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
|
async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
|
||||||
let ep = _ctx.extraparams.clone();
|
let ep = _ctx.extraparams.as_ref();
|
||||||
|
|
||||||
if let Some(auth) = ep.authentication.get("authorization") {
|
if let Some(auth) = ep.authentication.get("authorization") {
|
||||||
let authenticated = authenticate(&auth.value(), &session);
|
let authenticated = authenticate(&auth.value(), &session);
|
||||||
@@ -125,7 +125,7 @@ impl ProxyHttp for LB {
|
|||||||
match ctx.hostname.as_ref() {
|
match ctx.hostname.as_ref() {
|
||||||
Some(hostname) => match ctx.upstream_peer.as_ref() {
|
Some(hostname) => match ctx.upstream_peer.as_ref() {
|
||||||
Some(innermap) => {
|
Some(innermap) => {
|
||||||
let mut peer = Box::new(HttpPeer::new((innermap.address.clone(), innermap.port.clone()), innermap.is_ssl, String::new()));
|
let mut peer = Box::new(HttpPeer::new((innermap.address, innermap.port), innermap.is_ssl, String::new()));
|
||||||
if innermap.is_http2 {
|
if innermap.is_http2 {
|
||||||
peer.options.alpn = ALPN::H2;
|
peer.options.alpn = ALPN::H2;
|
||||||
}
|
}
|
||||||
@@ -211,8 +211,7 @@ impl ProxyHttp for LB {
|
|||||||
}
|
}
|
||||||
async fn response_filter(&self, session: &mut Session, _upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX) -> Result<()> {
|
async fn response_filter(&self, session: &mut Session, _upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX) -> Result<()> {
|
||||||
if ctx.extraparams.sticky_sessions {
|
if ctx.extraparams.sticky_sessions {
|
||||||
let backend_id = ctx.backend_id.clone();
|
if let Some(bid) = self.ump_byid.get(ctx.backend_id.as_ref()) {
|
||||||
if let Some(bid) = self.ump_byid.get(backend_id.as_ref()) {
|
|
||||||
let _ = _upstream_response.insert_header("set-cookie", format!("backend_id={}; Path=/; Max-Age=600; HttpOnly; SameSite=Lax", bid.address));
|
let _ = _upstream_response.insert_header("set-cookie", format!("backend_id={}; Path=/; Max-Age=600; HttpOnly; SameSite=Lax", bid.address));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user