Error handling on API server

This commit is contained in:
Ara Sadoyan
2026-01-09 18:44:36 +01:00
parent 2d1a827007
commit 6c1d3c5ef8
8 changed files with 375 additions and 120 deletions

View File

@@ -41,7 +41,7 @@ pub async fn start(fp: String, mut toreturn: Sender<Configuration>) {
if start.elapsed() > Duration::from_secs(2) {
start = Instant::now();
// info!("Config File changed :=> {:?}", e);
let snd = load_configuration(file_path, "filepath").await;
let snd = load_configuration(file_path, "filepath").await.0;
match snd {
Some(snd) => {
toreturn.send(snd).await.unwrap();

View File

@@ -51,7 +51,34 @@ pub async fn for_kuber(url: &str, token: &str, conf: &ServiceMapping) -> Option<
return None;
}
let endpoints: KubeEndpoints = resp.json().await.ok()?;
// let endpoints: KubeEndpointsList = resp.json().await.ok()?;
let upstreams: DashMap<String, (Vec<Arc<InnerMap>>, AtomicUsize)> = DashMap::new();
// for endpoints in endpoints.items {
// if let Some(subsets) = endpoints.subsets {
// for subset in subsets {
// if let (Some(addresses), Some(ports)) = (subset.addresses, subset.ports) {
// let mut inner_vec = Vec::new();
// for addr in addresses {
// for port in &ports {
// let to_add = Arc::from(InnerMap {
// address: addr.ip.parse().unwrap(),
// port: port.port.clone(),
// is_ssl: false,
// is_http2: false,
// to_https: conf.to_https.unwrap_or(false),
// rate_limit: conf.rate_limit,
// healthcheck: None,
// });
// inner_vec.push(to_add);
// }
// }
// match_path(&conf, &upstreams, inner_vec.clone());
// }
// }
// }
// }
if let Some(subsets) = endpoints.subsets {
for subset in subsets {
if let (Some(addresses), Some(ports)) = (subset.addresses, subset.ports) {

View File

@@ -19,6 +19,10 @@ use std::time::Duration;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
// #[derive(Debug, Deserialize)]
// pub struct KubeEndpointsList {
// pub items: Vec<KubeEndpoints>,
// }
#[derive(Debug, serde::Deserialize)]
pub struct KubeEndpoints {
pub subsets: Option<Vec<KubeSubset>>,
@@ -114,12 +118,12 @@ impl ServiceDiscovery for KubernetesDiscovery {
let upstreams = UpstreamsDashMap::new();
if let Some(kuber) = config.kubernetes.clone() {
if let Some(svc) = kuber.services {
for i in svc {
for service in svc {
let header_list: DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>> = DashMap::new();
let mut hl = Vec::new();
build_headers(&i.client_headers, config.as_ref(), &mut hl);
build_headers(&service.client_headers, config.as_ref(), &mut hl);
if !hl.is_empty() {
match i.path.clone() {
match service.path.clone() {
Some(path) => {
header_list.insert(Arc::from(path.as_str()), hl);
}
@@ -130,11 +134,13 @@ impl ServiceDiscovery for KubernetesDiscovery {
// header_list.insert(Arc::from(path.as_str()), hl);
// header_list.insert(Arc::from(i.path).unwrap_or(Arc::from("/")).as_str(), hl);
config.client_headers.insert(i.hostname.clone(), header_list);
config.client_headers.insert(service.hostname.clone(), header_list);
}
let url = format!("https://{}/api/v1/namespaces/{}/endpoints/{}", server, namespace, i.hostname);
let list = httpclient::for_kuber(&*url, &*token, &i).await;
list_to_upstreams(list, &upstreams, &i);
let url = format!("https://{}/api/v1/namespaces/{}/endpoints/{}", server, namespace, service.hostname);
// let url = format!("https://{}/api/v1/namespaces/{}/endpoints?labelSelector=app", server, namespace);
let list = httpclient::for_kuber(&*url, &*token, &service).await;
// println!("{:?}", list);
list_to_upstreams(list, &upstreams, &service);
}
}
if let Some(lt) = clone_compare(&upstreams, &prev_upstreams, &config).await {

View File

@@ -10,7 +10,7 @@ use std::sync::Arc;
use std::{env, fs};
// use tokio::sync::oneshot::{Receiver, Sender};
pub async fn load_configuration(d: &str, kind: &str) -> Option<Configuration> {
pub async fn load_configuration(d: &str, kind: &str) -> (Option<Configuration>, String) {
let yaml_data = match kind {
"filepath" => match fs::read_to_string(d) {
Ok(data) => {
@@ -20,7 +20,7 @@ pub async fn load_configuration(d: &str, kind: &str) -> Option<Configuration> {
Err(e) => {
error!("Reading: {}: {:?}", d, e);
warn!("Running with empty upstreams list, update it via API");
return None;
return (None, e.to_string());
}
},
"content" => {
@@ -29,7 +29,7 @@ pub async fn load_configuration(d: &str, kind: &str) -> Option<Configuration> {
}
_ => {
error!("Mismatched parameter, only filepath|content is allowed");
return None;
return (None, "Mismatched parameter, only filepath|content is allowed".to_string());
}
};
@@ -37,7 +37,7 @@ pub async fn load_configuration(d: &str, kind: &str) -> Option<Configuration> {
Ok(cfg) => cfg,
Err(e) => {
error!("Failed to parse upstreams file: {}", e);
return None;
return (None, e.to_string());
}
};
@@ -49,19 +49,19 @@ pub async fn load_configuration(d: &str, kind: &str) -> Option<Configuration> {
match parsed.provider.as_str() {
"file" => {
populate_file_upstreams(&mut toreturn, &parsed).await;
Some(toreturn)
(Some(toreturn), "Ok".to_string())
}
"consul" => {
toreturn.consul = parsed.consul;
toreturn.consul.is_some().then_some(toreturn)
(toreturn.consul.is_some().then_some(toreturn), "Ok".to_string())
}
"kubernetes" => {
toreturn.kubernetes = parsed.kubernetes;
toreturn.kubernetes.is_some().then_some(toreturn)
(toreturn.kubernetes.is_some().then_some(toreturn), "Ok".to_string())
}
_ => {
warn!("Unknown provider {}", parsed.provider);
None
(None, "Unknown provider".to_string())
}
}
}