diff --git a/src/utils/healthcheck.rs b/src/utils/healthcheck.rs index 91f6e19..1ba969d 100644 --- a/src/utils/healthcheck.rs +++ b/src/utils/healthcheck.rs @@ -71,23 +71,13 @@ async fn build_upstreams(fullist: &UpstreamsDashMap, method: &str, client: &Clie if resp.1 { scheme.is_http2 = is_h2; // could be adjusted further } - innervec.push(scheme); + innervec.push(Arc::from(scheme)); } else { warn!("Dead Upstream : {}", link); } } else { - innervec.push(scheme); + innervec.push(Arc::from(scheme)); } - - // let resp = http_request(&link, method, "", &client).await; - // if resp.0 { - // if resp.1 { - // scheme.is_http2 = is_h2; // could be adjusted further - // } - // innervec.push(scheme); - // } else { - // warn!("Dead Upstream : {}", link); - // } } inner.insert(path.clone(), (innervec, AtomicUsize::new(0))); } diff --git a/src/utils/httpclient.rs b/src/utils/httpclient.rs index 2a107e4..2f890fa 100644 --- a/src/utils/httpclient.rs +++ b/src/utils/httpclient.rs @@ -4,9 +4,10 @@ use axum::http::{HeaderMap, HeaderValue}; use dashmap::DashMap; use reqwest::Client; use std::sync::atomic::AtomicUsize; +use std::sync::Arc; use std::time::Duration; -pub async fn for_consul(url: String, token: Option, conf: &ServiceMapping) -> Option, AtomicUsize)>> { +pub async fn for_consul(url: String, token: Option, conf: &ServiceMapping) -> Option>, AtomicUsize)>> { let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().ok()?; let mut headers = HeaderMap::new(); if let Some(token) = token { @@ -19,14 +20,14 @@ pub async fn for_consul(url: String, token: Option, conf: &ServiceMappin return None; } let mut inner_vec = Vec::new(); - let upstreams: DashMap, AtomicUsize)> = DashMap::new(); + let upstreams: DashMap>, AtomicUsize)> = DashMap::new(); let endpoints: Vec = resp.json().await.ok()?; for subsets in endpoints { // let addr = subsets.tagged_addresses.get("lan_ipv4").unwrap().address.clone(); // let prt = subsets.tagged_addresses.get("lan_ipv4").unwrap().port.clone(); let addr = subsets.tagged_addresses.get("lan_ipv4").unwrap().address.clone().parse().unwrap(); let prt = subsets.tagged_addresses.get("lan_ipv4").unwrap().port.clone(); - let to_add = InnerMap { + let to_add = Arc::from(InnerMap { address: addr, port: prt, is_ssl: false, @@ -34,14 +35,14 @@ pub async fn for_consul(url: String, token: Option, conf: &ServiceMappin to_https: conf.to_https.unwrap_or(false), rate_limit: conf.rate_limit, healthcheck: None, - }; + }); inner_vec.push(to_add); } match_path(&conf, &upstreams, inner_vec.clone()); Some(upstreams) } -pub async fn for_kuber(url: &str, token: &str, conf: &ServiceMapping) -> Option, AtomicUsize)>> { +pub async fn for_kuber(url: &str, token: &str, conf: &ServiceMapping) -> Option>, AtomicUsize)>> { let to = Duration::from_secs(10); let client = Client::builder().timeout(Duration::from_secs(10)).danger_accept_invalid_certs(true).build().ok()?; let resp = client.get(url).timeout(to).bearer_auth(token).send().await.ok()?; @@ -50,14 +51,14 @@ pub async fn for_kuber(url: &str, token: &str, conf: &ServiceMapping) -> Option< return None; } let endpoints: KubeEndpoints = resp.json().await.ok()?; - let upstreams: DashMap, AtomicUsize)> = DashMap::new(); + let upstreams: DashMap>, AtomicUsize)> = DashMap::new(); if let Some(subsets) = endpoints.subsets { for subset in subsets { if let (Some(addresses), Some(ports)) = (subset.addresses, subset.ports) { let mut inner_vec = Vec::new(); for addr in addresses { for port in &ports { - let to_add = InnerMap { + let to_add = Arc::from(InnerMap { address: addr.ip.parse().unwrap(), port: port.port.clone(), is_ssl: false, @@ -65,7 +66,7 @@ pub async fn for_kuber(url: &str, token: &str, conf: &ServiceMapping) -> Option< to_https: conf.to_https.unwrap_or(false), rate_limit: conf.rate_limit, healthcheck: None, - }; + }); inner_vec.push(to_add); } } diff --git a/src/utils/kuberconsul.rs b/src/utils/kuberconsul.rs index 653bb8f..0cf1713 100644 --- a/src/utils/kuberconsul.rs +++ b/src/utils/kuberconsul.rs @@ -52,7 +52,7 @@ pub struct ConsulTaggedAddress { #[serde(rename = "Port")] pub port: u16, } -pub fn list_to_upstreams(lt: Option, AtomicUsize)>>, upstreams: &UpstreamsDashMap, i: &ServiceMapping) { +pub fn list_to_upstreams(lt: Option>, AtomicUsize)>>, upstreams: &UpstreamsDashMap, i: &ServiceMapping) { if let Some(list) = lt { match upstreams.get(&i.hostname.clone()) { Some(upstr) => { @@ -67,7 +67,7 @@ pub fn list_to_upstreams(lt: Option, AtomicUsize) } } -pub fn match_path(conf: &ServiceMapping, upstreams: &DashMap, AtomicUsize)>, values: Vec) { +pub fn match_path(conf: &ServiceMapping, upstreams: &DashMap>, AtomicUsize)>, values: Vec>) { match conf.path { Some(ref p) => { upstreams.insert(p.to_string(), (values, AtomicUsize::new(0))); diff --git a/src/utils/parceyaml.rs b/src/utils/parceyaml.rs index 1135247..2a47a41 100644 --- a/src/utils/parceyaml.rs +++ b/src/utils/parceyaml.rs @@ -134,7 +134,7 @@ async fn populate_file_upstreams(config: &mut Configuration, parsed: &Config) { for server in &path_config.servers { if let Some((ip, port_str)) = server.split_once(':') { if let Ok(port) = port_str.parse::() { - server_list.push(InnerMap { + server_list.push(Arc::from(InnerMap { address: ip.trim().parse().unwrap(), port, is_ssl: true, @@ -142,7 +142,7 @@ async fn populate_file_upstreams(config: &mut Configuration, parsed: &Config) { to_https: path_config.to_https.unwrap_or(false), rate_limit: path_config.rate_limit, healthcheck: path_config.healthcheck, - }); + })); } } } diff --git a/src/utils/structs.rs b/src/utils/structs.rs index 6b326c3..349e908 100644 --- a/src/utils/structs.rs +++ b/src/utils/structs.rs @@ -2,11 +2,11 @@ use dashmap::DashMap; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::atomic::AtomicUsize; -pub type UpstreamsDashMap = DashMap, AtomicUsize)>>; +pub type UpstreamsDashMap = DashMap>, AtomicUsize)>>; use std::net::IpAddr; use std::sync::Arc; -pub type UpstreamsIdMap = DashMap; +pub type UpstreamsIdMap = DashMap>; pub type Headers = DashMap, Vec<(Arc, Arc)>>>; #[derive(Debug, Default, Clone, Serialize, Deserialize)] diff --git a/src/utils/tools.rs b/src/utils/tools.rs index 39905cb..8af730a 100644 --- a/src/utils/tools.rs +++ b/src/utils/tools.rs @@ -168,8 +168,8 @@ pub fn clone_idmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsIdMap) { rate_limit: None, healthcheck: None, }; - cloned.insert(id, to_add); - cloned.insert(hh, x.to_owned()); + cloned.insert(id, Arc::from(to_add)); + cloned.insert(hh, Arc::from(x.to_owned())); } new_inner_map.insert(path.clone(), new_vec); } diff --git a/src/web/gethosts.rs b/src/web/gethosts.rs index fb332d7..fa6ac73 100644 --- a/src/web/gethosts.rs +++ b/src/web/gethosts.rs @@ -14,13 +14,13 @@ pub struct GetHostsReturHeaders { pub trait GetHost { // fn get_host<'a>(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<&'a InnerMap>; - fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option; + fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option>; fn get_header(&self, peer: &str, path: &str) -> Option; } #[async_trait] impl GetHost for LB { - fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option { + fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option> { if let Some(b) = backend_id { if let Some(bb) = self.ump_byid.get(b) { return Some(bb.value().clone()); diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index d4d4c65..16b3919 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -39,7 +39,7 @@ pub struct Context { redirect_to: Arc, start_time: Instant, hostname: Option>, - upstream_peer: Option, + upstream_peer: Option>, extraparams: arc_swap::Guard>, client_headers: Arc, Arc)>>, } @@ -61,7 +61,7 @@ impl ProxyHttp for LB { } } async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result { - let ep = _ctx.extraparams.clone(); + let ep = _ctx.extraparams.as_ref(); if let Some(auth) = ep.authentication.get("authorization") { let authenticated = authenticate(&auth.value(), &session); @@ -125,7 +125,7 @@ impl ProxyHttp for LB { match ctx.hostname.as_ref() { Some(hostname) => match ctx.upstream_peer.as_ref() { Some(innermap) => { - let mut peer = Box::new(HttpPeer::new((innermap.address.clone(), innermap.port.clone()), innermap.is_ssl, String::new())); + let mut peer = Box::new(HttpPeer::new((innermap.address, innermap.port), innermap.is_ssl, String::new())); if innermap.is_http2 { peer.options.alpn = ALPN::H2; } @@ -211,8 +211,7 @@ impl ProxyHttp for LB { } async fn response_filter(&self, session: &mut Session, _upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX) -> Result<()> { if ctx.extraparams.sticky_sessions { - let backend_id = ctx.backend_id.clone(); - if let Some(bid) = self.ump_byid.get(backend_id.as_ref()) { + if let Some(bid) = self.ump_byid.get(ctx.backend_id.as_ref()) { let _ = _upstream_response.insert_header("set-cookie", format!("backend_id={}; Path=/; Max-Age=600; HttpOnly; SameSite=Lax", bid.address)); } }