some more type changes, performance improvements

This commit is contained in:
Ara Sadoyan
2025-11-27 18:47:04 +01:00
parent c68a4ad83d
commit 5b87391fbb

View File

@@ -14,6 +14,7 @@ use pingora_core::listeners::ALPN;
use pingora_core::prelude::HttpPeer;
use pingora_limits::rate::Rate;
use pingora_proxy::{ProxyHttp, Session};
// use std::net::{IpAddr, Ipv4Addr};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
@@ -32,11 +33,12 @@ pub struct LB {
}
pub struct Context {
backend_id: String,
backend_id: Arc<str>,
// backend_id: Arc<(IpAddr, u16, bool)>,
to_https: bool,
redirect_to: String,
redirect_to: Arc<str>,
start_time: Instant,
hostname: Option<String>,
hostname: Option<Arc<str>>,
upstream_peer: Option<InnerMap>,
extraparams: arc_swap::Guard<Arc<Extraparams>>,
client_headers: Arc<Vec<(Arc<str>, Arc<str>)>>,
@@ -47,9 +49,10 @@ impl ProxyHttp for LB {
type CTX = Context;
fn new_ctx(&self) -> Self::CTX {
Context {
backend_id: String::new(),
backend_id: Arc::from(""),
// backend_id: Arc::new((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false)),
to_https: false,
redirect_to: String::new(),
redirect_to: Arc::from(""),
start_time: Instant::now(),
hostname: None,
upstream_peer: None,
@@ -92,7 +95,7 @@ impl ProxyHttp for LB {
None => return Ok(false),
Some(host) => {
// let optioninnermap = self.get_host(host.as_str(), host.as_str(), backend_id);
let optioninnermap = self.get_host(host.as_str(), session.req_header().uri.path(), backend_id);
let optioninnermap = self.get_host(host, session.req_header().uri.path(), backend_id);
match optioninnermap {
None => return Ok(false),
Some(ref innermap) => {
@@ -120,52 +123,47 @@ impl ProxyHttp for LB {
}
async fn upstream_peer(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<Box<HttpPeer>> {
match ctx.hostname.as_ref() {
Some(hostname) => {
match ctx.upstream_peer.as_ref() {
// Some((address, port, ssl, is_h2, to_https)) => {
Some(innermap) => {
let mut peer = Box::new(HttpPeer::new((innermap.address.clone(), innermap.port.clone()), innermap.is_ssl, String::new()));
// if session.is_http2() {
if innermap.is_http2 {
peer.options.alpn = ALPN::H2;
}
if innermap.is_ssl {
peer.sni = hostname.clone();
peer.options.verify_cert = false;
peer.options.verify_hostname = false;
}
if ctx.to_https || innermap.to_https {
if let Some(stream) = session.stream() {
if stream.get_ssl().is_none() {
if let Some(addr) = session.server_addr() {
if let Some((host, _)) = addr.to_string().split_once(':') {
let uri = session.req_header().uri.path_and_query().map_or("/", |pq| pq.as_str());
let port = self.config.proxy_port_tls.unwrap_or(403);
ctx.to_https = true;
ctx.redirect_to = format!("https://{}:{}{}", host, port, uri);
}
Some(hostname) => match ctx.upstream_peer.as_ref() {
Some(innermap) => {
let mut peer = Box::new(HttpPeer::new((innermap.address.clone(), innermap.port.clone()), innermap.is_ssl, String::new()));
if innermap.is_http2 {
peer.options.alpn = ALPN::H2;
}
if innermap.is_ssl {
peer.sni = hostname.to_string();
peer.options.verify_cert = false;
peer.options.verify_hostname = false;
}
if ctx.to_https || innermap.to_https {
if let Some(stream) = session.stream() {
if stream.get_ssl().is_none() {
if let Some(addr) = session.server_addr() {
if let Some((host, _)) = addr.to_string().split_once(':') {
let uri = session.req_header().uri.path_and_query().map_or("/", |pq| pq.as_str());
let port = self.config.proxy_port_tls.unwrap_or(403);
ctx.to_https = true;
ctx.redirect_to = Arc::from(format!("https://{}:{}{}", host, port, uri));
}
}
}
}
ctx.backend_id = format!("{}:{}:{}", innermap.address.clone(), innermap.port.clone(), innermap.is_ssl);
Ok(peer)
}
None => {
if let Err(e) = session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await {
error!("Failed to send error response: {:?}", e);
}
Err(Box::new(Error {
etype: HTTPStatus(502),
esource: Upstream,
retry: RetryType::Decided(false),
cause: None,
context: Option::from(ImmutStr::Static("Upstream not found")),
}))
}
ctx.backend_id = Arc::from(format!("{}:{}:{}", innermap.address, innermap.port, innermap.is_ssl));
Ok(peer)
}
}
None => {
if let Err(e) = session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await {
error!("Failed to send error response: {:?}", e);
}
Err(Box::new(Error {
etype: HTTPStatus(502),
esource: Upstream,
retry: RetryType::Decided(false),
cause: None,
context: Option::from(ImmutStr::Static("Upstream not found")),
}))
}
},
None => {
// session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await.expect("Failed to send error");
if let Err(e) = session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await {
@@ -184,13 +182,13 @@ impl ProxyHttp for LB {
async fn upstream_request_filter(&self, session: &mut Session, upstream_request: &mut RequestHeader, ctx: &mut Self::CTX) -> Result<()> {
if let Some(hostname) = ctx.hostname.as_ref() {
upstream_request.insert_header("Host", hostname)?;
upstream_request.insert_header("Host", hostname.as_ref())?;
}
if let Some(peer) = ctx.upstream_peer.as_ref() {
upstream_request.insert_header("X-Forwarded-For", peer.address.to_string())?;
}
if let Some(headers) = self.get_header(ctx.hostname.as_ref().unwrap_or(&"localhost".to_string()), session.req_header().uri.path()) {
if let Some(headers) = self.get_header(ctx.hostname.as_ref().unwrap_or(&Arc::from("localhost")), session.req_header().uri.path()) {
if let Some(server_headers) = headers.server_headers {
for k in server_headers {
upstream_request.insert_header(k.0, k.1)?;
@@ -208,13 +206,13 @@ impl ProxyHttp for LB {
async fn response_filter(&self, session: &mut Session, _upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX) -> Result<()> {
if ctx.extraparams.sticky_sessions {
let backend_id = ctx.backend_id.clone();
if let Some(bid) = self.ump_byid.get(&backend_id) {
if let Some(bid) = self.ump_byid.get(&backend_id.to_string()) {
let _ = _upstream_response.insert_header("set-cookie", format!("backend_id={}; Path=/; Max-Age=600; HttpOnly; SameSite=Lax", bid.address));
}
}
if ctx.to_https {
let mut redirect_response = ResponseHeader::build(StatusCode::MOVED_PERMANENTLY, None)?;
redirect_response.insert_header("Location", ctx.redirect_to.clone())?;
redirect_response.insert_header("Location", ctx.redirect_to.as_ref())?;
redirect_response.insert_header("Content-Length", "0")?;
session.write_response_header(Box::new(redirect_response), false).await?;
}
@@ -240,17 +238,17 @@ impl ProxyHttp for LB {
}
}
fn return_header_host(session: &Session) -> Option<String> {
fn return_header_host(session: &Session) -> Option<Arc<str>> {
if session.is_http2() {
match session.req_header().uri.host() {
Some(host) => Option::from(host.to_string()),
Some(host) => Option::from(Arc::from(host)),
None => None,
}
} else {
match session.req_header().headers.get("host") {
Some(host) => {
let header_host = host.to_str().unwrap().splitn(2, ':').collect::<Vec<&str>>();
Option::from(header_host[0].to_string())
Option::from(Arc::from(header_host[0].to_string()))
}
None => None,
}