initial commit

This commit is contained in:
Ara Sadoyan
2025-02-08 17:09:06 +01:00
parent 5da70255a5
commit cb3ff26de7
2 changed files with 28 additions and 53 deletions

View File

@@ -3,10 +3,14 @@ use dashmap::DashMap;
use log::{info, warn};
use pingora::prelude::*;
use pingora_core::prelude::HttpPeer;
use pingora_core::server::ShutdownWatch;
use pingora_core::services::background::BackgroundService;
use pingora_http::{RequestHeader, ResponseHeader};
use pingora_proxy::{ProxyHttp, Session};
use std::any::type_name;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use tokio::time::interval;
#[allow(dead_code)]
pub fn typeoff<T>(_: T) {
@@ -14,18 +18,31 @@ pub fn typeoff<T>(_: T) {
println!("{:?}", to);
}
// pub struct LB(pub Arc<LoadBalancer<RoundRobin>>);
#[allow(dead_code)]
pub struct LB {
// pub load_balancer: Arc<LoadBalancer<RoundRobin>>,
// pub upstreams_map: Arc<HashMap<String, Vec<(String, u16)>>>,
// pub upstreams_map: Arc<Mutex<HashMap<String, Vec<(String, u16)>>>>,
// upstreams: DashMap<String, (Vec<(&'static str, u16)>, AtomicUsize)>,
// pub upstreams_map: DashMap<String, Vec<(String, u16)>>,
// pub upstreams_maps: DashMap<String, Arc<LoadBalancer<RoundRobin>>>,
pub upstreams_map: DashMap<String, (Vec<(String, u16)>, AtomicUsize)>,
}
pub struct BGService;
#[async_trait]
impl BackgroundService for BGService {
async fn start(&self, mut shutdown: ShutdownWatch) {
println!("Starting example background service");
let mut period = interval(Duration::from_secs(1));
loop {
tokio::select! {
_ = shutdown.changed() => {
// shutdown
break;
}
_ = period.tick() => {
println!("= = = = = = = = = = = = = = = = = = = = = = = = = =")
}
}
}
}
}
#[async_trait]
pub trait GetHost {
async fn get_host(&self, peer: &str) -> Option<(String, u16)>;
@@ -35,12 +52,6 @@ pub trait GetHost {
#[async_trait]
impl GetHost for LB {
async fn get_host(&self, peer: &str) -> Option<(String, u16)> {
// println!("{:?}", self.upstreams_map);
// let entry = self.upstreams_map.get(peer)?;
// let first = entry.value().first()?;
// println!("{:?}", entry.value());
// Some((first.0.clone(), first.1))
let entry = self.upstreams_map.get(peer)?;
let (servers, index) = entry.value();
@@ -54,9 +65,6 @@ impl GetHost for LB {
}
fn set_host(&mut self, peer: &str, host: &str, port: u16) {
// let new_value = vec![(host.to_string(), port)];
// self.upstreams_map.insert(peer.to_string(), (new_value, AtomicUsize::new(0)));
let exists = self.upstreams_map.get(peer);
let mut toreturn = vec![];
match exists {
@@ -75,14 +83,6 @@ impl GetHost for LB {
println!(" ==> Updating peer list: name => {} | value => {:?}", peer.to_string(), toreturn);
self.upstreams_map.insert(peer.to_string(), (toreturn, AtomicUsize::new(0)));
// self.upstreams_map.insert(peer.to_string(), toreturn);
// use std::time::Instant;
// let now = Instant::now();
// self.get_host(peer);
// let elapsed = now.elapsed();
// println!("Elapsed: {:.2?}", elapsed);
}
fn discover_hosts(&mut self) {
@@ -99,17 +99,6 @@ impl GetHost for LB {
impl ProxyHttp for LB {
type CTX = ();
fn new_ctx(&self) -> Self::CTX {}
// async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> {
// let upstream = self.load_balancer.select(b"", 256).ok_or_else(|| Error::explain(HTTPStatus(503), "no upstreams"))?;
// let peer = HttpPeer::new(upstream.addr, false, "".to_string());
//
// let host_name = _session.req_header().headers.get("host");
// let fo = self.get_host(host_name.unwrap().to_str().unwrap());
// println!("{:?}", fo);
//
// Ok(Box::new(peer))
// }
async fn upstream_peer(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<Box<HttpPeer>> {
let host_name = session.req_header().headers.get("host");
let ddr = self.get_host(host_name.unwrap().to_str().unwrap());
@@ -125,17 +114,6 @@ impl ProxyHttp for LB {
}
}
}
/*
async fn upstream_peer(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<Box<HttpPeer>> {
let host_name = session.req_header().headers.get("host").unwrap();
let addr = self.get_host(host_name.to_str().unwrap()).unwrap();
info!("connecting to {addr:?}");
let peer = Box::new(HttpPeer::new(addr, false, "".to_string()));
Ok(peer)
}
*/
async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> pingora_core::Result<bool>
where
Self::CTX: Send + Sync,