mirror of
https://github.com/sadoyan/aralez.git
synced 2026-06-10 09:14:18 +08:00
Honoring OS signals
This commit is contained in:
11
Cargo.lock
generated
11
Cargo.lock
generated
@@ -159,6 +159,7 @@ dependencies = [
|
|||||||
"serde_json",
|
"serde_json",
|
||||||
"serde_yml",
|
"serde_yml",
|
||||||
"sha2 0.11.0",
|
"sha2 0.11.0",
|
||||||
|
"signal-hook",
|
||||||
"subtle",
|
"subtle",
|
||||||
"tikv-jemalloc-ctl",
|
"tikv-jemalloc-ctl",
|
||||||
"tikv-jemallocator",
|
"tikv-jemallocator",
|
||||||
@@ -3649,6 +3650,16 @@ version = "1.3.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
|
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "signal-hook"
|
||||||
|
version = "0.4.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "b2a0c28ca5908dbdbcd52e6fdaa00358ab88637f8ab33e1f188dd510eb44b53d"
|
||||||
|
dependencies = [
|
||||||
|
"libc",
|
||||||
|
"signal-hook-registry",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "signal-hook-registry"
|
name = "signal-hook-registry"
|
||||||
version = "1.4.8"
|
version = "1.4.8"
|
||||||
|
|||||||
@@ -50,3 +50,4 @@ log4rs = "1.4.0"
|
|||||||
#mimalloc = { version = "0.1.52", default-features = false }
|
#mimalloc = { version = "0.1.52", default-features = false }
|
||||||
tikv-jemallocator = "0.7.0"
|
tikv-jemallocator = "0.7.0"
|
||||||
tikv-jemalloc-ctl = { version = "0.7.0", features = ["stats"] }
|
tikv-jemalloc-ctl = { version = "0.7.0", features = ["stats"] }
|
||||||
|
signal-hook = "0.4.4"
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
pub mod acme;
|
||||||
pub mod bgservice;
|
pub mod bgservice;
|
||||||
pub mod gethosts;
|
pub mod gethosts;
|
||||||
pub mod proxyhttp;
|
pub mod proxyhttp;
|
||||||
|
|||||||
64
src/web/acme.rs
Normal file
64
src/web/acme.rs
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
use crate::tls::acme::order::CHALLENGES;
|
||||||
|
use crate::tls::acme::{account, order};
|
||||||
|
use axum::body::Body;
|
||||||
|
use axum::extract::State;
|
||||||
|
use axum::http::{Response, StatusCode};
|
||||||
|
use axum::response::IntoResponse;
|
||||||
|
|
||||||
|
#[allow(clippy::needless_return)]
|
||||||
|
pub async fn acme_create(State(state): State<crate::web::webserver::AppState>) -> impl IntoResponse {
|
||||||
|
match account::load_or_create(state.cert_creds.as_str()).await {
|
||||||
|
Ok(txt) => {
|
||||||
|
return Response::builder()
|
||||||
|
.status(StatusCode::OK)
|
||||||
|
.header("Content-Type", "text/plain")
|
||||||
|
.body(Body::from(txt))
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
return Response::builder()
|
||||||
|
.status(StatusCode::INTERNAL_SERVER_ERROR)
|
||||||
|
.body(Body::from(format!("Failed to create account: {}", e)))
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
#[allow(clippy::needless_return)]
|
||||||
|
pub async fn acme_order(State(state): State<crate::web::webserver::AppState>, axum::extract::Path(domain): axum::extract::Path<String>) -> impl IntoResponse {
|
||||||
|
let domain_clean = domain.trim_matches('/');
|
||||||
|
match order::order(domain_clean, state.cert_creds.as_str(), state.certs_dir).await {
|
||||||
|
Ok(txt) => {
|
||||||
|
return Response::builder()
|
||||||
|
.status(StatusCode::OK)
|
||||||
|
.header("Content-Type", "text/plain")
|
||||||
|
.body(Body::from(txt))
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
return Response::builder()
|
||||||
|
.status(StatusCode::INTERNAL_SERVER_ERROR)
|
||||||
|
.body(Body::from(format!("Failed to order a certificate: {}", e)))
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
pub async fn http01_challenge(axum::extract::Path(token): axum::extract::Path<String>) -> impl IntoResponse {
|
||||||
|
if let Ok(challenges) = CHALLENGES.read() {
|
||||||
|
// for k in challenges.iter() {
|
||||||
|
// println!(" ==> {} : {}", k.0, k.1);
|
||||||
|
// }
|
||||||
|
|
||||||
|
if let Some(key_authorization) = challenges.get(&token) {
|
||||||
|
return Response::builder()
|
||||||
|
.status(StatusCode::OK)
|
||||||
|
.header("Content-Type", "text/plain")
|
||||||
|
.body(Body::from(key_authorization.clone()))
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Response::builder()
|
||||||
|
.status(StatusCode::NOT_FOUND)
|
||||||
|
.header("Content-Type", "text/plain")
|
||||||
|
.body(Body::from("Not found"))
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
@@ -6,16 +6,22 @@ use crate::utils::structs::Extraparams;
|
|||||||
use crate::utils::tools::*;
|
use crate::utils::tools::*;
|
||||||
use crate::web::proxyhttp::LB;
|
use crate::web::proxyhttp::LB;
|
||||||
use arc_swap::ArcSwap;
|
use arc_swap::ArcSwap;
|
||||||
use ctrlc;
|
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use log::info;
|
use log::info;
|
||||||
use pingora::tls::ssl::{SslAlert, SslRef};
|
use pingora::tls::ssl::{SslAlert, SslRef};
|
||||||
use pingora_core::listeners::tls::TlsSettings;
|
use pingora_core::listeners::tls::TlsSettings;
|
||||||
use pingora_core::prelude::{background_service, Opt};
|
use pingora_core::prelude::{background_service, Opt};
|
||||||
use pingora_core::server::Server;
|
use pingora_core::server::Server;
|
||||||
|
use privdrop::reexports::libc::SIGQUIT;
|
||||||
|
use signal_hook::{
|
||||||
|
consts::{SIGINT, SIGTERM},
|
||||||
|
iterator::Signals,
|
||||||
|
};
|
||||||
use std::sync::mpsc::{channel, Receiver, Sender};
|
use std::sync::mpsc::{channel, Receiver, Sender};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
use std::{fs, thread};
|
use std::{fs, thread};
|
||||||
|
|
||||||
pub fn run() {
|
pub fn run() {
|
||||||
// default_provider().install_default().expect("Failed to install rustls crypto provider");
|
// default_provider().install_default().expect("Failed to install rustls crypto provider");
|
||||||
let parameters = Opt::parse_args();
|
let parameters = Opt::parse_args();
|
||||||
@@ -104,15 +110,23 @@ pub fn run() {
|
|||||||
proxy.add_tcp(bind_address_http.as_str());
|
proxy.add_tcp(bind_address_http.as_str());
|
||||||
server.add_service(proxy);
|
server.add_service(proxy);
|
||||||
server.add_service(bg_srvc);
|
server.add_service(bg_srvc);
|
||||||
|
|
||||||
thread::spawn(move || server.run_forever());
|
thread::spawn(move || server.run_forever());
|
||||||
|
|
||||||
if let (Some(user), Some(group)) = (cfg.rungroup.clone(), cfg.runuser.clone()) {
|
if let (Some(user), Some(group)) = (cfg.rungroup.clone(), cfg.runuser.clone()) {
|
||||||
drop_priv(user, group, cfg.proxy_address_http.clone(), cfg.proxy_address_tls.clone());
|
drop_priv(user, group, cfg.proxy_address_http.clone(), cfg.proxy_address_tls.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
let (tx, rx) = channel();
|
let mut signals = Signals::new(&[SIGINT, SIGTERM, SIGQUIT]).unwrap();
|
||||||
ctrlc::set_handler(move || tx.send(()).expect("Could not send signal on channel.")).expect("Error setting Ctrl-C handler");
|
for sig in signals.forever() {
|
||||||
rx.recv().expect("Could not receive from channel.");
|
match sig {
|
||||||
info!("Signal received ! Exiting...");
|
SIGINT => info!("SIGINT received! Exiting..."),
|
||||||
|
SIGTERM => info!("SIGTERM received! Exiting..."),
|
||||||
|
SIGQUIT => {
|
||||||
|
thread::sleep(Duration::from_secs(300));
|
||||||
|
info!("SIGQUIT received! Exiting...")
|
||||||
|
}
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,10 +1,9 @@
|
|||||||
use crate::tls::acme::order::CHALLENGES;
|
|
||||||
use crate::tls::acme::{account, order};
|
|
||||||
use crate::utils::discovery::APIUpstreamProvider;
|
use crate::utils::discovery::APIUpstreamProvider;
|
||||||
use crate::utils::jwt::Claims;
|
use crate::utils::jwt::Claims;
|
||||||
use crate::utils::metrics::{get_memory_usage, get_open_files, MEMORY_USAGE, OPEN_FILES};
|
use crate::utils::metrics::{get_memory_usage, get_open_files, MEMORY_USAGE, OPEN_FILES};
|
||||||
use crate::utils::structs::{Config, Configuration, UpstreamsDashMap};
|
use crate::utils::structs::{Config, Configuration, UpstreamsDashMap};
|
||||||
use crate::utils::tools::{upstreams_liveness_json, upstreams_to_json};
|
use crate::utils::tools::{upstreams_liveness_json, upstreams_to_json};
|
||||||
|
use crate::web::acme::{acme_create, acme_order, http01_challenge};
|
||||||
use axum::body::Body;
|
use axum::body::Body;
|
||||||
use axum::extract::{Query, State};
|
use axum::extract::{Query, State};
|
||||||
use axum::http::{Response, StatusCode};
|
use axum::http::{Response, StatusCode};
|
||||||
@@ -17,10 +16,12 @@ use jsonwebtoken::{encode, EncodingKey, Header};
|
|||||||
use log::{debug, error, info, warn};
|
use log::{debug, error, info, warn};
|
||||||
use prometheus::{gather, Encoder, TextEncoder};
|
use prometheus::{gather, Encoder, TextEncoder};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
|
use signal_hook::{consts::SIGQUIT, iterator::Signals};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
use tower_http::services::ServeDir;
|
use tower_http::services::ServeDir;
|
||||||
|
|
||||||
#[derive(Serialize, Debug)]
|
#[derive(Serialize, Debug)]
|
||||||
@@ -29,10 +30,10 @@ struct OutToken {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct AppState {
|
pub(crate) struct AppState {
|
||||||
master_key: Option<String>,
|
master_key: Option<String>,
|
||||||
cert_creds: String,
|
pub(crate) cert_creds: String,
|
||||||
certs_dir: String,
|
pub(crate) certs_dir: String,
|
||||||
upstreams_file: String,
|
upstreams_file: String,
|
||||||
config_sender: Sender<Configuration>,
|
config_sender: Sender<Configuration>,
|
||||||
config_api_enabled: bool,
|
config_api_enabled: bool,
|
||||||
@@ -63,16 +64,49 @@ pub async fn run_server(config: &APIUpstreamProvider, mut to_return: Sender<Conf
|
|||||||
.route("/metrics", get(metrics))
|
.route("/metrics", get(metrics))
|
||||||
.route("/status", get(status))
|
.route("/status", get(status))
|
||||||
.with_state(app_state);
|
.with_state(app_state);
|
||||||
|
|
||||||
|
let mut static_handle: Option<tokio::task::JoinHandle<()>> = None;
|
||||||
if let (Some(address), Some(folder)) = (&config.file_server_address, &config.file_server_folder) {
|
if let (Some(address), Some(folder)) = (&config.file_server_address, &config.file_server_folder) {
|
||||||
|
port_is_available("File Server", &address).await;
|
||||||
let static_files = ServeDir::new(folder);
|
let static_files = ServeDir::new(folder);
|
||||||
let static_serve: Router = Router::new().fallback_service(static_files);
|
let static_serve: Router = Router::new().fallback_service(static_files);
|
||||||
let static_listen = TcpListener::bind(address).await.unwrap();
|
let static_listen = TcpListener::bind(address).await.unwrap();
|
||||||
drop(tokio::spawn(async move { axum::serve(static_listen, static_serve).await.unwrap() }));
|
// drop(tokio::spawn(async move { axum::serve(static_listen, static_serve).await.unwrap() }));
|
||||||
|
static_handle = Some(tokio::spawn(async move { axum::serve(static_listen, static_serve).await.unwrap() }))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
port_is_available("Config API", &config.address).await;
|
||||||
let listener = TcpListener::bind(config.address.clone()).await.unwrap();
|
let listener = TcpListener::bind(config.address.clone()).await.unwrap();
|
||||||
info!("Starting the API server on: {}", config.address);
|
info!("Starting the API server on: {}", config.address);
|
||||||
axum::serve(listener, app).await.unwrap();
|
let api_server = tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
|
||||||
|
|
||||||
|
let (tx, mut rx) = mpsc::channel(1);
|
||||||
|
std::thread::spawn(move || {
|
||||||
|
let mut signals = Signals::new(&[SIGQUIT]).unwrap();
|
||||||
|
for sig in signals.forever() {
|
||||||
|
tx.blocking_send(sig).unwrap();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
rx.recv().await; // async wait, yields to tokio properly
|
||||||
|
api_server.abort();
|
||||||
|
if let Some(handle) = static_handle {
|
||||||
|
handle.abort();
|
||||||
|
}
|
||||||
|
info!("Exiting...");
|
||||||
|
|
||||||
|
// port_is_available("Config API", &config.address).await;
|
||||||
|
// let listener = TcpListener::bind(config.address.clone()).await.unwrap();
|
||||||
|
// info!("Starting the API server on: {}", config.address);
|
||||||
|
// axum::serve(listener, app).await.unwrap();
|
||||||
|
// if let (Some(address), Some(folder)) = (&config.file_server_address, &config.file_server_folder) {
|
||||||
|
// port_is_available("File Server", &address).await;
|
||||||
|
// let static_files = ServeDir::new(folder);
|
||||||
|
// let static_serve: Router = Router::new().fallback_service(static_files);
|
||||||
|
// let static_listen = TcpListener::bind(address).await.unwrap();
|
||||||
|
// static_handle = Some(tokio::spawn(async move { axum::serve(static_listen, static_serve).await.unwrap() }))
|
||||||
|
// }
|
||||||
|
//
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn conf(State(st): State<AppState>, Query(params): Query<HashMap<String, String>>, content: String) -> impl IntoResponse {
|
async fn conf(State(st): State<AppState>, Query(params): Query<HashMap<String, String>>, content: String) -> impl IntoResponse {
|
||||||
@@ -206,63 +240,27 @@ async fn status(State(st): State<AppState>, Query(params): Query<HashMap<String,
|
|||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(clippy::needless_return)]
|
pub async fn port_is_available(name: &str, address: &str) {
|
||||||
async fn acme_create(State(state): State<AppState>) -> impl IntoResponse {
|
let addr_port = address.split(":").collect::<Vec<&str>>();
|
||||||
match account::load_or_create(state.cert_creds.as_str()).await {
|
let t = Duration::from_secs(2);
|
||||||
Ok(txt) => {
|
|
||||||
return Response::builder()
|
|
||||||
.status(StatusCode::OK)
|
|
||||||
.header("Content-Type", "text/plain")
|
|
||||||
.body(Body::from(txt))
|
|
||||||
.unwrap()
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
return Response::builder()
|
|
||||||
.status(StatusCode::INTERNAL_SERVER_ERROR)
|
|
||||||
.body(Body::from(format!("Failed to create account: {}", e)))
|
|
||||||
.unwrap()
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
#[allow(clippy::needless_return)]
|
|
||||||
async fn acme_order(State(state): State<AppState>, axum::extract::Path(domain): axum::extract::Path<String>) -> impl IntoResponse {
|
|
||||||
let domain_clean = domain.trim_matches('/');
|
|
||||||
match order::order(domain_clean, state.cert_creds.as_str(), state.certs_dir).await {
|
|
||||||
Ok(txt) => {
|
|
||||||
return Response::builder()
|
|
||||||
.status(StatusCode::OK)
|
|
||||||
.header("Content-Type", "text/plain")
|
|
||||||
.body(Body::from(txt))
|
|
||||||
.unwrap()
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
return Response::builder()
|
|
||||||
.status(StatusCode::INTERNAL_SERVER_ERROR)
|
|
||||||
.body(Body::from(format!("Failed to order a certificate: {}", e)))
|
|
||||||
.unwrap()
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn http01_challenge(axum::extract::Path(token): axum::extract::Path<String>) -> impl IntoResponse {
|
let mut a = addr_port[0];
|
||||||
if let Ok(challenges) = CHALLENGES.read() {
|
if address == "0.0.0.0" {
|
||||||
// for k in challenges.iter() {
|
a = "127.0.0.1";
|
||||||
// println!(" ==> {} : {}", k.0, k.1);
|
}
|
||||||
// }
|
let p = addr_port[1].parse::<u16>().unwrap();
|
||||||
|
|
||||||
if let Some(key_authorization) = challenges.get(&token) {
|
loop {
|
||||||
return Response::builder()
|
match TcpListener::bind((a, p)).await {
|
||||||
.status(StatusCode::OK)
|
Ok(_) => {
|
||||||
.header("Content-Type", "text/plain")
|
break;
|
||||||
.body(Body::from(key_authorization.clone()))
|
}
|
||||||
.unwrap();
|
Err(_) => {
|
||||||
|
warn!("{} port is not available: {} will try again in {:?}", name, p, t);
|
||||||
|
tokio::time::sleep(t).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Response::builder()
|
|
||||||
.status(StatusCode::NOT_FOUND)
|
|
||||||
.header("Content-Type", "text/plain")
|
|
||||||
.body(Body::from("Not found"))
|
|
||||||
.unwrap()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// -- ⚝ by Dave -- in NeoVim ⚝ --
|
// -- ⚝ by Dave -- in NeoVim ⚝ --
|
||||||
|
|||||||
Reference in New Issue
Block a user