diff --git a/Cargo.lock b/Cargo.lock index 6a5bdff..246c2d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -136,7 +136,6 @@ dependencies = [ "base16ct 1.0.0", "base64", "dashmap", - "futures", "instant-acme", "jsonwebtoken", "libc", diff --git a/Cargo.toml b/Cargo.toml index 227ad45..689eab1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,6 @@ pingora-limits = "0.8.1" dashmap = "7.0.0-rc2" async-trait = "0.1.89" log = "0.4.30" -futures = "0.3.32" notify = "9.0.0-rc.4" axum = { version = "0.8.9" } reqwest = { version = "0.13.4", features = ["json", "stream", "blocking"] } diff --git a/src/utils/discovery.rs b/src/utils/discovery.rs index 43e37fe..4e873de 100644 --- a/src/utils/discovery.rs +++ b/src/utils/discovery.rs @@ -3,8 +3,8 @@ use crate::utils::kuberconsul::{ConsulDiscovery, KubernetesDiscovery, ServiceDis use crate::utils::structs::{Configuration, UpstreamsDashMap}; use crate::web::webserver; use async_trait::async_trait; -use futures::channel::mpsc::Sender; use std::sync::Arc; +use tokio::sync::mpsc::Sender; pub struct APIUpstreamProvider { pub config_api_enabled: bool, @@ -46,7 +46,7 @@ impl Discovery for APIUpstreamProvider { #[async_trait] impl Discovery for FromFileProvider { async fn start(&self, tx: Sender) { - tokio::spawn(filewatch::start(self.path.clone(), tx.clone())); + tokio::spawn(filewatch::start(self.path.clone(), tx)); } } diff --git a/src/utils/filewatch.rs b/src/utils/filewatch.rs index c8ccc45..49f41e9 100644 --- a/src/utils/filewatch.rs +++ b/src/utils/filewatch.rs @@ -1,16 +1,15 @@ use crate::utils::parceyaml::load_configuration; use crate::utils::structs::Configuration; -use futures::channel::mpsc::Sender; -use futures::SinkExt; use log::error; use notify::event::ModifyKind; use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use pingora::prelude::sleep; use std::path::Path; use std::time::{Duration, Instant}; +use tokio::sync::mpsc::Sender; use tokio::task; -pub async fn start(fp: String, mut toreturn: Sender) { +pub async fn start(fp: String, toreturn: Sender) { sleep(Duration::from_millis(50)).await; // For having nice logs :-) let file_path = fp.as_str(); let parent_dir = Path::new(file_path).parent().unwrap(); diff --git a/src/utils/kuberconsul.rs b/src/utils/kuberconsul.rs index 7b6b477..c1821ba 100644 --- a/src/utils/kuberconsul.rs +++ b/src/utils/kuberconsul.rs @@ -4,8 +4,6 @@ use crate::utils::structs::{Configuration, GlobalServiceMapping, InnerMap, Upstr use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams}; use async_trait::async_trait; use dashmap::DashMap; -use futures::channel::mpsc::Sender; -use futures::SinkExt; use pingora::prelude::sleep; use rand::RngExt; use serde::Deserialize; @@ -18,6 +16,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::fs::File; use tokio::io::AsyncReadExt; +use tokio::sync::mpsc::Sender; #[derive(Debug, serde::Deserialize)] pub struct KubeEndpoints { @@ -95,7 +94,7 @@ pub struct ConsulDiscovery; #[async_trait] impl ServiceDiscovery for KubernetesDiscovery { - async fn fetch_upstreams(&self, config: Arc, mut toreturn: Sender) { + async fn fetch_upstreams(&self, config: Arc, toreturn: Sender) { let prev_upstreams = UpstreamsDashMap::new(); if let Some(kuber) = config.kubernetes.clone() { @@ -162,7 +161,7 @@ fn get_current_namespace() -> Option { #[async_trait] impl ServiceDiscovery for ConsulDiscovery { - async fn fetch_upstreams(&self, config: Arc, mut toreturn: Sender) { + async fn fetch_upstreams(&self, config: Arc, toreturn: Sender) { let prev_upstreams = UpstreamsDashMap::new(); loop { let upstreams = UpstreamsDashMap::new(); diff --git a/src/utils/metrics.rs b/src/utils/metrics.rs index 491dee8..b0a921e 100644 --- a/src/utils/metrics.rs +++ b/src/utils/metrics.rs @@ -23,15 +23,6 @@ pub static REQUEST_COUNT: LazyLock = LazyLock::new(|| register_int_c pub static RESPONSE_CODES: LazyLock = LazyLock::new(|| register_int_counter_vec!("aralez_responses_total", "Responses grouped by status code", &["status"]).unwrap()); -// pub static RESPONSE_LATENCY: LazyLock = LazyLock::new(|| { -// register_histogram!( -// "aralez_response_latency_seconds", -// "Response latency in seconds", -// vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0] -// ) -// .unwrap() -// }); - pub static RESPONSE_LATENCY: LazyLock = LazyLock::new(|| { register_histogram!( "aralez_response_latency_seconds", diff --git a/src/web/bgservice.rs b/src/web/bgservice.rs index 055c4ed..6fefa79 100644 --- a/src/web/bgservice.rs +++ b/src/web/bgservice.rs @@ -8,18 +8,17 @@ use crate::web::logging::init_logging; use crate::web::proxyhttp::LB; use async_trait::async_trait; use dashmap::DashMap; -use futures::channel::mpsc; -use futures::{SinkExt, StreamExt}; use log::{error, info}; use pingora_core::server::ShutdownWatch; use pingora_core::services::background::BackgroundService; use std::sync::Arc; +use tokio::sync::mpsc; #[async_trait] impl BackgroundService for LB { async fn start(&self, mut shutdown: ShutdownWatch) { info!("Starting background service"); // tx: Sender - let (mut tx, mut rx) = mpsc::channel::(1); + let (tx, mut rx) = mpsc::channel::(1); let tx_api = tx.clone(); let config = load_configuration(self.config.upstreams_conf.clone().as_str(), "filepath") .await @@ -84,7 +83,7 @@ impl BackgroundService for LB { _ = shutdown.changed() => { break; } - val = rx.next() => { + val = rx.recv() => { if let Some(ss) = val { clone_dashmap_into(&ss.upstreams, &self.ump_full); clone_dashmap_into(&ss.upstreams, &self.ump_upst); diff --git a/src/web/webserver.rs b/src/web/webserver.rs index a537fe2..97636f2 100644 --- a/src/web/webserver.rs +++ b/src/web/webserver.rs @@ -10,8 +10,6 @@ use axum::http::{Response, StatusCode}; use axum::response::IntoResponse; use axum::routing::{any, get, post}; use axum::{Json, Router}; -use futures::channel::mpsc::Sender; -use futures::SinkExt; use jsonwebtoken::{encode, EncodingKey, Header}; use log::{debug, error, info, warn}; use prometheus::{gather, Encoder, TextEncoder}; @@ -37,14 +35,14 @@ pub(crate) struct AppState { pub(crate) cert_creds: String, pub(crate) certs_dir: String, upstreams_file: String, - config_sender: Sender, + config_sender: mpsc::Sender, config_api_enabled: bool, current_upstreams: Arc, full_upstreams: Arc, } #[allow(unused_mut)] -pub async fn run_server(config: &APIUpstreamProvider, mut to_return: Sender, upstreams_curr: Arc, upstreams_full: Arc) { +pub async fn run_server(config: &APIUpstreamProvider, mut to_return: mpsc::Sender, upstreams_curr: Arc, upstreams_full: Arc) { let credsfile = config.config_dir.clone() + "/acme_credentials.json"; let app_state = AppState { master_key: config.masterkey.clone(), @@ -119,7 +117,7 @@ async fn conf(State(st): State, Query(params): Query