Intermediate v2

This commit is contained in:
Ara Sadoyan
2025-03-10 18:27:52 +01:00
parent a10a08678b
commit a824912a84
7 changed files with 64 additions and 51 deletions

View File

@@ -1,10 +1,9 @@
use dashmap::DashMap; use crate::utils::tools::*;
use std::sync::atomic::AtomicUsize;
use tokio::sync::RwLockReadGuard; use tokio::sync::RwLockReadGuard;
/* /*
#[allow(dead_code)] #[allow(dead_code)]
pub fn dashmaps(map1: &RwLockWriteGuard<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>, map2: &DashMap<String, (Vec<(String, u16)>, AtomicUsize)>) -> bool { pub fn dashmaps(map1: &RwLockWriteGuard<UpstreamMap>, map2: &UpstreamMap) -> bool {
if map1.len() != map2.len() { if map1.len() != map2.len() {
return false; return false;
} }
@@ -26,7 +25,7 @@ pub fn dashmaps(map1: &RwLockWriteGuard<DashMap<String, (Vec<(String, u16)>, Ato
*/ */
#[allow(dead_code)] #[allow(dead_code)]
pub fn dm(map1: &RwLockReadGuard<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>, map2: &DashMap<String, (Vec<(String, u16)>, AtomicUsize)>) -> bool { pub fn dm(map1: &RwLockReadGuard<UpstreamMap>, map2: &UpstreamMap) -> bool {
if map1.len() != map2.len() { if map1.len() != map2.len() {
return false; // Different number of keys return false; // Different number of keys
} }

View File

@@ -20,23 +20,23 @@ pub struct APIUpstreamProvider;
#[async_trait] #[async_trait]
pub trait Discovery { pub trait Discovery {
async fn run(&self, tx: Sender<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>); async fn run(&self, tx: Sender<UpstreamMap>);
} }
#[async_trait] #[async_trait]
impl Discovery for APIUpstreamProvider { impl Discovery for APIUpstreamProvider {
async fn run(&self, toreturn: Sender<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>) { async fn run(&self, toreturn: Sender<UpstreamMap>) {
webserver::run_server(toreturn).await; webserver::run_server(toreturn).await;
} }
} }
#[async_trait] #[async_trait]
impl Discovery for FromFileProvider { impl Discovery for FromFileProvider {
async fn run(&self, tx: Sender<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>) { async fn run(&self, tx: Sender<UpstreamMap>) {
tokio::spawn(watch_file(self.path.clone(), tx.clone())); tokio::spawn(watch_file(self.path.clone(), tx.clone()));
} }
} }
pub async fn watch_file(fp: String, mut toreturn: Sender<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>) { pub async fn watch_file(fp: String, mut toreturn: Sender<UpstreamMap>) {
let file_path = fp.as_str(); let file_path = fp.as_str();
let parent_dir = Path::new(file_path).parent().unwrap(); // Watch directory, not file let parent_dir = Path::new(file_path).parent().unwrap(); // Watch directory, not file
let (local_tx, mut local_rx) = tokio::sync::mpsc::channel::<notify::Result<Event>>(1); let (local_tx, mut local_rx) = tokio::sync::mpsc::channel::<notify::Result<Event>>(1);
@@ -77,15 +77,16 @@ pub async fn watch_file(fp: String, mut toreturn: Sender<DashMap<String, (Vec<(S
start = Instant::now(); start = Instant::now();
println!("Config File changed :=> {:?}", e); println!("Config File changed :=> {:?}", e);
let _sd = build_upstreams2("etc/upstreams-long.conf", "filepath"); let upstreams = build_upstreams2("etc/upstreams-long.conf", "filepath");
println!("\n\n"); print_upstreams(&upstreams);
for t in _sd.iter() {
println!("{} ==>", t.key()); let host_entry = upstreams.get("myip.netangels.net").unwrap();
for v in t.value().iter() { let path_entry = host_entry.get("/").unwrap();
println!(" {:?}", v) for p in path_entry.value().0.clone() {
} println!(" {:?}", p);
} }
println!("\n\n"); println!("\n\n");
let snd = build_upstreams(file_path, "filepath"); let snd = build_upstreams(file_path, "filepath");
@@ -99,7 +100,7 @@ pub async fn watch_file(fp: String, mut toreturn: Sender<DashMap<String, (Vec<(S
} }
} }
} }
pub fn build_upstreams(d: &str, kind: &str) -> DashMap<String, (Vec<(String, u16)>, AtomicUsize)> { pub fn build_upstreams(d: &str, kind: &str) -> UpstreamMap {
let upstreams = DashMap::new(); let upstreams = DashMap::new();
let mut contents = d.to_string(); let mut contents = d.to_string();
match kind { match kind {
@@ -150,8 +151,8 @@ pub fn build_upstreams(d: &str, kind: &str) -> DashMap<String, (Vec<(String, u16
upstreams upstreams
} }
pub fn build_upstreams2(d: &str, kind: &str) -> DashMap<String, Vec<UpstreamsStruct>> { pub fn build_upstreams2(d: &str, kind: &str) -> UpstresmDashMap {
let upstreams: DashMap<String, Vec<UpstreamsStruct>> = DashMap::new(); let upstreams: UpstresmDashMap = DashMap::new();
let mut contents = d.to_string(); let mut contents = d.to_string();
match kind { match kind {
"filepath" => { "filepath" => {
@@ -176,7 +177,7 @@ pub fn build_upstreams2(d: &str, kind: &str) -> DashMap<String, Vec<UpstreamsStr
continue; continue;
}; };
let Some(ssl) = crate::utils::tools::string_to_bool(parts.next()) else { let Some(ssl) = string_to_bool(parts.next()) else {
continue; continue;
}; };
@@ -201,13 +202,13 @@ pub fn build_upstreams2(d: &str, kind: &str) -> DashMap<String, Vec<UpstreamsStr
let Ok(port) = port_str.parse::<u16>() else { let Ok(port) = port_str.parse::<u16>() else {
continue; continue;
}; };
let d = UpstreamsStruct {
proto: proto.to_string(), let entry = upstreams.entry(hostname.to_string()).or_insert_with(DashMap::new);
path: path.to_string(), entry
address: (ip.to_string(), port, ssl), .entry(path.to_string())
atom: AtomicUsize::new(0), .or_insert_with(|| (Vec::new(), AtomicUsize::new(0)))
}; .0
upstreams.entry(hostname.to_string()).or_insert_with(|| Vec::new()).push(d); .push((ip.to_string(), port, ssl, proto.to_string()));
} }
upstreams upstreams

View File

@@ -1,3 +1,4 @@
use crate::utils::tools::*;
use dashmap::DashMap; use dashmap::DashMap;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::Arc; use std::sync::Arc;
@@ -5,15 +6,15 @@ use std::time::Duration;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tokio::time::interval; use tokio::time::interval;
pub async fn hc(upslist: Arc<RwLock<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>>, fullist: Arc<RwLock<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>>) { pub async fn hc(upslist: Arc<RwLock<UpstreamMap>>, fullist: Arc<RwLock<UpstreamMap>>) {
let mut period = interval(Duration::from_secs(2)); let mut period = interval(Duration::from_secs(2));
loop { loop {
tokio::select! { tokio::select! {
_ = period.tick() => { _ = period.tick() => {
// let before = Instant::now(); // let before = Instant::now();
let totest: DashMap<String, (Vec<(String, u16)>, AtomicUsize)> = DashMap::new(); let totest: UpstreamMap = DashMap::new();
let fclone: DashMap<String, (Vec<(String, u16)>, AtomicUsize)> = DashMap::new(); let fclone: UpstreamMap = DashMap::new();
// println!("\nElapsed dash: {:.2?}", before.elapsed()); // println!("\nElapsed dash: {:.2?}", before.elapsed());
// let before = Instant::now(); // let before = Instant::now();
{ {

View File

@@ -1,15 +1,27 @@
use dashmap::DashMap;
use std::any::type_name; use std::any::type_name;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
#[derive(Debug)]
#[allow(dead_code)] #[allow(dead_code)]
pub struct UpstreamsStruct { pub fn print_upstreams(upstreams: &UpstresmDashMap) {
pub proto: String, for host_entry in upstreams.iter() {
pub path: String, let hostname = host_entry.key();
pub address: (String, u16, bool), println!("Hostname: {}", hostname);
pub atom: AtomicUsize,
for path_entry in host_entry.value().iter() {
let path = path_entry.key();
println!(" Path: {}", path);
for (ip, port, ssl, proto) in path_entry.value().0.clone() {
println!(" ===> IP: {}, Port: {}, SSL: {}, Proto: {}", ip, port, ssl, proto);
}
}
}
} }
pub type UpstresmDashMap = DashMap<String, DashMap<String, (Vec<(String, u16, bool, String)>, AtomicUsize)>>;
pub type UpstreamMap = DashMap<String, (Vec<(String, u16)>, AtomicUsize)>;
#[allow(dead_code)] #[allow(dead_code)]
pub fn typeoff<T>(_: T) { pub fn typeoff<T>(_: T) {
let to = type_name::<T>(); let to = type_name::<T>();

View File

@@ -1,7 +1,7 @@
use crate::utils::discovery::{APIUpstreamProvider, Discovery, FromFileProvider}; use crate::utils::discovery::{APIUpstreamProvider, Discovery, FromFileProvider};
use crate::utils::tools::*;
use crate::utils::*; use crate::utils::*;
use async_trait::async_trait; use async_trait::async_trait;
use dashmap::DashMap;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::StreamExt; use futures::StreamExt;
use log::{info, warn}; use log::{info, warn};
@@ -14,11 +14,11 @@ use pingora_proxy::{ProxyHttp, Session};
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tokio::time::Instant; // use tokio::time::Instant;
pub struct LB { pub struct LB {
pub upstreams: Arc<RwLock<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>>, pub upstreams: Arc<RwLock<UpstreamMap>>,
pub umap_full: Arc<RwLock<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>>, pub umap_full: Arc<RwLock<UpstreamMap>>,
} }
#[async_trait] #[async_trait]
@@ -26,7 +26,7 @@ impl BackgroundService for LB {
async fn start(&self, mut shutdown: ShutdownWatch) { async fn start(&self, mut shutdown: ShutdownWatch) {
println!("Starting example background service"); println!("Starting example background service");
let (tx, mut rx) = mpsc::channel::<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>(0); let (tx, mut rx) = mpsc::channel::<UpstreamMap>(0);
let file_load = FromFileProvider { let file_load = FromFileProvider {
path: "etc/upstreams.conf".to_string(), path: "etc/upstreams.conf".to_string(),
}; };
@@ -61,6 +61,7 @@ impl BackgroundService for LB {
full.clear(); full.clear();
for (k,v) in newmap { for (k,v) in newmap {
println!("Host: {}", k); println!("Host: {}", k);
// <UpstreamMap
for vv in v.0.clone() { for vv in v.0.clone() {
println!(" ===> {:?}", vv); println!(" ===> {:?}", vv);
} }
@@ -98,7 +99,7 @@ impl GetHost for LB {
return None; return None;
} }
let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len(); let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len();
// println!("{} {:?} => len: {}, idx: {}", peer, servers[idx], servers.len(), idx); println!("{} {:?} => len: {}, idx: {}", peer, servers[idx], servers.len(), idx);
Some(servers[idx].clone()) Some(servers[idx].clone())
} else { } else {
None None
@@ -113,10 +114,8 @@ impl ProxyHttp for LB {
type CTX = (); type CTX = ();
fn new_ctx(&self) -> Self::CTX {} fn new_ctx(&self) -> Self::CTX {}
async fn upstream_peer(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<Box<HttpPeer>> { async fn upstream_peer(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<Box<HttpPeer>> {
let before = Instant::now(); // let before = Instant::now();
let host_name = session.req_header().headers.get("host"); let host_name = session.req_header().headers.get("host");
// let fyu = session.req_header().uri.path();
// info!("{:?} ==> {} ==> {:?}", host_name, fyu, session.request_summary());
match host_name { match host_name {
Some(host) => { Some(host) => {
let h = host.to_str().unwrap().split(':').collect::<Vec<&str>>(); let h = host.to_str().unwrap().split(':').collect::<Vec<&str>>();
@@ -124,13 +123,13 @@ impl ProxyHttp for LB {
match ddr.await { match ddr.await {
Some((host, port)) => { Some((host, port)) => {
let peer = Box::new(HttpPeer::new((host, port), false, String::new())); let peer = Box::new(HttpPeer::new((host, port), false, String::new()));
info!("{:?}, Time => {:.2?}", session.request_summary(), before.elapsed()); // info!("{:?}, Time => {:.2?}", session.request_summary(), before.elapsed());
Ok(peer) Ok(peer)
} }
None => { None => {
warn!("Returning default list => {:?}", ("127.0.0.1", 3000)); warn!("Returning default list => {:?}", ("127.0.0.1", 3000));
let peer = Box::new(HttpPeer::new(("127.0.0.1", 3000), false, String::new())); let peer = Box::new(HttpPeer::new(("127.0.0.1", 3000), false, String::new()));
info!("{:?}, Time => {:.2?}", session.request_summary(), before.elapsed()); // info!("{:?}, Time => {:.2?}", session.request_summary(), before.elapsed());
Ok(peer) Ok(peer)
} }
} }
@@ -138,7 +137,7 @@ impl ProxyHttp for LB {
None => { None => {
warn!("Returning default list => {:?}", ("127.0.0.1", 3000)); warn!("Returning default list => {:?}", ("127.0.0.1", 3000));
let peer = Box::new(HttpPeer::new(("127.0.0.1", 3000), false, String::new())); let peer = Box::new(HttpPeer::new(("127.0.0.1", 3000), false, String::new()));
info!("{:?}, Time => {:.2?}", session.request_summary(), before.elapsed()); // info!("{:?}, Time => {:.2?}", session.request_summary(), before.elapsed());
Ok(peer) Ok(peer)
} }
} }

View File

@@ -1,8 +1,8 @@
use crate::utils::tools::*;
use crate::web::proxyhttp::LB; use crate::web::proxyhttp::LB;
use dashmap::DashMap; use dashmap::DashMap;
use pingora_core::prelude::background_service; use pingora_core::prelude::background_service;
use pingora_core::server::Server; use pingora_core::server::Server;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock; use tokio::sync::RwLock;
@@ -12,10 +12,10 @@ pub fn run() {
let mut server = Server::new(None).unwrap(); let mut server = Server::new(None).unwrap();
server.bootstrap(); server.bootstrap();
let upstreams_map: DashMap<String, (Vec<(String, u16)>, AtomicUsize)> = DashMap::new(); let upstreams_map: UpstreamMap = DashMap::new();
let config = Arc::new(RwLock::new(upstreams_map)); let config = Arc::new(RwLock::new(upstreams_map));
let umap_full: DashMap<String, (Vec<(String, u16)>, AtomicUsize)> = DashMap::new(); let umap_full: UpstreamMap = DashMap::new();
let fconfig = Arc::new(RwLock::new(umap_full)); let fconfig = Arc::new(RwLock::new(umap_full));
let lb = LB { let lb = LB {

View File

@@ -1,3 +1,4 @@
use crate::utils::tools::*;
use axum::body::Body; use axum::body::Body;
use axum::http::{Response, StatusCode}; use axum::http::{Response, StatusCode};
use axum::response::IntoResponse; use axum::response::IntoResponse;
@@ -17,7 +18,7 @@ struct UpstreamData {
counter: usize, counter: usize,
} }
pub async fn run_server(mut toreturn: Sender<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>) { pub async fn run_server(mut toreturn: Sender<UpstreamMap>) {
let mut tr = toreturn.clone(); let mut tr = toreturn.clone();
let app = Router::new() let app = Router::new()
.route("/{*wildcard}", get(getconfig)) .route("/{*wildcard}", get(getconfig))