From cb3ff26de76af0569c26ae24bdd37221cb1b6691 Mon Sep 17 00:00:00 2001 From: Ara Sadoyan Date: Sat, 8 Feb 2025 17:09:06 +0100 Subject: [PATCH] initial commit --- src/web/proxyhttp.rs | 70 +++++++++++++++----------------------------- src/web/start.rs | 11 +++---- 2 files changed, 28 insertions(+), 53 deletions(-) diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index ad0e82b..5c5640d 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -3,10 +3,14 @@ use dashmap::DashMap; use log::{info, warn}; use pingora::prelude::*; use pingora_core::prelude::HttpPeer; +use pingora_core::server::ShutdownWatch; +use pingora_core::services::background::BackgroundService; use pingora_http::{RequestHeader, ResponseHeader}; use pingora_proxy::{ProxyHttp, Session}; use std::any::type_name; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::time::Duration; +use tokio::time::interval; #[allow(dead_code)] pub fn typeoff(_: T) { @@ -14,18 +18,31 @@ pub fn typeoff(_: T) { println!("{:?}", to); } -// pub struct LB(pub Arc>); #[allow(dead_code)] pub struct LB { - // pub load_balancer: Arc>, - // pub upstreams_map: Arc>>, - // pub upstreams_map: Arc>>>, - // upstreams: DashMap, AtomicUsize)>, - // pub upstreams_map: DashMap>, - // pub upstreams_maps: DashMap>>, pub upstreams_map: DashMap, AtomicUsize)>, } +pub struct BGService; +#[async_trait] +impl BackgroundService for BGService { + async fn start(&self, mut shutdown: ShutdownWatch) { + println!("Starting example background service"); + let mut period = interval(Duration::from_secs(1)); + loop { + tokio::select! { + _ = shutdown.changed() => { + // shutdown + break; + } + _ = period.tick() => { + println!("= = = = = = = = = = = = = = = = = = = = = = = = = =") + } + } + } + } +} + #[async_trait] pub trait GetHost { async fn get_host(&self, peer: &str) -> Option<(String, u16)>; @@ -35,12 +52,6 @@ pub trait GetHost { #[async_trait] impl GetHost for LB { async fn get_host(&self, peer: &str) -> Option<(String, u16)> { - // println!("{:?}", self.upstreams_map); - // let entry = self.upstreams_map.get(peer)?; - // let first = entry.value().first()?; - // println!("{:?}", entry.value()); - // Some((first.0.clone(), first.1)) - let entry = self.upstreams_map.get(peer)?; let (servers, index) = entry.value(); @@ -54,9 +65,6 @@ impl GetHost for LB { } fn set_host(&mut self, peer: &str, host: &str, port: u16) { - // let new_value = vec![(host.to_string(), port)]; - // self.upstreams_map.insert(peer.to_string(), (new_value, AtomicUsize::new(0))); - let exists = self.upstreams_map.get(peer); let mut toreturn = vec![]; match exists { @@ -75,14 +83,6 @@ impl GetHost for LB { println!(" ==> Updating peer list: name => {} | value => {:?}", peer.to_string(), toreturn); self.upstreams_map.insert(peer.to_string(), (toreturn, AtomicUsize::new(0))); - - // self.upstreams_map.insert(peer.to_string(), toreturn); - - // use std::time::Instant; - // let now = Instant::now(); - // self.get_host(peer); - // let elapsed = now.elapsed(); - // println!("Elapsed: {:.2?}", elapsed); } fn discover_hosts(&mut self) { @@ -99,17 +99,6 @@ impl GetHost for LB { impl ProxyHttp for LB { type CTX = (); fn new_ctx(&self) -> Self::CTX {} - // async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result> { - // let upstream = self.load_balancer.select(b"", 256).ok_or_else(|| Error::explain(HTTPStatus(503), "no upstreams"))?; - // let peer = HttpPeer::new(upstream.addr, false, "".to_string()); - // - // let host_name = _session.req_header().headers.get("host"); - // let fo = self.get_host(host_name.unwrap().to_str().unwrap()); - // println!("{:?}", fo); - // - // Ok(Box::new(peer)) - // } - async fn upstream_peer(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result> { let host_name = session.req_header().headers.get("host"); let ddr = self.get_host(host_name.unwrap().to_str().unwrap()); @@ -125,17 +114,6 @@ impl ProxyHttp for LB { } } } - - /* - - async fn upstream_peer(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result> { - let host_name = session.req_header().headers.get("host").unwrap(); - let addr = self.get_host(host_name.to_str().unwrap()).unwrap(); - info!("connecting to {addr:?}"); - let peer = Box::new(HttpPeer::new(addr, false, "".to_string())); - Ok(peer) - } - */ async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> pingora_core::Result where Self::CTX: Send + Sync, diff --git a/src/web/start.rs b/src/web/start.rs index 44db089..71d9acb 100644 --- a/src/web/start.rs +++ b/src/web/start.rs @@ -1,4 +1,4 @@ -use crate::web::proxyhttp::{GetHost, LB}; +use crate::web::proxyhttp::{BGService, GetHost, LB}; use dashmap::DashMap; use pingora_core::prelude::background_service; use pingora_core::server::Server; @@ -33,7 +33,6 @@ pub fn run() { // let backends = Backends::new(Box::new(SD)); // let load_balancer = LoadBalancer::from_backends(backends); - // load_balancer.set_health_check(TcpHealthCheck::new()); // load_balancer.health_check_frequency = Some(Duration::from_secs(1)); // load_balancer.update_frequency = Some(Duration::from_secs(1)); @@ -42,11 +41,9 @@ pub fn run() { let upstreams_map: DashMap, AtomicUsize)> = DashMap::new(); - let mut ll = LB { - upstreams_map, - // upstreams_maps: DashMap::new(), - }; + let mut ll = LB { upstreams_map }; + let bg_service = background_service("bgsrvc", BGService {}); let background = background_service("load balancer", ll.discover_hosts()); background.task(); @@ -54,7 +51,7 @@ pub fn run() { lb.add_tcp("0.0.0.0:6193"); server.add_service(lb); - // server.add_service(background.task()); + server.add_service(bg_service); server.run_forever(); }