From 1338d04963760500f022b9a8621ab82fcf44a082 Mon Sep 17 00:00:00 2001 From: Ara Sadoyan Date: Thu, 20 Feb 2025 18:44:12 +0100 Subject: [PATCH] Axum api server with json and "conf" file support. --- Cargo.lock | 129 +++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 3 + src/utils/discovery.rs | 57 +++++++++--------- src/web.rs | 1 + src/web/proxyhttp.rs | 20 +------ src/web/webserver.rs | 69 ++++++++++++++++++++++ 6 files changed, 235 insertions(+), 44 deletions(-) create mode 100644 src/web/webserver.rs diff --git a/Cargo.lock b/Cargo.lock index 71052b9..e27d908 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -184,6 +184,7 @@ name = "asyncweb" version = "0.1.0" dependencies = [ "async-trait", + "axum", "bytes", "dashmap", "env_logger", @@ -201,6 +202,8 @@ dependencies = [ "pingora-load-balancing", "pingora-proxy", "rand", + "serde", + "serde_json", "tokio", ] @@ -236,6 +239,60 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" +[[package]] +name = "axum" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d6fd624c75e18b3b4c6b9caf42b1afe24437daaee904069137d8bab077be8b8" +dependencies = [ + "axum-core", + "bytes", + "form_urlencoded", + "futures-util", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.6.0", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df1362f362fd16024ae199c1970ce98f9661bf5ef94b9808fee734bc3698b733" +dependencies = [ + "bytes", + "futures-util", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -582,6 +639,15 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0d2fde1f7b3d48b8395d5f2de76c18a528bd6a9cdde438df747bfcba3e05d6f" +[[package]] +name = "form_urlencoded" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +dependencies = [ + "percent-encoding", +] + [[package]] name = "fsevent-sys" version = "4.1.0" @@ -1157,6 +1223,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "memchr" version = "2.7.4" @@ -1172,6 +1244,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "mini-redis" version = "0.4.1" @@ -1871,6 +1949,28 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa", + "serde", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serde_yaml" version = "0.8.26" @@ -2029,6 +2129,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" + [[package]] name = "termcolor" version = "1.4.1" @@ -2159,6 +2265,28 @@ dependencies = [ "tokio", ] +[[package]] +name = "tower" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + [[package]] name = "tower-service" version = "0.3.3" @@ -2171,6 +2299,7 @@ version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", diff --git a/Cargo.toml b/Cargo.toml index 5438cf4..ea6692f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,4 +25,7 @@ env_logger = { version = "0.11.6" } log = "0.4.25" futures = "0.3.31" notify = "8.0.0" +axum = "0.8.1" +serde = { version = "1.0.217", features = ["derive"] } +serde_json = "1.0.137" diff --git a/src/utils/discovery.rs b/src/utils/discovery.rs index 039352b..7ae2a1c 100644 --- a/src/utils/discovery.rs +++ b/src/utils/discovery.rs @@ -5,6 +5,7 @@ use std::fs; use std::sync::atomic::AtomicUsize; use std::time::{Duration, Instant}; +use crate::web::webserver; use async_trait::async_trait; use notify::event::ModifyKind; use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; @@ -14,9 +15,8 @@ use tokio::task; pub struct FromFileProvider { pub path: String, } -pub struct APIUpstreamProvider { - pub api_url: String, -} +pub struct APIUpstreamProvider; + #[async_trait] pub trait Discovery { async fn run(&self, tx: Sender, AtomicUsize)>>); @@ -24,16 +24,15 @@ pub trait Discovery { #[async_trait] impl Discovery for APIUpstreamProvider { - async fn run(&self, mut toreturn: Sender, AtomicUsize)>>) { + async fn run(&self, toreturn: Sender, AtomicUsize)>>) { + let _ = tokio::spawn(async move { webserver::run_server(toreturn).await }); loop { - let dm: DashMap, AtomicUsize)> = DashMap::new(); - dm.insert( - self.api_url.to_string(), - (vec![("192.168.1.1".parse().unwrap(), 8000), ("192.168.1.10".parse().unwrap(), 8000)], AtomicUsize::new(0)), - ); - println!("= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = "); - let _ = toreturn.send(dm).await.unwrap(); - println!("= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = "); + // let dm: DashMap, AtomicUsize)> = DashMap::new(); + // dm.insert( + // "popok.netangels.net".to_string(), + // (vec![("192.168.1.1".parse().unwrap(), 8000), ("192.168.1.10".parse().unwrap(), 8000)], AtomicUsize::new(0)), + // ); + // let _ = toreturn.send(dm).await.unwrap(); tokio::time::sleep(Duration::from_secs(20)).await; } } @@ -56,7 +55,7 @@ pub async fn watch_file(fp: String, mut toreturn: Sender {:?}", e); - let snd = read_upstreams_from_file(file_path); + let snd = read_upstreams_from_file(file_path, "filepath"); let _ = toreturn.send(snd).await.unwrap(); } } } - _ => (), //println!("* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *"), + _ => (), }, Err(e) => println!("Watch error: {:?}", e), } } } -fn read_upstreams_from_file(path: &str) -> DashMap, AtomicUsize)> { +pub fn read_upstreams_from_file(d: &str, kind: &str) -> DashMap, AtomicUsize)> { let upstreams = DashMap::new(); - let contents = match fs::read_to_string(path) { - Ok(data) => data, - Err(e) => { - eprintln!("Error reading file: {:?}", e); - return upstreams; + let mut contents = d.to_string(); + match kind { + "filepath" => { + println!("Reading upstreams from {}", d); + let _ = match fs::read_to_string(d) { + Ok(data) => contents = data, + Err(e) => { + eprintln!("Error reading file: {:?}", e); + return upstreams; + } + }; } - }; + "content" => { + println!("Reading upstreams from API post body"); + } + _ => println!("*******************> nothing <*******************"), + } for line in contents.lines().filter(|line| !line.trim().is_empty()) { let mut parts = line.split_whitespace(); diff --git a/src/web.rs b/src/web.rs index 1148a78..42b6bdd 100644 --- a/src/web.rs +++ b/src/web.rs @@ -1,2 +1,3 @@ pub mod proxyhttp; pub mod start; +pub mod webserver; diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index 002a393..fa28332 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -1,5 +1,3 @@ -// use crate::utils::compare; -// use crate::utils::discovery; use crate::utils::discovery::{APIUpstreamProvider, Discovery, FromFileProvider}; use crate::utils::*; use async_trait::async_trait; @@ -21,26 +19,19 @@ pub struct LB { pub upstreams: Arc, AtomicUsize)>>>, pub umap_full: Arc, AtomicUsize)>>>, } -// pub struct BGService { -// pub upstreams: Arc, AtomicUsize)>>>, -// pub umap_full: Arc, AtomicUsize)>>>, -// } #[async_trait] impl BackgroundService for LB { async fn start(&self, mut shutdown: ShutdownWatch) { tokio::spawn(healthcheck::hc(self.upstreams.clone(), self.umap_full.clone())); println!("Starting example background service"); - // let (tra, mut rec) = broadcast::channel::, AtomicUsize)>>(16); let (tx, mut rx) = mpsc::channel::, AtomicUsize)>>(0); let file_load = FromFileProvider { path: "etc/upstreams.conf".to_string(), }; - let api_load = APIUpstreamProvider { - api_url: "myip.netangels.net".to_string(), - }; + let api_load = APIUpstreamProvider; let tx_file = tx.clone(); let tx_api = tx.clone(); @@ -52,23 +43,19 @@ impl BackgroundService for LB { _ = shutdown.changed() => { break; } - // _ = period.tick() => { val = rx.next() => { match val { Some(newmap) => { - println!("{:?}", newmap); let umap_work = self.upstreams.write().await; let umap_full = self.umap_full.write().await; if !compare::dashmaps(&umap_full, &newmap) { - println!("DashMaps are different. Syncing !!!!!"); umap_work.clear(); umap_full.clear(); for (k,v) in newmap { println!("Host: {}", k); for vv in v.0.clone() { - println!(" Upstreams: {:?}", vv); + println!(" ===> {:?}", vv); } - // println!("{} -> {:?}", k, v); umap_work.insert(k.clone(), (v.0.clone(), AtomicUsize::new(0))); // No need for extra vec! umap_full.insert(k, (v.0, AtomicUsize::new(0))); // Use `value.0` directly } @@ -92,9 +79,6 @@ pub trait GetHost { impl GetHost for LB { async fn get_host(&self, peer: &str) -> Option<(String, u16)> { let map_read = self.upstreams.read().await; - // let ful_read = self.umap_full.read().await; - // println!("DN ==> {:?}", map_read); - // println!("FU ==> {:?}", ful_read); let x = if let Some(entry) = map_read.get(peer) { let (servers, index) = entry.value(); // No clone here diff --git a/src/web/webserver.rs b/src/web/webserver.rs new file mode 100644 index 0000000..8f1c065 --- /dev/null +++ b/src/web/webserver.rs @@ -0,0 +1,69 @@ +use axum::body::Body; +use axum::http::{Response, StatusCode}; +use axum::routing::post; +use axum::{routing::get, Json, Router}; +use dashmap::DashMap; +use futures::channel::mpsc::Sender; +use futures::SinkExt; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::atomic::AtomicUsize; +use tokio::net::TcpListener; + +#[derive(Debug, Serialize, Deserialize)] +struct UpstreamData { + servers: Vec<(String, u16)>, + counter: usize, +} + +pub async fn run_server(mut toreturn: Sender, AtomicUsize)>>) { + let mut tr = toreturn.clone(); + let app = Router::new() + .route("/", get(getconfig)) + .route( + "/conf", + post(|up: String| async move { + let serverlist = crate::utils::discovery::read_upstreams_from_file(up.as_str(), "content"); + let _ = tr.send(serverlist).await.unwrap(); + Response::builder().status(StatusCode::CREATED).body(Body::from("Config, conf file, updated!\n")).unwrap() + }) + .with_state("state"), + ) + .route( + "/json", + post(|Json(payload): Json>| async move { + let upstreams = DashMap::new(); + for (key, value) in payload { + upstreams.insert(key, (value.servers, AtomicUsize::new(value.counter))); + } + let _ = toreturn.send(upstreams).await.unwrap(); + Response::builder().status(StatusCode::CREATED).body(Body::from("Config, json, updated!\n")).unwrap() + }), + ); + let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap(); + println!("Axum API server running on port 3000"); + axum::serve(listener, app).await.unwrap(); +} + +async fn getconfig() -> String { + "Hello from Axum API inside Pingora!\n".to_string() +} +// curl -XPOST -H 'Content-Type: application/json' --data-binary @./push.json 127.0.0.1:3000/json +// curl -XPOST --data-binary @./etc/upstreams.txt 127.0.0.1:3000/conf + +/* +async fn config(Json(payload): Json>) -> impl IntoResponse { + let upstreams = DashMap::new(); + for (key, value) in payload { + upstreams.insert(key, (value.servers, AtomicUsize::new(value.counter))); + } + println!("{:?}", upstreams); + Response::builder().status(StatusCode::CREATED).body(Body::from("Config updated!\n")).unwrap() +} +async fn parse_upstreams(up: String) -> impl IntoResponse { + println!("Parsing: {}", up); + let serverlist = read_upstreams_from_file(up.as_str()); + println!("{:?}", serverlist); + Response::builder().status(StatusCode::CREATED).body(Body::from("Config updated!\n")).unwrap() +} +*/