16 Commits

Author SHA1 Message Date
Ara Sadoyan
e304482667 Optimized healthchecks and config file loading 2025-08-20 14:03:09 +02:00
Ara Sadoyan
f8118f9596 TLS grades change 2025-08-05 19:08:58 +02:00
Ara Sadoyan
f654312466 SSL cipher management 2025-07-29 21:25:27 +02:00
Ara Sadoyan
b44f7069a0 Configurable TLS ciphers 2025-07-27 11:15:49 +02:00
Ara Sadoyan
a44979ec82 Configurable TLS ciphers 2025-07-27 11:13:39 +02:00
Ara Sadoyan
ece4fa20af README 2025-07-24 13:50:15 +02:00
Ara Sadoyan
2ad3a059ab Per path rate limiter 2025-07-24 13:34:15 +02:00
Ara Sadoyan
6f012cee69 Code cleanup 2025-07-22 17:40:58 +02:00
Ara Sadoyan
51c88c8f7c Some structural changes and improvements 2025-07-12 16:17:45 +02:00
Ara Sadoyan
f91bc41103 benchmark image 2025-07-10 17:46:05 +02:00
Ara Sadoyan
21e1276ff5 Readme update 2025-07-09 15:22:38 +02:00
Ara Sadoyan
8463cdabbc Added configurable rate limiter 2025-07-09 15:01:20 +02:00
Ara Sadoyan
d0e4b52ce6 Enable/Disable config API from config 2025-07-04 15:06:05 +02:00
Ara Sadoyan
b552d24497 README 2025-07-02 19:00:05 +02:00
Ara Sadoyan
2e33d692bb Added optional minimal file server 2025-07-02 18:29:14 +02:00
Ara Sadoyan
e586967830 Code cleanup, nothing special 2025-06-30 18:24:25 +02:00
21 changed files with 699 additions and 406 deletions

1
.gitignore vendored
View File

@@ -8,6 +8,7 @@
/target/ /target/
*.iml *.iml
.idea/ .idea/
.etc/
*.ipr *.ipr
*.iws *.iws
/out/ /out/

38
Cargo.lock generated
View File

@@ -128,9 +128,11 @@ dependencies = [
"log", "log",
"mimalloc", "mimalloc",
"notify", "notify",
"once_cell",
"pingora", "pingora",
"pingora-core", "pingora-core",
"pingora-http", "pingora-http",
"pingora-limits",
"pingora-proxy", "pingora-proxy",
"prometheus 0.14.0", "prometheus 0.14.0",
"rand 0.9.1", "rand 0.9.1",
@@ -141,6 +143,7 @@ dependencies = [
"sha2", "sha2",
"tokio", "tokio",
"tonic", "tonic",
"tower-http",
"urlencoding", "urlencoding",
"x509-parser", "x509-parser",
] ]
@@ -1078,6 +1081,12 @@ dependencies = [
"pin-project-lite", "pin-project-lite",
] ]
[[package]]
name = "http-range-header"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9171a2ea8a68358193d15dd5d70c1c10a2afc3e7e4c5bc92bc9f025cebd7359c"
[[package]] [[package]]
name = "httparse" name = "httparse"
version = "1.9.5" version = "1.9.5"
@@ -1617,6 +1626,16 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
dependencies = [
"mime",
"unicase",
]
[[package]] [[package]]
name = "minimal-lexical" name = "minimal-lexical"
version = "0.2.1" version = "0.2.1"
@@ -2068,6 +2087,15 @@ dependencies = [
"crc32fast", "crc32fast",
] ]
[[package]]
name = "pingora-limits"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a719a8cb5558ca06bd6076c97b8905d500ea556da89e132ba53d4272844f95b9"
dependencies = [
"ahash",
]
[[package]] [[package]]
name = "pingora-load-balancing" name = "pingora-load-balancing"
version = "0.5.0" version = "0.5.0"
@@ -3167,14 +3195,24 @@ checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2"
dependencies = [ dependencies = [
"bitflags 2.8.0", "bitflags 2.8.0",
"bytes", "bytes",
"futures-core",
"futures-util", "futures-util",
"http", "http",
"http-body", "http-body",
"http-body-util",
"http-range-header",
"httpdate",
"iri-string", "iri-string",
"mime",
"mime_guess",
"percent-encoding",
"pin-project-lite", "pin-project-lite",
"tokio",
"tokio-util",
"tower", "tower",
"tower-layer", "tower-layer",
"tower-service", "tower-service",
"tracing",
] ]
[[package]] [[package]]

View File

@@ -19,6 +19,8 @@ dashmap = "7.0.0-rc2"
pingora-core = "0.5.0" pingora-core = "0.5.0"
pingora-proxy = "0.5.0" pingora-proxy = "0.5.0"
pingora-http = "0.5.0" pingora-http = "0.5.0"
pingora-limits = "0.5.0"
#pingora-pool = "0.5.0"
async-trait = "0.1.88" async-trait = "0.1.88"
env_logger = "0.11.8" env_logger = "0.11.8"
log = "0.4.27" log = "0.4.27"
@@ -46,4 +48,8 @@ lazy_static = "1.5.0"
#openssl = "0.10.73" #openssl = "0.10.73"
x509-parser = "0.17.0" x509-parser = "0.17.0"
rustls-pemfile = "2.2.0" rustls-pemfile = "2.2.0"
tower-http = { version = "0.6.6", features = ["fs"] }
once_cell = "1.20.2"
#moka = { version = "0.12.10", features = ["sync"] }

101
README.md
View File

@@ -13,7 +13,11 @@ Built on Rust, on top of **Cloudflares Pingora engine**, **Aralez** delivers
- **Dynamic Config Reloads** — Upstreams can be updated live via API, no restart required. - **Dynamic Config Reloads** — Upstreams can be updated live via API, no restart required.
- **TLS Termination** — Built-in OpenSSL support. - **TLS Termination** — Built-in OpenSSL support.
- **Automatic load of certificates** — Automatically reads and loads certificates from a folder, without a restart.
- **Upstreams TLS detection** — Aralez will automatically detect if upstreams uses secure connection. - **Upstreams TLS detection** — Aralez will automatically detect if upstreams uses secure connection.
- **Built in rate limiter** — Limit requests to server, by setting up upper limit for requests per seconds, per virtualhost.
- **Global rate limiter** — Set rate limit for all virtualhosts.
- **Per path rate limiter** — Set rate limit for specific paths. Path limits will override global limits.
- **Authentication** — Supports Basic Auth, API tokens, and JWT verification. - **Authentication** — Supports Basic Auth, API tokens, and JWT verification.
- **Basic Auth** - **Basic Auth**
- **API Key** via `x-api-key` header - **API Key** via `x-api-key` header
@@ -24,6 +28,7 @@ Built on Rust, on top of **Cloudflares Pingora engine**, **Aralez** delivers
- Failover with health checks - Failover with health checks
- Sticky sessions via cookies - Sticky sessions via cookies
- **Unified Port** — Serve HTTP and WebSocket traffic over the same connection. - **Unified Port** — Serve HTTP and WebSocket traffic over the same connection.
- **Built in file server** — Build in minimalistic file server for serving static files, should be added as upstreams for public access.
- **Memory Safe** — Created purely on Rust. - **Memory Safe** — Created purely on Rust.
- **High Performance** — Built with [Pingora](https://github.com/cloudflare/pingora) and tokio for async I/O. - **High Performance** — Built with [Pingora](https://github.com/cloudflare/pingora) and tokio for async I/O.
@@ -61,28 +66,32 @@ Built on Rust, on top of **Cloudflares Pingora engine**, **Aralez** delivers
### 🔧 `main.yaml` ### 🔧 `main.yaml`
| Key | Example Value | Description | | Key | Example Value | Description |
|----------------------------------|--------------------------------------|--------------------------------------------------------------------------------------------------| |----------------------------------|--------------------------------------|----------------------------------------------------------------------------------------------------|
| **threads** | 12 | Number of running daemon threads. Optional, defaults to 1 | | **threads** | 12 | Number of running daemon threads. Optional, defaults to 1 |
| **user** | aralez | Optional, Username for running aralez after dropping root privileges, requires to launch as root | | **user** | aralez | Optional, Username for running aralez after dropping root privileges, requires to launch as root |
| **group** | aralez | Optional,Group for running aralez after dropping root privileges, requires to launch as root | | **group** | aralez | Optional,Group for running aralez after dropping root privileges, requires to launch as root |
| **daemon** | false | Run in background (boolean) | | **daemon** | false | Run in background (boolean) |
| **upstream_keepalive_pool_size** | 500 | Pool size for upstream keepalive connections | | **upstream_keepalive_pool_size** | 500 | Pool size for upstream keepalive connections |
| **pid_file** | /tmp/aralez.pid | Path to PID file | | **pid_file** | /tmp/aralez.pid | Path to PID file |
| **error_log** | /tmp/aralez_err.log | Path to error log file | | **error_log** | /tmp/aralez_err.log | Path to error log file |
| **upgrade_sock** | /tmp/aralez.sock | Path to live upgrade socket file | | **upgrade_sock** | /tmp/aralez.sock | Path to live upgrade socket file |
| **config_address** | 0.0.0.0:3000 | HTTP API address for pushing upstreams.yaml from remote location | | **config_address** | 0.0.0.0:3000 | HTTP API address for pushing upstreams.yaml from remote location |
| **config_tls_address** | 0.0.0.0:3001 | HTTPS API address for pushing upstreams.yaml from remote location | | **config_tls_address** | 0.0.0.0:3001 | HTTPS API address for pushing upstreams.yaml from remote location |
| **config_tls_certificate** | etc/server.crt | Certificate file path for API. Mandatory if proxy_address_tls is set, else optional | | **config_tls_certificate** | etc/server.crt | Certificate file path for API. Mandatory if proxy_address_tls is set, else optional |
| **config_tls_key_file** | etc/key.pem | Private Key file path. Mandatory if proxy_address_tls is set, else optional | | **proxy_tls_grade** | (high, medium, unsafe) | Grade of TLS ciphers, for easy configuration. High matches Qualys SSL Labs A+ (defaults to medium) |
| **proxy_address_http** | 0.0.0.0:6193 | Aralez HTTP bind address | | **config_tls_key_file** | etc/key.pem | Private Key file path. Mandatory if proxy_address_tls is set, else optional |
| **proxy_address_tls** | 0.0.0.0:6194 | Aralez HTTPS bind address (Optional) | | **proxy_address_http** | 0.0.0.0:6193 | Aralez HTTP bind address |
| **proxy_certificates** | etc/certs/ | The directory containing certificate and key files. In a format {NAME}.crt, {NAME}.key. | | **proxy_address_tls** | 0.0.0.0:6194 | Aralez HTTPS bind address (Optional) |
| **upstreams_conf** | etc/upstreams.yaml | The location of upstreams file | | **proxy_certificates** | etc/certs/ | The directory containing certificate and key files. In a format {NAME}.crt, {NAME}.key. |
| **log_level** | info | Log level , possible values : info, warn, error, debug, trace, off | | **upstreams_conf** | etc/upstreams.yaml | The location of upstreams file |
| **hc_method** | HEAD | Healthcheck method (HEAD, GET, POST are supported) UPPERCASE | | **log_level** | info | Log level , possible values : info, warn, error, debug, trace, off |
| **hc_interval** | 2 | Interval for health checks in seconds | | **hc_method** | HEAD | Healthcheck method (HEAD, GET, POST are supported) UPPERCASE |
| **master_key** | 5aeff7f9-7b94-447c-af60-e8c488544a3e | Master key for working with API server and JWT Secret generation | | **hc_interval** | 2 | Interval for health checks in seconds |
| **master_key** | 5aeff7f9-7b94-447c-af60-e8c488544a3e | Master key for working with API server and JWT Secret generation |
| **file_server_folder** | /some/local/folder | Optional, local folder to serve |
| **file_server_address** | 127.0.0.1:3002 | Optional, Local address for file server. Can set as upstream for public access |
| **config_api_enabled** | true | Boolean to enable/disable remote config push capability |
### 🌐 `upstreams.yaml` ### 🌐 `upstreams.yaml`
@@ -110,6 +119,24 @@ File names:
| `aralez-aarch64-musl.gz` | Static Linux ARM64 binary, without any system dependency | | `aralez-aarch64-musl.gz` | Static Linux ARM64 binary, without any system dependency |
| `aralez-aarch64-glibc.gz` | Dynamic Linux ARM64 binary, with minimal system dependencies | | `aralez-aarch64-glibc.gz` | Dynamic Linux ARM64 binary, with minimal system dependencies |
## 💡 Note
In general **glibc** builds are working faster, but have few, basic, system dependencies for example :
```
linux-vdso.so.1 (0x00007ffeea33b000)
libgcc_s.so.1 => /lib/x86_64-linux-gnu/libgcc_s.so.1 (0x00007f09e7377000)
libm.so.6 => /lib/x86_64-linux-gnu/libm.so.6 (0x00007f09e6320000)
libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007f09e613f000)
/lib64/ld-linux-x86-64.so.2 (0x00007f09e73b1000)
```
These are common to any Linux systems, so the binary should work on almost any Linux system.
**musl** builds are 100% portable, static compiled binaries and have zero system depencecies.
In general musl builds have a little less performance.
The most intensive tests shows 107k-110k requests per second on **Glibc** binaries against 97k-100k **Musl** ones.
## 🔌 Running the Proxy ## 🔌 Running the Proxy
```bash ```bash
@@ -142,6 +169,7 @@ A sample `upstreams.yaml` entry:
provider: "file" provider: "file"
sticky_sessions: false sticky_sessions: false
to_https: false to_https: false
rate_limit: 10
headers: headers:
- "Access-Control-Allow-Origin:*" - "Access-Control-Allow-Origin:*"
- "Access-Control-Allow-Methods:POST, GET, OPTIONS" - "Access-Control-Allow-Methods:POST, GET, OPTIONS"
@@ -152,6 +180,7 @@ authorization:
myhost.mydomain.com: myhost.mydomain.com:
paths: paths:
"/": "/":
rate_limit: 20
to_https: false to_https: false
headers: headers:
- "X-Some-Thing:Yaaaaaaaaaaaaaaa" - "X-Some-Thing:Yaaaaaaaaaaaaaaa"
@@ -172,13 +201,18 @@ myhost.mydomain.com:
- Sticky sessions are disabled globally. This setting applies to all upstreams. If enabled all requests will be 301 redirected to HTTPS. - Sticky sessions are disabled globally. This setting applies to all upstreams. If enabled all requests will be 301 redirected to HTTPS.
- HTTP to HTTPS redirect disabled globally, but can be overridden by `to_https` setting per upstream. - HTTP to HTTPS redirect disabled globally, but can be overridden by `to_https` setting per upstream.
- Requests to each hosted domains will be limited to 10 requests per second per virtualhost.
- Requests limits are calculated per requester ip plus requested virtualhost.
- If the requester exceeds the limit it will receive `429 Too Many Requests` error.
- Optional. Rate limiter will be disabled if the parameter is entirely removed from config.
- Requests to `myhost.mydomain.com/` will be limited to 20 requests per second.
- Requests to `myhost.mydomain.com/` will be proxied to `127.0.0.1` and `127.0.0.2`. - Requests to `myhost.mydomain.com/` will be proxied to `127.0.0.1` and `127.0.0.2`.
- Plain HTTP to `myhost.mydomain.com/foo` will get 301 redirect to configured TLS port of Aralez. - Plain HTTP to `myhost.mydomain.com/foo` will get 301 redirect to configured TLS port of Aralez.
- Requests to `myhost.mydomain.com/foo` will be proxied to `127.0.0.4` and `127.0.0.5`. - Requests to `myhost.mydomain.com/foo` will be proxied to `127.0.0.4` and `127.0.0.5`.
- SSL/TLS for upstreams is detected automatically, no need to set any config parameter. - SSL/TLS for upstreams is detected automatically, no need to set any config parameter.
- Assuming the `127.0.0.5:8443` is SSL protected. The inner traffic will use TLS. - Assuming the `127.0.0.5:8443` is SSL protected. The inner traffic will use TLS.
- Self signed certificates are silently accepted. - Self-signed certificates are silently accepted.
- Global headers (CORS for this case) will be injected to all upstreams - Global headers (CORS for this case) will be injected to all upstreams.
- Additional headers will be injected into the request for `myhost.mydomain.com`. - Additional headers will be injected into the request for `myhost.mydomain.com`.
- You can choose any path, deep nested paths are supported, the best match chosen. - You can choose any path, deep nested paths are supported, the best match chosen.
- All requests to servers will require JWT token authentication (You can comment out the authorization to disable it), - All requests to servers will require JWT token authentication (You can comment out the authorization to disable it),
@@ -442,4 +476,21 @@ Error distribution:
[228] aborted due to deadline [228] aborted due to deadline
``` ```
![Aralez](https://netangels.net/utils/musl10.png) ![Aralez](https://netangels.net/utils/musl10.png)
## 🚀 Aralez, Nginx, Traefik performance benchmark
This benchmark is done on 4 servers. With CPU Intel(R) Xeon(R) E-2174G CPU @ 3.80GHz, 64 GB RAM.
1. Sever runs Aralez, Traefik, Nginx on different ports. Tuned as much as I could .
2. 3x Upstreams servers, running Nginx. Replying with dummy json hardcoded in config file for max performance.
All servers are connected to the same switch with 1GB port in datacenter , not a home lab. The results:
![Aralez](https://raw.githubusercontent.com/sadoyan/aralez/refs/heads/main/assets/bench.png)
The results show requests per second performed by Load balancer. You can see 3 batches with 800 concurrent users.
1. Requests via http1.1 to plain text endpoint.
2. Requests to via http2 to SSL endpoint.
3. Mixed workload with plain http1.1 and htt2 SSL.

BIN
assets/bench.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 160 KiB

View File

@@ -1,5 +1,5 @@
# Main configuration file , applied on startup # Main configuration file, applied on startup
threads: 12 # Nubber of daemon threads default setting threads: 12 # Number of daemon threads default setting
#user: pastor # Username for running aralez after dropping root privileges, requires program to start as root #user: pastor # Username for running aralez after dropping root privileges, requires program to start as root
#group: pastor # Group for running aralez after dropping root privileges, requires program to start as root #group: pastor # Group for running aralez after dropping root privileges, requires program to start as root
daemon: false # Run in background daemon: false # Run in background
@@ -7,14 +7,18 @@ upstream_keepalive_pool_size: 500 # Pool size for upstream keepalive connections
pid_file: /tmp/aralez.pid # Path to PID file pid_file: /tmp/aralez.pid # Path to PID file
error_log: /tmp/aralez_err.log # Path to error log error_log: /tmp/aralez_err.log # Path to error log
upgrade_sock: /tmp/aralez.sock # Path to socket file upgrade_sock: /tmp/aralez.sock # Path to socket file
config_api_enabled: true # Boolean to enable/disable remote config push capability.
config_address: 0.0.0.0:3000 # HTTP API address for pushing upstreams.yaml from remote location config_address: 0.0.0.0:3000 # HTTP API address for pushing upstreams.yaml from remote location
config_tls_address: 0.0.0.0:3001 # HTTP TLS API address for pushing upstreams.yaml from remote location config_tls_address: 0.0.0.0:3001 # HTTP TLS API address for pushing upstreams.yaml from remote location
config_tls_certificate: etc/server.crt # Mandatory if config_tls_address is set config_tls_certificate: /etc/server.crt # Mandatory if config_tls_address is set
config_tls_key_file: etc/key.pem # Mandatory if config_tls_address is set config_tls_key_file: /etc/key.pem # Mandatory if config_tls_address is set
proxy_address_http: 0.0.0.0:6193 # Proxy HTTP bind address proxy_address_http: 0.0.0.0:6193 # Proxy HTTP bind address
proxy_address_tls: 0.0.0.0:6194 # Optional, Proxy TLS bind address proxy_address_tls: 0.0.0.0:6194 # Optional, Proxy TLS bind address
proxy_certificates: etc/yoyo # Mandatory if proxy_address_tls set, should contain certificate and key files strictly in a format {NAME}.crt, {NAME}.key. proxy_certificates: /etc/yoyo # Mandatory if proxy_address_tls set, should contain a certificate and key files strictly in a format {NAME}.crt, {NAME}.key.
upstreams_conf: etc/upstreams.yaml # the location of upstreams file proxy_tls_grade: a+ # Grade of TLS suite for proxy (a+, a, b, c, unsafe), matching grades of Qualys SSL Labs
upstreams_conf: /etc/upstreams.yaml # the location of upstreams file
file_server_folder: /opt/storage # Optional, local folder to serve
file_server_address: 127.0.0.1:3002 # Optional, Local address for file server. Can set as upstream for public access.
log_level: info # info, warn, error, debug, trace, off log_level: info # info, warn, error, debug, trace, off
hc_method: HEAD # Healthcheck method (HEAD, GET, POST are supported) UPPERCASE hc_method: HEAD # Healthcheck method (HEAD, GET, POST are supported) UPPERCASE
hc_interval: 2 #Interval for health checks in seconds hc_interval: 2 #Interval for health checks in seconds

View File

@@ -2,6 +2,7 @@
provider: "file" # consul provider: "file" # consul
sticky_sessions: false sticky_sessions: false
to_ssl: false to_ssl: false
#rate_limit: 100
headers: headers:
- "Access-Control-Allow-Origin:*" - "Access-Control-Allow-Origin:*"
- "Access-Control-Allow-Methods:POST, GET, OPTIONS" - "Access-Control-Allow-Methods:POST, GET, OPTIONS"
@@ -16,9 +17,9 @@ authorization:
# creds: "5ecbf799-1343-4e94-a9b5-e278af5cd313-56b45249-1839-4008-a450-a60dc76d2bae" # creds: "5ecbf799-1343-4e94-a9b5-e278af5cd313-56b45249-1839-4008-a450-a60dc76d2bae"
consul: # If the provider is consul. Otherwise, ignored. consul: # If the provider is consul. Otherwise, ignored.
servers: servers:
- "http://master1:8500" - "http://consul1:8500"
- "http://192.168.22.1:8500" - "http://consul2:8500"
- "http://master1.foo.local:8500" - "http://consul3:8500"
services: # proxy: The hostname to access the proxy server, real : The real service name in Consul database. services: # proxy: The hostname to access the proxy server, real : The real service name in Consul database.
- proxy: "proxy-frontend-dev-frontend-srv" - proxy: "proxy-frontend-dev-frontend-srv"
real: "frontend-dev-frontend-srv" real: "frontend-dev-frontend-srv"
@@ -26,10 +27,11 @@ consul: # If the provider is consul. Otherwise, ignored.
upstreams: upstreams:
myip.mydomain.com: myip.mydomain.com:
paths: paths:
rate_limit: 10 # Per path rate limit have higher priority than global rate limit. If not set, the global rate limit will be used
"/": "/":
to_https: false to_https: false
headers: headers:
- "X-Proxy-From:Gazan" - "X-Proxy-From:Aralez"
servers: # List of upstreams HOST:PORT servers: # List of upstreams HOST:PORT
- "127.0.0.1:8000" - "127.0.0.1:8000"
- "127.0.0.2:8000" - "127.0.0.2:8000"
@@ -39,7 +41,7 @@ upstreams:
to_https: true to_https: true
headers: headers:
- "X-Some-Thing:Yaaaaaaaaaaaaaaa" - "X-Some-Thing:Yaaaaaaaaaaaaaaa"
- "X-Proxy-From:Gazan" - "X-Proxy-From:Aralez"
servers: servers:
- "127.0.0.1:8000" - "127.0.0.1:8000"
- "127.0.0.2:8000" - "127.0.0.2:8000"

View File

@@ -1,5 +1,5 @@
use crate::utils::parceyaml::load_configuration; use crate::utils::parceyaml::load_configuration;
use crate::utils::structs::{Configuration, ServiceMapping, UpstreamsDashMap}; use crate::utils::structs::{Configuration, InnerMap, ServiceMapping, UpstreamsDashMap};
use crate::utils::tools::{clone_dashmap_into, compare_dashmaps}; use crate::utils::tools::{clone_dashmap_into, compare_dashmaps};
use dashmap::DashMap; use dashmap::DashMap;
use futures::channel::mpsc::Sender; use futures::channel::mpsc::Sender;
@@ -28,7 +28,7 @@ struct TaggedAddress {
} }
pub async fn start(fp: String, mut toreturn: Sender<Configuration>) { pub async fn start(fp: String, mut toreturn: Sender<Configuration>) {
let config = load_configuration(fp.as_str(), "filepath"); let config = load_configuration(fp.as_str(), "filepath").await;
let headers = DashMap::new(); let headers = DashMap::new();
match config { match config {
Some(config) => { Some(config) => {
@@ -109,7 +109,7 @@ async fn consul_request(url: String, whitelist: Option<Vec<ServiceMapping>>, tok
Some(upstreams) Some(upstreams)
} }
async fn get_by_http(url: String, token: Option<String>) -> Option<DashMap<String, (Vec<(String, u16, bool, bool, bool)>, AtomicUsize)>> { async fn get_by_http(url: String, token: Option<String>) -> Option<DashMap<String, (Vec<InnerMap>, AtomicUsize)>> {
let client = reqwest::Client::new(); let client = reqwest::Client::new();
let mut headers = HeaderMap::new(); let mut headers = HeaderMap::new();
if let Some(token) = token { if let Some(token) = token {
@@ -118,7 +118,7 @@ async fn get_by_http(url: String, token: Option<String>) -> Option<DashMap<Strin
let to = Duration::from_secs(1); let to = Duration::from_secs(1);
let u = client.get(url).timeout(to).send(); let u = client.get(url).timeout(to).send();
let mut values = Vec::new(); let mut values = Vec::new();
let upstreams: DashMap<String, (Vec<(String, u16, bool, bool, bool)>, AtomicUsize)> = DashMap::new(); let upstreams: DashMap<String, (Vec<InnerMap>, AtomicUsize)> = DashMap::new();
match u.await { match u.await {
Ok(r) => { Ok(r) => {
let jason = r.json::<Vec<Service>>().await; let jason = r.json::<Vec<Service>>().await;
@@ -127,7 +127,14 @@ async fn get_by_http(url: String, token: Option<String>) -> Option<DashMap<Strin
for service in whitelist { for service in whitelist {
let addr = service.tagged_addresses.get("lan_ipv4").unwrap().address.clone(); let addr = service.tagged_addresses.get("lan_ipv4").unwrap().address.clone();
let prt = service.tagged_addresses.get("lan_ipv4").unwrap().port.clone(); let prt = service.tagged_addresses.get("lan_ipv4").unwrap().port.clone();
let to_add = (addr, prt, false, false, false); let to_add = InnerMap {
address: addr,
port: prt,
is_ssl: false,
is_http2: false,
to_https: false,
rate_limit: None,
};
values.push(to_add); values.push(to_add);
} }
} }

View File

@@ -9,11 +9,14 @@ pub struct FromFileProvider {
pub path: String, pub path: String,
} }
pub struct APIUpstreamProvider { pub struct APIUpstreamProvider {
pub config_api_enabled: bool,
pub address: String, pub address: String,
pub masterkey: String, pub masterkey: String,
pub tls_address: Option<String>, pub tls_address: Option<String>,
pub tls_certificate: Option<String>, pub tls_certificate: Option<String>,
pub tls_key_file: Option<String>, pub tls_key_file: Option<String>,
pub file_server_address: Option<String>,
pub file_server_folder: Option<String>,
} }
pub struct ConsulProvider { pub struct ConsulProvider {

View File

@@ -15,7 +15,7 @@ pub async fn start(fp: String, mut toreturn: Sender<Configuration>) {
let file_path = fp.as_str(); let file_path = fp.as_str();
let parent_dir = Path::new(file_path).parent().unwrap(); let parent_dir = Path::new(file_path).parent().unwrap();
let (local_tx, mut local_rx) = tokio::sync::mpsc::channel::<notify::Result<Event>>(1); let (local_tx, mut local_rx) = tokio::sync::mpsc::channel::<notify::Result<Event>>(1);
let snd = load_configuration(file_path, "filepath"); let snd = load_configuration(file_path, "filepath").await;
match snd { match snd {
Some(snd) => { Some(snd) => {
@@ -53,7 +53,7 @@ pub async fn start(fp: String, mut toreturn: Sender<Configuration>) {
if start.elapsed() > Duration::from_secs(2) { if start.elapsed() > Duration::from_secs(2) {
start = Instant::now(); start = Instant::now();
// info!("Config File changed :=> {:?}", e); // info!("Config File changed :=> {:?}", e);
let snd = load_configuration(file_path, "filepath"); let snd = load_configuration(file_path, "filepath").await;
match snd { match snd {
Some(snd) => { Some(snd) => {
toreturn.send(snd).await.unwrap(); toreturn.send(snd).await.unwrap();

View File

@@ -1,7 +1,7 @@
use crate::utils::structs::{UpstreamsDashMap, UpstreamsIdMap}; use crate::utils::structs::{InnerMap, UpstreamsDashMap, UpstreamsIdMap};
use crate::utils::tools::*; use crate::utils::tools::*;
use dashmap::DashMap; use dashmap::DashMap;
use log::{error, info, warn}; use log::{error, warn};
use reqwest::{Client, Version}; use reqwest::{Client, Version};
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::Arc; use std::sync::Arc;
@@ -11,113 +11,96 @@ use tonic::transport::Endpoint;
pub async fn hc2(upslist: Arc<UpstreamsDashMap>, fullist: Arc<UpstreamsDashMap>, idlist: Arc<UpstreamsIdMap>, params: (&str, u64)) { pub async fn hc2(upslist: Arc<UpstreamsDashMap>, fullist: Arc<UpstreamsDashMap>, idlist: Arc<UpstreamsIdMap>, params: (&str, u64)) {
let mut period = interval(Duration::from_secs(params.1)); let mut period = interval(Duration::from_secs(params.1));
let mut first_run = 0; let client = Client::builder().timeout(Duration::from_secs(params.1)).danger_accept_invalid_certs(true).build().unwrap();
loop { loop {
tokio::select! { tokio::select! {
_ = period.tick() => { _ = period.tick() => {
let totest : UpstreamsDashMap = DashMap::new(); populate_upstreams(&upslist, &fullist, &idlist, params, &client).await;
let fclone : UpstreamsDashMap = clone_dashmap(&fullist);
for val in fclone.iter() {
let host = val.key();
let inner = DashMap::new();
let mut _scheme: (String, u16, bool, bool, bool) = ("".to_string(), 0, false, false, false);
for path_entry in val.value().iter() {
// let inner = DashMap::new();
let path = path_entry.key();
let mut innervec= Vec::new();
for k in path_entry.value().0 .iter().enumerate() {
let (ip, port, _ssl, _version, _redir) = k.1;
let mut _link = String::new();
let tls = detect_tls(ip, port).await;
let mut is_h2 = false;
// if tls.1 == Some(Version::HTTP_11) {
// println!(" V1: ==> {:?}", tls.1)
// }else if tls.1 == Some(Version::HTTP_2) {
// is_h2 = true;
// println!(" V2: ==> {:?}", tls.1)
// }
if tls.1 == Some(Version::HTTP_2) {
is_h2 = true;
// println!(" V2: ==> {} ==> {:?}", tls.0, tls.1)
}
match tls.0 {
true => _link = format!("https://{}:{}{}", ip, port, path),
false => _link = format!("http://{}:{}{}", ip, port, path),
}
// if _pref == "https://" {
// _scheme = (ip.to_string(), *port, true);
// }else {
// _scheme = (ip.to_string(), *port, false);
// }
_scheme = (ip.to_string(), *port, tls.0, is_h2, *_redir);
// let link = format!("{}{}:{}{}", _pref, ip, port, path);
let resp = http_request(_link.as_str(), params.0, "").await;
match resp.0 {
true => {
if resp.1 {
_scheme = (ip.to_string(), *port, tls.0, true, *_redir);
}
innervec.push(_scheme.clone());
}
false => {
warn!("Dead Upstream : {}", _link);
}
}
}
inner.insert(path.clone().to_owned(), (innervec, AtomicUsize::new(0)));
}
totest.insert(host.clone(), inner);
}
if first_run == 1 {
info!("Performing initial hatchecks and upstreams ssl detection");
clone_idmap_into(&totest, &idlist);
info!("Aralez is up and ready to serve requests, the upstreams list is:");
print_upstreams(&totest)
}
first_run+=1;
if ! compare_dashmaps(&totest, &upslist){
clone_dashmap_into(&totest, &upslist);
clone_idmap_into(&totest, &idlist);
}
} }
} }
} }
} }
#[allow(dead_code)] pub async fn populate_upstreams(upslist: &Arc<UpstreamsDashMap>, fullist: &Arc<UpstreamsDashMap>, idlist: &Arc<UpstreamsIdMap>, params: (&str, u64), client: &Client) {
async fn http_request(url: &str, method: &str, payload: &str) -> (bool, bool) { let totest = build_upstreams(fullist, params.0, client).await;
let client = Client::builder().danger_accept_invalid_certs(true).build().unwrap(); if !compare_dashmaps(&totest, upslist) {
let timeout = Duration::from_secs(1); clone_dashmap_into(&totest, upslist);
clone_idmap_into(&totest, idlist);
}
}
pub async fn initiate_upstreams(fullist: UpstreamsDashMap) -> UpstreamsDashMap {
let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().unwrap();
build_upstreams(&fullist, "HEAD", &client).await
}
/// Probe every upstream in `fullist` with an HTTP(S) request (falling back to a
/// gRPC ping inside `http_request`) and return a fresh map containing only the
/// upstreams that responded. Dead upstreams are dropped with a warning.
async fn build_upstreams(fullist: &UpstreamsDashMap, method: &str, client: &Client) -> UpstreamsDashMap {
    let totest: UpstreamsDashMap = DashMap::new();
    let fclone = clone_dashmap(fullist);
    for val in fclone.iter() {
        let host = val.key();
        let inner = DashMap::new();
        for path_entry in val.value().iter() {
            let path = path_entry.key();
            let mut innervec = Vec::new();
            // was `.enumerate()` with an unused index — removed.
            for upstream in path_entry.value().0.iter() {
                let tls = detect_tls(upstream.address.as_str(), &upstream.port, client).await;
                let is_h2 = matches!(tls.1, Some(Version::HTTP_2));
                // Probe over https when TLS was detected, plain http otherwise.
                let proto = if tls.0 { "https" } else { "http" };
                let link = format!("{}://{}:{}{}", proto, upstream.address, upstream.port, path);
                let scheme = InnerMap {
                    address: upstream.address.clone(),
                    port: upstream.port,
                    is_ssl: tls.0,
                    is_http2: is_h2,
                    to_https: upstream.to_https,
                    rate_limit: upstream.rate_limit,
                };
                let resp = http_request(&link, method, "", client).await;
                if resp.0 {
                    // NOTE(review): the original re-assigned `scheme.is_http2 = is_h2`
                    // when the gRPC fallback answered (resp.1) — a no-op, since the
                    // field already holds `is_h2`; detect_tls already reports HTTP/2
                    // for gRPC-only endpoints. Removed the dead branch.
                    innervec.push(scheme);
                } else {
                    warn!("Dead Upstream : {}", link);
                }
            }
            inner.insert(path.clone(), (innervec, AtomicUsize::new(0)));
        }
        totest.insert(host.clone(), inner);
    }
    totest
}
async fn http_request(url: &str, method: &str, payload: &str, client: &Client) -> (bool, bool) {
if !["POST", "GET", "HEAD"].contains(&method) { if !["POST", "GET", "HEAD"].contains(&method) {
error!("Method {} not supported. Only GET|POST|HEAD are supported ", method); error!("Method {} not supported. Only GET|POST|HEAD are supported ", method);
return (false, false); return (false, false);
} }
async fn send_request(client: &Client, method: &str, url: &str, payload: &str, timeout: Duration) -> Option<reqwest::Response> { async fn send_request(client: &Client, method: &str, url: &str, payload: &str) -> Option<reqwest::Response> {
match method { match method {
"POST" => client.post(url).body(payload.to_owned()).timeout(timeout).send().await.ok(), "POST" => client.post(url).body(payload.to_owned()).send().await.ok(),
"GET" => client.get(url).timeout(timeout).send().await.ok(), "GET" => client.get(url).send().await.ok(),
"HEAD" => client.head(url).timeout(timeout).send().await.ok(), "HEAD" => client.head(url).send().await.ok(),
_ => None, _ => None,
} }
} }
match send_request(&client, method, url, payload, timeout).await { match send_request(&client, method, url, payload).await {
Some(response) => { Some(response) => {
let status = response.status().as_u16(); let status = response.status().as_u16();
((99..499).contains(&status), false) ((99..499).contains(&status), false)
} }
None => { None => (ping_grpc(&url).await, true),
// let fallback_url = url.replace("https", "http");
// ping_grpc(&fallback_url).await
(ping_grpc(&url).await, true)
}
} }
} }
@@ -128,10 +111,7 @@ pub async fn ping_grpc(addr: &str) -> bool {
let endpoint = endpoint.timeout(Duration::from_secs(2)); let endpoint = endpoint.timeout(Duration::from_secs(2));
match tokio::time::timeout(Duration::from_secs(3), endpoint.connect()).await { match tokio::time::timeout(Duration::from_secs(3), endpoint.connect()).await {
Ok(Ok(_channel)) => { Ok(Ok(_channel)) => true,
// println!("{:?} ==> {:?} ==> {}", endpoint, _channel, addr);
true
}
_ => false, _ => false,
} }
} else { } else {
@@ -139,15 +119,24 @@ pub async fn ping_grpc(addr: &str) -> bool {
} }
} }
async fn detect_tls(ip: &str, port: &u16) -> (bool, Option<Version>) { async fn detect_tls(ip: &str, port: &u16, client: &Client) -> (bool, Option<Version>) {
let url = format!("https://{}:{}", ip, port); let https_url = format!("https://{}:{}", ip, port);
// let url = format!("{}:{}", ip, port); match client.get(&https_url).send().await {
let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().unwrap(); Ok(response) => {
match client.get(&url).send().await { // println!("{} => {:?} (HTTPS)", https_url, response.version());
Ok(response) => (true, Some(response.version())), return (true, Some(response.version()));
Err(e) => { }
if e.is_builder() || e.is_connect() || e.to_string().contains("tls") { _ => {}
(false, None) }
let http_url = format!("http://{}:{}", ip, port);
match client.get(&http_url).send().await {
Ok(response) => {
// println!("{} => {:?} (HTTP)", http_url, response.version());
(false, Some(response.version()))
}
Err(_) => {
if ping_grpc(&http_url).await {
(false, Some(Version::HTTP_2))
} else { } else {
(false, None) (false, None)
} }

View File

@@ -61,27 +61,3 @@ pub fn calc_metrics(metric_types: &MetricTypes) {
REQUESTS_BY_METHOD.with_label_values(&[&metric_types.method]).inc(); REQUESTS_BY_METHOD.with_label_values(&[&metric_types.method]).inc();
RESPONSE_LATENCY.observe(metric_types.latency.as_secs_f64()); RESPONSE_LATENCY.observe(metric_types.latency.as_secs_f64());
} }
/*
pub fn calc_metrics(method: String, code: u16, latency: Duration) {
REQUEST_COUNT.inc();
let timer = REQUEST_LATENCY.start_timer();
timer.observe_duration();
RESPONSE_CODES.with_label_values(&[&code.to_string()]).inc();
REQUESTS_BY_METHOD.with_label_values(&[&method]).inc();
RESPONSE_LATENCY.observe(latency.as_secs_f64());
}
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
loop {
interval.tick().await;
// read Pingora stats
let stats = pingora.get_stats();
// update Prometheus metrics accordingly
REQUEST_COUNT.set(stats.requests_total);
// ... etc
}
});
*/

View File

@@ -1,139 +1,158 @@
use crate::utils::healthcheck;
use crate::utils::structs::*; use crate::utils::structs::*;
use crate::utils::tools::{clone_dashmap, clone_dashmap_into, print_upstreams};
use dashmap::DashMap; use dashmap::DashMap;
use log::{error, info, warn}; use log::{error, info, warn};
use serde_yaml::Error;
use std::collections::HashMap; use std::collections::HashMap;
use std::fs;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
// use std::sync::mpsc::{channel, Receiver, Sender};
use std::{env, fs};
// use tokio::sync::oneshot::{Receiver, Sender};
pub fn load_configuration(d: &str, kind: &str) -> Option<Configuration> { pub async fn load_configuration(d: &str, kind: &str) -> Option<Configuration> {
let mut toreturn: Configuration = Configuration { let yaml_data = match kind {
upstreams: Default::default(), "filepath" => match fs::read_to_string(d) {
headers: Default::default(), Ok(data) => {
consul: None, info!("Reading upstreams from {}", d);
typecfg: "".to_string(), data
extraparams: Extraparams { }
sticky_sessions: false, Err(e) => {
to_https: None, error!("Reading: {}: {:?}", d, e);
authentication: DashMap::new(), warn!("Running with empty upstreams list, update it via API");
return None;
}
}, },
};
toreturn.upstreams = UpstreamsDashMap::new();
toreturn.headers = Headers::new();
let mut yaml_data = d.to_string();
match kind {
"filepath" => {
let _ = match fs::read_to_string(d) {
Ok(data) => {
info!("Reading upstreams from {}", d);
yaml_data = data
}
Err(e) => {
error!("Reading: {}: {:?}", d, e.to_string());
warn!("Running with empty upstreams list, update it via API");
return None;
}
};
}
"content" => { "content" => {
info!("Reading upstreams from API post body"); info!("Reading upstreams from API post body");
d.to_string()
} }
_ => error!("Mismatched parameter, only filepath|content is allowed "), _ => {
} error!("Mismatched parameter, only filepath|content is allowed");
return None;
let p: Result<Config, Error> = serde_yaml::from_str(&yaml_data);
match p {
Ok(parsed) => {
let global_headers = DashMap::new();
let mut hl = Vec::new();
if let Some(headers) = &parsed.headers {
for header in headers.iter() {
if let Some((key, val)) = header.split_once(':') {
hl.push((key.to_string(), val.to_string()));
}
}
global_headers.insert("/".to_string(), hl);
toreturn.headers.insert("GLOBAL_HEADERS".to_string(), global_headers);
toreturn.extraparams.sticky_sessions = parsed.sticky_sessions;
toreturn.extraparams.to_https = parsed.to_https;
}
if let Some(auth) = &parsed.authorization {
let name = auth.get("type").unwrap().to_string();
let creds = auth.get("creds").unwrap().to_string();
let val: Vec<String> = vec![name, creds];
toreturn.extraparams.authentication.insert("authorization".to_string(), val);
} else {
toreturn.extraparams.authentication = DashMap::new();
}
match parsed.provider.as_str() {
"file" => {
toreturn.typecfg = "file".to_string();
if let Some(upstream) = parsed.upstreams {
for (hostname, host_config) in upstream {
let path_map = DashMap::new();
let header_list = DashMap::new();
for (path, path_config) in host_config.paths {
let mut server_list = Vec::new();
let mut hl = Vec::new();
if let Some(headers) = &path_config.headers {
for header in headers.iter().by_ref() {
if let Some((key, val)) = header.split_once(':') {
hl.push((key.to_string(), val.to_string()));
}
}
}
header_list.insert(path.clone(), hl);
for server in path_config.servers {
if let Some((ip, port_str)) = server.split_once(':') {
if let Ok(port) = port_str.parse::<u16>() {
// let to_https = matches!(path_config.to_https, Some(true));
let to_https = path_config.to_https.unwrap_or(false);
server_list.push((ip.to_string(), port, true, false, to_https));
}
}
}
path_map.insert(path, (server_list, AtomicUsize::new(0)));
}
toreturn.headers.insert(hostname.clone(), header_list);
toreturn.upstreams.insert(hostname, path_map);
}
}
Some(toreturn)
}
"consul" => {
toreturn.typecfg = "consul".to_string();
let consul = parsed.consul;
match consul {
Some(consul) => {
toreturn.consul = Some(consul);
Some(toreturn)
}
None => None,
}
}
"kubernetes" => None,
_ => {
warn!("Unknown provider {}", parsed.provider);
None
}
}
} }
};
let parsed: Config = match serde_yaml::from_str(&yaml_data) {
Ok(cfg) => cfg,
Err(e) => { Err(e) => {
error!("Failed to parse upstreams file: {}", e); error!("Failed to parse upstreams file: {}", e);
return None;
}
};
let mut toreturn = Configuration::default();
populate_headers_and_auth(&mut toreturn, &parsed).await;
toreturn.typecfg = parsed.provider.clone();
match parsed.provider.as_str() {
"file" => {
populate_file_upstreams(&mut toreturn, &parsed).await;
Some(toreturn)
}
"consul" => {
toreturn.consul = parsed.consul;
if toreturn.consul.is_some() {
Some(toreturn)
} else {
None
}
}
"kubernetes" => None,
_ => {
warn!("Unknown provider {}", parsed.provider);
None None
} }
} }
} }
/// Copy the global headers, sticky-session / to-https flags, global rate limit
/// and authorization settings from the parsed YAML (`parsed`) into `config`.
async fn populate_headers_and_auth(config: &mut Configuration, parsed: &Config) {
    if let Some(headers) = &parsed.headers {
        let mut hl = Vec::new();
        for header in headers {
            // Headers are written as "Name: value" strings in the YAML file.
            if let Some((key, val)) = header.split_once(':') {
                hl.push((key.trim().to_string(), val.trim().to_string()));
            }
        }
        let global_headers = DashMap::new();
        global_headers.insert("/".to_string(), hl);
        config.headers.insert("GLOBAL_HEADERS".to_string(), global_headers);
    }
    config.extraparams.sticky_sessions = parsed.sticky_sessions;
    config.extraparams.to_https = parsed.to_https;
    config.extraparams.rate_limit = parsed.rate_limit;
    if let Some(rate) = &parsed.rate_limit {
        info!("Applied Global Rate Limit : {} request per second", rate);
    }
    if let Some(auth) = &parsed.authorization {
        // `cloned().unwrap_or_default()` avoids the throwaway String the
        // original allocated eagerly via `unwrap_or(&"".to_string())`.
        let name = auth.get("type").cloned().unwrap_or_default();
        let creds = auth.get("creds").cloned().unwrap_or_default();
        config.extraparams.authentication.insert("authorization".to_string(), vec![name, creds]);
    } else {
        config.extraparams.authentication = DashMap::new();
    }
}
/// Build the upstream map from the `file` provider section of the parsed YAML,
/// run the initial health check over it, and store the surviving upstreams plus
/// any per-path headers into `config`.
async fn populate_file_upstreams(config: &mut Configuration, parsed: &Config) {
    let imtdashmap = UpstreamsDashMap::new();
    if let Some(upstreams) = &parsed.upstreams {
        for (hostname, host_config) in upstreams {
            let path_map = DashMap::new();
            let header_list = DashMap::new();
            for (path, path_config) in &host_config.paths {
                if let Some(rate) = &path_config.rate_limit {
                    info!("Applied Rate Limit for {} : {} request per second", hostname, rate);
                }
                let mut server_list = Vec::new();
                let mut hl = Vec::new();
                if let Some(headers) = &path_config.headers {
                    for header in headers {
                        // Headers are written as "Name: value" strings in the YAML file.
                        if let Some((key, val)) = header.split_once(':') {
                            hl.push((key.trim().to_string(), val.trim().to_string()));
                        }
                    }
                }
                header_list.insert(path.clone(), hl);
                for server in &path_config.servers {
                    // Servers are "ip:port" strings; unparsable entries are skipped.
                    if let Some((ip, port_str)) = server.split_once(':') {
                        if let Ok(port) = port_str.parse::<u16>() {
                            server_list.push(InnerMap {
                                address: ip.trim().to_string(),
                                port,
                                // is_ssl/is_http2 are provisional here; the health
                                // check below re-detects both per upstream.
                                is_ssl: true,
                                is_http2: false,
                                to_https: path_config.to_https.unwrap_or(false),
                                rate_limit: path_config.rate_limit,
                            });
                        }
                    }
                }
                path_map.insert(path.clone(), (server_list, AtomicUsize::new(0)));
            }
            config.headers.insert(hostname.clone(), header_list);
            imtdashmap.insert(hostname.clone(), path_map);
        }
        let y = clone_dashmap(&imtdashmap);
        let r = healthcheck::initiate_upstreams(y).await;
        clone_dashmap_into(&r, &config.upstreams);
        println!("Upstream Config:");
        print_upstreams(&config.upstreams);
    }
}
pub fn parce_main_config(path: &str) -> AppConfig { pub fn parce_main_config(path: &str) -> AppConfig {
info!("Parsing configuration");
let data = fs::read_to_string(path).unwrap(); let data = fs::read_to_string(path).unwrap();
let reply = DashMap::new(); let reply = DashMap::new();
let cfg: HashMap<String, String> = serde_yaml::from_str(&*data).expect("Failed to parse main config file"); let cfg: HashMap<String, String> = serde_yaml::from_str(&*data).expect("Failed to parse main config file");
let mut cfo: AppConfig = serde_yaml::from_str(&*data).expect("Failed to parse main config file"); let mut cfo: AppConfig = serde_yaml::from_str(&*data).expect("Failed to parse main config file");
log_builder(&cfo);
cfo.hc_method = cfo.hc_method.to_uppercase(); cfo.hc_method = cfo.hc_method.to_uppercase();
for (k, v) in cfg { for (k, v) in cfg {
reply.insert(k.to_string(), v.to_string()); reply.insert(k.to_string(), v.to_string());
@@ -150,15 +169,52 @@ pub fn parce_main_config(path: &str) -> AppConfig {
} }
} }
}; };
// match cfo.config_tls_address.clone() { cfo.proxy_tls_grade = parce_tls_grades(cfo.proxy_tls_grade.clone());
// Some(tls_cert) => {
// if let Some((ip, port_str)) = tls_cert.split_once(':') {
// if let Ok(port) = port_str.parse::<u16>() {
// cfo.local_tls_server = Option::from((ip.to_string(), port));
// }
// }
// }
// None => {}
// };
cfo cfo
} }
/// Normalize the configured TLS grade to one of `high` / `medium` / `unsafe`.
///
/// Unknown and missing values both fall back to `medium`. Fixes a bug where the
/// `None` branch returned `Some("b")`, contradicting its own warning message
/// ("defaulting to: medium") and never matching any grade in `TlsGrade::from_str`.
fn parce_tls_grades(what: Option<String>) -> Option<String> {
    match what {
        Some(g) => {
            let g = g.to_ascii_lowercase();
            if matches!(g.as_str(), "high" | "medium" | "unsafe") {
                Some(g)
            } else {
                warn!("Error parsing TLS grade, defaulting to: `medium`");
                Some("medium".to_string())
            }
        }
        None => {
            warn!("TLS grade not set, defaulting to: medium");
            Some("medium".to_string())
        }
    }
}
/// Map the configured log level onto the RUST_LOG environment variable and
/// initialize env_logger. Unrecognized levels fall back to "info".
fn log_builder(conf: &AppConfig) {
    // The original match had six arms that each set RUST_LOG to the same string
    // as the input (and cloned the field first) — collapsed to a validity check.
    let configured = conf.log_level.as_str();
    let level = match configured {
        "info" | "error" | "warn" | "debug" | "trace" | "off" => configured,
        _ => {
            println!("Error reading log level, defaulting to: INFO");
            "info"
        }
    };
    // SAFETY: `set_var` is unsafe (edition 2024) because mutating the process
    // environment is not thread-safe; this runs once during startup —
    // NOTE(review): confirm no other threads exist at this point.
    unsafe {
        env::set_var("RUST_LOG", level);
    }
    env_logger::builder().init();
}

View File

@@ -3,54 +3,64 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
pub type InnerMap = (String, u16, bool, bool, bool);
pub type UpstreamsDashMap = DashMap<String, DashMap<String, (Vec<InnerMap>, AtomicUsize)>>; pub type UpstreamsDashMap = DashMap<String, DashMap<String, (Vec<InnerMap>, AtomicUsize)>>;
pub type UpstreamsIdMap = DashMap<String, InnerMap>; pub type UpstreamsIdMap = DashMap<String, InnerMap>;
pub type Headers = DashMap<String, DashMap<String, Vec<(String, String)>>>; pub type Headers = DashMap<String, DashMap<String, Vec<(String, String)>>>;
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ServiceMapping { pub struct ServiceMapping {
pub proxy: String, pub proxy: String,
pub real: String, pub real: String,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug, Default)]
pub struct Extraparams { pub struct Extraparams {
pub sticky_sessions: bool, pub sticky_sessions: bool,
pub to_https: Option<bool>, pub to_https: Option<bool>,
pub authentication: DashMap<String, Vec<String>>, pub authentication: DashMap<String, Vec<String>>,
pub rate_limit: Option<isize>,
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct Consul { pub struct Consul {
pub servers: Option<Vec<String>>, pub servers: Option<Vec<String>>,
pub services: Option<Vec<ServiceMapping>>, pub services: Option<Vec<ServiceMapping>>,
pub token: Option<String>, pub token: Option<String>,
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Default, Serialize, Deserialize)]
pub struct Config { pub struct Config {
pub provider: String, pub provider: String,
pub sticky_sessions: bool, pub sticky_sessions: bool,
pub to_https: Option<bool>, pub to_https: Option<bool>,
#[serde(default)]
pub upstreams: Option<HashMap<String, HostConfig>>, pub upstreams: Option<HashMap<String, HostConfig>>,
#[serde(default)]
pub globals: Option<HashMap<String, Vec<String>>>, pub globals: Option<HashMap<String, Vec<String>>>,
#[serde(default)]
pub headers: Option<Vec<String>>, pub headers: Option<Vec<String>>,
#[serde(default)]
pub authorization: Option<HashMap<String, String>>, pub authorization: Option<HashMap<String, String>>,
#[serde(default)]
pub consul: Option<Consul>, pub consul: Option<Consul>,
#[serde(default)]
pub rate_limit: Option<isize>,
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Default, Serialize, Deserialize)]
pub struct HostConfig { pub struct HostConfig {
pub paths: HashMap<String, PathConfig>, pub paths: HashMap<String, PathConfig>,
pub rate_limit: Option<isize>,
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Default, Serialize, Deserialize)]
pub struct PathConfig { pub struct PathConfig {
pub servers: Vec<String>, pub servers: Vec<String>,
pub to_https: Option<bool>, pub to_https: Option<bool>,
pub headers: Option<Vec<String>>, pub headers: Option<Vec<String>>,
pub rate_limit: Option<isize>,
} }
#[derive(Debug)] #[derive(Debug, Default)]
pub struct Configuration { pub struct Configuration {
pub upstreams: UpstreamsDashMap, pub upstreams: UpstreamsDashMap,
pub headers: Headers, pub headers: Headers,
@@ -59,7 +69,7 @@ pub struct Configuration {
pub extraparams: Extraparams, pub extraparams: Extraparams,
} }
#[derive(Debug, Deserialize)] #[derive(Debug, Default, Serialize, Deserialize)]
pub struct AppConfig { pub struct AppConfig {
pub hc_interval: u16, pub hc_interval: u16,
pub hc_method: String, pub hc_method: String,
@@ -68,19 +78,39 @@ pub struct AppConfig {
pub master_key: String, pub master_key: String,
pub config_address: String, pub config_address: String,
pub proxy_address_http: String, pub proxy_address_http: String,
pub config_api_enabled: bool,
pub config_tls_address: Option<String>, pub config_tls_address: Option<String>,
pub config_tls_certificate: Option<String>, pub config_tls_certificate: Option<String>,
pub config_tls_key_file: Option<String>, pub config_tls_key_file: Option<String>,
pub proxy_address_tls: Option<String>, pub proxy_address_tls: Option<String>,
pub proxy_port_tls: Option<u16>, pub proxy_port_tls: Option<u16>,
// pub tls_certificate: Option<String>,
// pub tls_key_file: Option<String>,
pub local_server: Option<(String, u16)>, pub local_server: Option<(String, u16)>,
pub proxy_certificates: Option<String>, pub proxy_certificates: Option<String>,
pub proxy_tls_grade: Option<String>,
pub file_server_address: Option<String>,
pub file_server_folder: Option<String>,
} }
// #[derive(Debug)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
// pub struct CertificateMove { pub struct InnerMap {
// pub cert_tx: Sender<CertificateConfig>, pub address: String,
// pub cert_rx: Receiver<CertificateConfig>, pub port: u16,
// } pub is_ssl: bool,
pub is_http2: bool,
pub to_https: bool,
pub rate_limit: Option<isize>,
}
#[allow(dead_code)]
impl InnerMap {
    /// Construct an `InnerMap` with every field at its default value:
    /// empty address, port 0, all flags false, no rate limit.
    pub fn new() -> Self {
        Self {
            address: String::new(),
            port: 0,
            is_ssl: false,
            is_http2: false,
            to_https: false,
            rate_limit: None,
        }
    }
}

View File

@@ -1,12 +1,12 @@
use dashmap::DashMap; use dashmap::DashMap;
use log::error; use log::{error, info, warn};
use pingora::tls::ssl::{select_next_proto, AlpnError, NameType, SniError, SslAlert, SslContext, SslFiletype, SslMethod, SslRef}; use pingora::tls::ssl::{select_next_proto, AlpnError, NameType, SniError, SslAlert, SslContext, SslFiletype, SslMethod, SslRef, SslVersion};
use pingora_core::listeners::tls::TlsSettings;
use rustls_pemfile::{read_one, Item}; use rustls_pemfile::{read_one, Item};
use serde::Deserialize; use serde::Deserialize;
use std::collections::HashSet; use std::collections::HashSet;
use std::fs::File; use std::fs::File;
use std::io::BufReader; use std::io::BufReader;
// use tokio::time::Instant;
use x509_parser::extensions::GeneralName; use x509_parser::extensions::GeneralName;
use x509_parser::nom::Err as NomErr; use x509_parser::nom::Err as NomErr;
use x509_parser::prelude::*; use x509_parser::prelude::*;
@@ -37,12 +37,12 @@ pub struct Certificates {
} }
impl Certificates { impl Certificates {
pub fn new(configs: &Vec<CertificateConfig>) -> Option<Self> { pub fn new(configs: &Vec<CertificateConfig>, _grade: &str) -> Option<Self> {
let default_cert = configs.first().expect("At least one TLS certificate required"); let default_cert = configs.first().expect("At least one TLS certificate required");
let mut cert_infos = Vec::new(); let mut cert_infos = Vec::new();
let name_map: DashMap<String, SslContext> = DashMap::new(); let name_map: DashMap<String, SslContext> = DashMap::new();
for config in configs { for config in configs {
let cert_info = load_cert_info(&config.cert_path, &config.key_path); let cert_info = load_cert_info(&config.cert_path, &config.key_path, _grade);
match cert_info { match cert_info {
Some(cert) => { Some(cert) => {
for name in &cert.common_names { for name in &cert.common_names {
@@ -106,7 +106,7 @@ impl Certificates {
} }
} }
fn load_cert_info(cert_path: &str, key_path: &str) -> Option<CertificateInfo> { fn load_cert_info(cert_path: &str, key_path: &str, _grade: &str) -> Option<CertificateInfo> {
let mut common_names = HashSet::new(); let mut common_names = HashSet::new();
let mut alt_names = HashSet::new(); let mut alt_names = HashSet::new();
@@ -185,9 +185,72 @@ fn create_ssl_context(cert_path: &str, key_path: &str) -> Result<SslContext, Box
Ok(built) Ok(built)
} }
/// OpenSSL cipher strings for the three supported TLS hardening tiers
/// (see the `CIPHERS` constant and `set_tsl_grade`).
#[derive(Debug)]
pub struct CipherSuite {
    /// Modern AEAD-only suites for the "high" grade.
    pub high: &'static str,
    /// Broader compatibility set for the "medium" grade (also the fallback default).
    pub medium: &'static str,
    /// Very permissive OpenSSL selector string used by the "unsafe" grade.
    pub legacy: &'static str,
}
/// Cipher strings applied per TLS grade. `high` lists TLS 1.3 suites plus
/// ECDHE AEAD suites; `legacy` is an OpenSSL selector admitting almost
/// everything except anonymous/export/MD5 suites.
/// (Removed two commented-out alternative cipher lists left in the original.)
const CIPHERS: CipherSuite = CipherSuite {
    high: "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:TLS_AES_128_GCM_SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305",
    medium: "ECDHE-RSA-AES128-SHA:ECDHE-ECDSA-AES128-SHA:AES128-GCM-SHA256",
    legacy: "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH",
};
/// The three supported TLS hardening tiers; the config value "unsafe" maps to
/// the LEGACY tier.
#[derive(Debug)]
pub enum TlsGrade {
    HIGH,
    MEDIUM,
    LEGACY,
}

impl TlsGrade {
    /// Parse a grade name case-insensitively; unknown names yield `None`.
    pub fn from_str(s: &str) -> Option<Self> {
        let normalized = s.to_ascii_lowercase();
        match normalized.as_str() {
            "high" => Some(Self::HIGH),
            "medium" => Some(Self::MEDIUM),
            "unsafe" => Some(Self::LEGACY),
            _ => None,
        }
    }
}
pub fn prefer_h2<'a>(_ssl: &mut SslRef, alpn_in: &'a [u8]) -> Result<&'a [u8], AlpnError> { pub fn prefer_h2<'a>(_ssl: &mut SslRef, alpn_in: &'a [u8]) -> Result<&'a [u8], AlpnError> {
match select_next_proto("\x02h2\x08http/1.1".as_bytes(), alpn_in) { match select_next_proto("\x02h2\x08http/1.1".as_bytes(), alpn_in) {
Some(p) => Ok(p), Some(p) => Ok(p),
_ => Err(AlpnError::NOACK), _ => Err(AlpnError::NOACK),
} }
} }
/// Apply the configured TLS grade (minimum protocol version + cipher suites)
/// to the listener's TLS settings. Unknown or missing grades fall back to the
/// MEDIUM settings. Also fixes the warning typo "defaulting top MEDIUM".
pub fn set_tsl_grade(tls_settings: &mut TlsSettings, grade: &str) {
    match TlsGrade::from_str(grade) {
        Some(TlsGrade::HIGH) => {
            // TLS 1.2 minimum with the AEAD-only cipher string.
            let _ = tls_settings.set_min_proto_version(Some(SslVersion::TLS1_2));
            let _ = tls_settings.set_cipher_list(CIPHERS.high);
            let _ = tls_settings.set_ciphersuites(CIPHERS.high);
            info!("TLS grade: {:?}, => HIGH", tls_settings.options());
        }
        Some(TlsGrade::MEDIUM) => {
            let _ = tls_settings.set_min_proto_version(Some(SslVersion::TLS1));
            let _ = tls_settings.set_cipher_list(CIPHERS.medium);
            let _ = tls_settings.set_ciphersuites(CIPHERS.medium);
            info!("TLS grade: {:?}, => MEDIUM", tls_settings.options());
        }
        Some(TlsGrade::LEGACY) => {
            // Deliberately permissive: SSLv3 minimum plus a broad cipher selector.
            let _ = tls_settings.set_min_proto_version(Some(SslVersion::SSL3));
            let _ = tls_settings.set_cipher_list(CIPHERS.legacy);
            let _ = tls_settings.set_ciphersuites(CIPHERS.legacy);
            warn!("TLS grade: {:?}, => UNSAFE", tls_settings.options());
        }
        None => {
            // Unrecognized grade string: apply the MEDIUM settings.
            let _ = tls_settings.set_min_proto_version(Some(SslVersion::TLS1));
            let _ = tls_settings.set_cipher_list(CIPHERS.medium);
            let _ = tls_settings.set_ciphersuites(CIPHERS.medium);
            warn!("TLS grade is not detected, defaulting to MEDIUM");
        }
    }
    // NOTE(review): `set_ciphersuites` expects TLS 1.3 suite names; the medium
    // and legacy strings contain only TLS 1.2-style cipher-list entries, so
    // those calls likely fail (errors discarded via `let _`) — TODO confirm.
}

View File

@@ -1,4 +1,4 @@
use crate::utils::structs::{UpstreamsDashMap, UpstreamsIdMap}; use crate::utils::structs::{InnerMap, UpstreamsDashMap, UpstreamsIdMap};
use crate::utils::tls; use crate::utils::tls;
use crate::utils::tls::CertificateConfig; use crate::utils::tls::CertificateConfig;
use dashmap::DashMap; use dashmap::DashMap;
@@ -21,10 +21,12 @@ pub fn print_upstreams(upstreams: &UpstreamsDashMap) {
for path_entry in host_entry.value().iter() { for path_entry in host_entry.value().iter() {
let path = path_entry.key(); let path = path_entry.key();
println!(" Path: {}", path); println!(" Path: {}", path);
for f in path_entry.value().0.clone() {
for (ip, port, ssl, vers, to_https) in path_entry.value().0.clone() { println!(
println!(" ===> IP: {}, Port: {}, SSL: {}, H2: {}, To HTTPS: {}", ip, port, ssl, vers, to_https); " IP: {}, Port: {}, SSL: {}, H2: {}, To HTTPS: {}",
f.address, f.port, f.is_ssl, f.is_http2, f.to_https
);
} }
} }
} }
@@ -140,13 +142,21 @@ pub fn clone_idmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsIdMap) {
let new_vec = vec.clone(); let new_vec = vec.clone();
for x in vec.iter() { for x in vec.iter() {
let mut id = String::new(); let mut id = String::new();
write!(&mut id, "{}:{}:{}", x.0, x.1, x.2).unwrap(); write!(&mut id, "{}:{}:{}", x.address, x.port, x.is_ssl).unwrap();
let mut hasher = Sha256::new(); let mut hasher = Sha256::new();
hasher.update(id.clone().into_bytes()); hasher.update(id.clone().into_bytes());
let hash = hasher.finalize(); let hash = hasher.finalize();
let hex_hash = base16ct::lower::encode_string(&hash); let hex_hash = base16ct::lower::encode_string(&hash);
let hh = hex_hash[0..50].to_string(); let hh = hex_hash[0..50].to_string();
cloned.insert(id, (hh.clone(), 0000, false, false, false)); let to_add = InnerMap {
address: hh.clone(),
port: 0,
is_ssl: false,
is_http2: false,
to_https: false,
rate_limit: None,
};
cloned.insert(id, to_add);
cloned.insert(hh, x.to_owned()); cloned.insert(hh, x.to_owned());
} }
new_inner_map.insert(path.clone(), new_vec); new_inner_map.insert(path.clone(), new_vec);

View File

@@ -36,9 +36,12 @@ impl BackgroundService for LB {
let api_load = APIUpstreamProvider { let api_load = APIUpstreamProvider {
address: self.config.config_address.clone(), address: self.config.config_address.clone(),
masterkey: self.config.master_key.clone(), masterkey: self.config.master_key.clone(),
config_api_enabled: self.config.config_api_enabled.clone(),
tls_address: self.config.config_tls_address.clone(), tls_address: self.config.config_tls_address.clone(),
tls_certificate: self.config.config_tls_certificate.clone(), tls_certificate: self.config.config_tls_certificate.clone(),
tls_key_file: self.config.config_tls_key_file.clone(), tls_key_file: self.config.config_tls_key_file.clone(),
file_server_address: self.config.file_server_address.clone(),
file_server_folder: self.config.file_server_folder.clone(),
}; };
let tx_api = tx.clone(); let tx_api = tx.clone();
let _ = tokio::spawn(async move { api_load.start(tx_api).await }); let _ = tokio::spawn(async move { api_load.start(tx_api).await });
@@ -64,6 +67,7 @@ impl BackgroundService for LB {
new.sticky_sessions = ss.extraparams.sticky_sessions; new.sticky_sessions = ss.extraparams.sticky_sessions;
new.to_https = ss.extraparams.to_https; new.to_https = ss.extraparams.to_https;
new.authentication = ss.extraparams.authentication.clone(); new.authentication = ss.extraparams.authentication.clone();
new.rate_limit = ss.extraparams.rate_limit;
self.extraparams.store(Arc::new(new)); self.extraparams.store(Arc::new(new));
self.headers.clear(); self.headers.clear();

View File

@@ -45,7 +45,7 @@ impl GetHost for LB {
} }
} }
} }
// println!("BMT :===> {:?}", best_match); // println!("Best Match :===> {:?}", best_match);
best_match best_match
} }
fn get_header(&self, peer: &str, path: &str) -> Option<Vec<(String, String)>> { fn get_header(&self, peer: &str, path: &str) -> Option<Vec<(String, String)>> {

View File

@@ -1,20 +1,25 @@
use crate::utils::auth::authenticate; use crate::utils::auth::authenticate;
use crate::utils::metrics::*; use crate::utils::metrics::*;
use crate::utils::structs::{AppConfig, Extraparams, Headers, UpstreamsDashMap, UpstreamsIdMap}; use crate::utils::structs::{AppConfig, Extraparams, Headers, InnerMap, UpstreamsDashMap, UpstreamsIdMap};
use crate::web::gethosts::GetHost; use crate::web::gethosts::GetHost;
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
use async_trait::async_trait; use async_trait::async_trait;
use axum::body::Bytes; use axum::body::Bytes;
use log::{debug, warn}; use log::{debug, warn};
use once_cell::sync::Lazy;
use pingora::http::{RequestHeader, ResponseHeader, StatusCode}; use pingora::http::{RequestHeader, ResponseHeader, StatusCode};
use pingora::prelude::*; use pingora::prelude::*;
use pingora::ErrorSource::Upstream; use pingora::ErrorSource::Upstream;
use pingora_core::listeners::ALPN; use pingora_core::listeners::ALPN;
use pingora_core::prelude::HttpPeer; use pingora_core::prelude::HttpPeer;
use pingora_limits::rate::Rate;
use pingora_proxy::{ProxyHttp, Session}; use pingora_proxy::{ProxyHttp, Session};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant; use tokio::time::Instant;
static RATE_LIMITER: Lazy<Rate> = Lazy::new(|| Rate::new(Duration::from_secs(1)));
#[derive(Clone)] #[derive(Clone)]
pub struct LB { pub struct LB {
pub ump_upst: Arc<UpstreamsDashMap>, pub ump_upst: Arc<UpstreamsDashMap>,
@@ -30,12 +35,13 @@ pub struct Context {
to_https: bool, to_https: bool,
redirect_to: String, redirect_to: String,
start_time: Instant, start_time: Instant,
hostname: Option<String>,
upstream_peer: Option<InnerMap>,
extraparams: arc_swap::Guard<Arc<Extraparams>>,
} }
#[async_trait] #[async_trait]
impl ProxyHttp for LB { impl ProxyHttp for LB {
// type CTX = ();
// fn new_ctx(&self) -> Self::CTX {}
type CTX = Context; type CTX = Context;
fn new_ctx(&self) -> Self::CTX { fn new_ctx(&self) -> Self::CTX {
Context { Context {
@@ -43,56 +49,89 @@ impl ProxyHttp for LB {
to_https: false, to_https: false,
redirect_to: String::new(), redirect_to: String::new(),
start_time: Instant::now(), start_time: Instant::now(),
hostname: None,
upstream_peer: None,
extraparams: self.extraparams.load(),
} }
} }
async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> { async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
if let Some(auth) = self.extraparams.load().authentication.get("authorization") { let ep = _ctx.extraparams.clone();
if let Some(auth) = ep.authentication.get("authorization") {
let authenticated = authenticate(&auth.value(), &session); let authenticated = authenticate(&auth.value(), &session);
if !authenticated { if !authenticated {
let _ = session.respond_error(401).await; let _ = session.respond_error(401).await;
warn!("Forbidden: {:?}, {}", session.client_addr(), session.req_header().uri.path().to_string()); warn!("Forbidden: {:?}, {}", session.client_addr(), session.req_header().uri.path());
return Ok(true); return Ok(true);
} }
}; };
Ok(false)
}
async fn upstream_peer(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<Box<HttpPeer>> {
let host_name = return_header_host(&session);
match host_name {
Some(hostname) => {
// session.req_header_mut().headers.insert("X-Host-Name", host.to_string().parse().unwrap());
let mut backend_id = None;
if self.extraparams.load().sticky_sessions { let hostname = return_header_host(&session);
if let Some(cookies) = session.req_header().headers.get("cookie") { _ctx.hostname = hostname.clone();
if let Ok(cookie_str) = cookies.to_str() {
for cookie in cookie_str.split(';') { let mut backend_id = None;
let trimmed = cookie.trim();
if let Some(value) = trimmed.strip_prefix("backend_id=") { if ep.sticky_sessions {
backend_id = Some(value); if let Some(cookies) = session.req_header().headers.get("cookie") {
break; if let Ok(cookie_str) = cookies.to_str() {
} for cookie in cookie_str.split(';') {
let trimmed = cookie.trim();
if let Some(value) = trimmed.strip_prefix("backend_id=") {
backend_id = Some(value);
break;
}
}
}
}
}
match hostname {
None => return Ok(false),
Some(host) => {
let optioninnermap = self.get_host(host.as_str(), host.as_str(), backend_id);
match optioninnermap {
None => return Ok(false),
Some(ref innermap) => {
if let Some(rate) = innermap.rate_limit.or(ep.rate_limit) {
// let rate_key = session.client_addr().and_then(|addr| addr.as_inet()).map(|inet| inet.ip().to_string()).unwrap_or_else(|| host.to_string());
let rate_key = session.client_addr().and_then(|addr| addr.as_inet()).map(|inet| inet.ip());
let curr_window_requests = RATE_LIMITER.observe(&rate_key, 1);
if curr_window_requests > rate {
let mut header = ResponseHeader::build(429, None).unwrap();
header.insert_header("X-Rate-Limit-Limit", rate.to_string()).unwrap();
header.insert_header("X-Rate-Limit-Remaining", "0").unwrap();
header.insert_header("X-Rate-Limit-Reset", "1").unwrap();
session.set_keepalive(None);
session.write_response_header(Box::new(header), true).await?;
debug!("Rate limited: {:?}, {}", rate_key, rate);
return Ok(true);
} }
} }
} }
} }
_ctx.upstream_peer = optioninnermap;
let ddr = self.get_host(hostname, hostname, backend_id); }
}
match ddr { Ok(false)
Some((address, port, ssl, is_h2, to_https)) => { }
let mut peer = Box::new(HttpPeer::new((address.clone(), port.clone()), ssl, String::new())); async fn upstream_peer(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<Box<HttpPeer>> {
// let host_name = return_header_host(&session);
match ctx.hostname.as_ref() {
Some(hostname) => {
match ctx.upstream_peer.as_ref() {
// Some((address, port, ssl, is_h2, to_https)) => {
Some(innermap) => {
let mut peer = Box::new(HttpPeer::new((innermap.address.clone(), innermap.port.clone()), innermap.is_ssl, String::new()));
// if session.is_http2() { // if session.is_http2() {
if is_h2 { if innermap.is_http2 {
peer.options.alpn = ALPN::H2; peer.options.alpn = ALPN::H2;
} }
if ssl { if innermap.is_ssl {
peer.sni = hostname.to_string(); peer.sni = hostname.clone();
peer.options.verify_cert = false; peer.options.verify_cert = false;
peer.options.verify_hostname = false; peer.options.verify_hostname = false;
} }
// println!("{}, {}, alpn {}, h2 {:?}, to_https {}", hostname, address.as_str(), peer.options.alpn, is_h2, _to_https); if ctx.to_https || innermap.to_https {
if self.extraparams.load().to_https.unwrap_or(false) || to_https {
if let Some(stream) = session.stream() { if let Some(stream) = session.stream() {
if stream.get_ssl().is_none() { if stream.get_ssl().is_none() {
if let Some(addr) = session.server_addr() { if let Some(addr) = session.server_addr() {
@@ -107,7 +146,7 @@ impl ProxyHttp for LB {
} }
} }
ctx.backend_id = format!("{}:{}:{}", address.clone(), port.clone(), ssl); ctx.backend_id = format!("{}:{}:{}", innermap.address.clone(), innermap.port.clone(), innermap.is_ssl);
Ok(peer) Ok(peer)
} }
None => { None => {
@@ -155,12 +194,18 @@ impl ProxyHttp for LB {
Ok(()) Ok(())
} }
// async fn request_body_filter(&self, _session: &mut Session, _body: &mut Option<Bytes>, _end_of_stream: bool, _ctx: &mut Self::CTX) -> Result<()>
// where
// Self::CTX: Send + Sync,
// {
// Ok(())
// }
async fn response_filter(&self, session: &mut Session, _upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX) -> Result<()> { async fn response_filter(&self, session: &mut Session, _upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX) -> Result<()> {
// _upstream_response.insert_header("X-Proxied-From", "Fooooooooooooooo").unwrap(); // _upstream_response.insert_header("X-Proxied-From", "Fooooooooooooooo").unwrap();
if self.extraparams.load().sticky_sessions { if ctx.extraparams.sticky_sessions {
let backend_id = ctx.backend_id.clone(); let backend_id = ctx.backend_id.clone();
if let Some(bid) = self.ump_byid.get(&backend_id) { if let Some(bid) = self.ump_byid.get(&backend_id) {
let _ = _upstream_response.insert_header("set-cookie", format!("backend_id={}; Path=/; Max-Age=600; HttpOnly; SameSite=Lax", bid.0)); let _ = _upstream_response.insert_header("set-cookie", format!("backend_id={}; Path=/; Max-Age=600; HttpOnly; SameSite=Lax", bid.address));
} }
} }
if ctx.to_https { if ctx.to_https {
@@ -169,7 +214,7 @@ impl ProxyHttp for LB {
redirect_response.insert_header("Content-Length", "0")?; redirect_response.insert_header("Content-Length", "0")?;
session.write_response_header(Box::new(redirect_response), false).await?; session.write_response_header(Box::new(redirect_response), false).await?;
} }
match return_header_host(&session) { match ctx.hostname.as_ref() {
Some(host) => { Some(host) => {
let path = session.req_header().uri.path(); let path = session.req_header().uri.path();
let host_header = host; let host_header = host;
@@ -195,6 +240,7 @@ impl ProxyHttp for LB {
} }
None => {} None => {}
} }
session.set_keepalive(Some(300));
Ok(()) Ok(())
} }
@@ -211,26 +257,19 @@ impl ProxyHttp for LB {
} }
} }
fn return_header_host(session: &Session) -> Option<&str> { fn return_header_host(session: &Session) -> Option<String> {
if session.is_http2() { if session.is_http2() {
match session.req_header().uri.host() { match session.req_header().uri.host() {
Some(host) => Option::from(host), Some(host) => Option::from(host.to_string()),
None => None, None => None,
} }
} else { } else {
match session.req_header().headers.get("host") { match session.req_header().headers.get("host") {
Some(host) => { Some(host) => {
let header_host = host.to_str().unwrap().splitn(2, ':').collect::<Vec<&str>>(); let header_host = host.to_str().unwrap().splitn(2, ':').collect::<Vec<&str>>();
Option::from(header_host[0]) Option::from(header_host[0].to_string())
} }
None => None, None => None,
} }
} }
} }
// fn return_no_host(inp: &Option<(String, u16)>) -> Box<HttpPeer> {
// match inp {
// Some(t) => Box::new(HttpPeer::new(t, false, String::new())),
// None => Box::new(HttpPeer::new(("0.0.0.0", 0), false, String::new())),
// }
// }

View File

@@ -13,7 +13,7 @@ use pingora_core::prelude::{background_service, Opt};
use pingora_core::server::Server; use pingora_core::server::Server;
use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc; use std::sync::Arc;
use std::{env, thread}; use std::thread;
pub fn run() { pub fn run() {
// default_provider().install_default().expect("Failed to install rustls crypto provider"); // default_provider().install_default().expect("Failed to install rustls crypto provider");
@@ -28,51 +28,44 @@ pub fn run() {
let ff_config = Arc::new(DashMap::new()); let ff_config = Arc::new(DashMap::new());
let im_config = Arc::new(DashMap::new()); let im_config = Arc::new(DashMap::new());
let hh_config = Arc::new(DashMap::new()); let hh_config = Arc::new(DashMap::new());
let ec_config = Arc::new(ArcSwap::from_pointee(Extraparams { let ec_config = Arc::new(ArcSwap::from_pointee(Extraparams {
sticky_sessions: false, sticky_sessions: false,
to_https: None, to_https: None,
authentication: DashMap::new(), authentication: DashMap::new(),
rate_limit: None,
})); }));
let cfg = Arc::new(maincfg); let cfg = Arc::new(maincfg);
let lb = LB { let lb = LB {
ump_upst: uf_config.clone(), ump_upst: uf_config,
ump_full: ff_config.clone(), ump_full: ff_config,
ump_byid: im_config.clone(), ump_byid: im_config,
config: cfg.clone(), config: cfg.clone(),
headers: hh_config.clone(), headers: hh_config,
extraparams: ec_config.clone(), extraparams: ec_config,
}; };
// let bg = LB { /*
// ump_upst: uf_config.clone(), let log_level = cfg.log_level.clone();
// ump_full: ff_config.clone(), unsafe {
// ump_byid: im_config.clone(), match log_level.as_str() {
// config: cfg.clone(), "info" => env::set_var("RUST_LOG", "info"),
// headers: hh_config.clone(), "error" => env::set_var("RUST_LOG", "error"),
// extraparams: ec_config.clone(), "warn" => env::set_var("RUST_LOG", "warn"),
// config_rx: Arc::from(Mutex::new(rx)), "debug" => env::set_var("RUST_LOG", "debug"),
// }; "trace" => env::set_var("RUST_LOG", "trace"),
"off" => env::set_var("RUST_LOG", "off"),
// env_logger::Env::new(); _ => {
// env_logger::init(); println!("Error reading log level, defaulting to: INFO");
env::set_var("RUST_LOG", "info")
let log_level = cfg.log_level.clone(); }
unsafe {
match log_level.as_str() {
"info" => env::set_var("RUST_LOG", "info"),
"error" => env::set_var("RUST_LOG", "error"),
"warn" => env::set_var("RUST_LOG", "warn"),
"debug" => env::set_var("RUST_LOG", "debug"),
"trace" => env::set_var("RUST_LOG", "trace"),
"off" => env::set_var("RUST_LOG", "off"),
_ => {
println!("Error reading log level, defaulting to: INFO");
env::set_var("RUST_LOG", "info")
} }
} }
} env_logger::builder().init();
env_logger::builder().init(); */
let grade = cfg.proxy_tls_grade.clone().unwrap_or("medium".to_string());
info!("TLS grade set to: [ {} ]", grade);
let bg_srvc = background_service("bgsrvc", lb.clone()); let bg_srvc = background_service("bgsrvc", lb.clone());
let mut proxy = pingora_proxy::http_proxy_service(&server.configuration, lb.clone()); let mut proxy = pingora_proxy::http_proxy_service(&server.configuration, lb.clone());
@@ -87,25 +80,27 @@ pub fn run() {
watch_folder(certs_path, tx).unwrap(); watch_folder(certs_path, tx).unwrap();
}); });
let certificate_configs = rx.recv().unwrap(); let certificate_configs = rx.recv().unwrap();
let first_set = tls::Certificates::new(&certificate_configs).unwrap_or_else(|| panic!("Unable to load initial certificate info")); let first_set = tls::Certificates::new(&certificate_configs, grade.as_str()).unwrap_or_else(|| panic!("Unable to load initial certificate info"));
let certificates = Arc::new(ArcSwap::from_pointee(first_set)); let certificates = Arc::new(ArcSwap::from_pointee(first_set));
let certs_for_callback = certificates.clone(); let certs_for_callback = certificates.clone();
let certs_for_watcher = certificates.clone(); let certs_for_watcher = certificates.clone();
let new_certs = tls::Certificates::new(&certificate_configs); let new_certs = tls::Certificates::new(&certificate_configs, grade.as_str());
certs_for_watcher.store(Arc::new(new_certs.unwrap())); certs_for_watcher.store(Arc::new(new_certs.unwrap()));
let mut tls_settings = let mut tls_settings =
TlsSettings::intermediate(&certs_for_callback.load().default_cert_path, &certs_for_callback.load().default_key_path).expect("unable to load or parse cert/key"); TlsSettings::intermediate(&certs_for_callback.load().default_cert_path, &certs_for_callback.load().default_key_path).expect("unable to load or parse cert/key");
tls::set_tsl_grade(&mut tls_settings, grade.as_str());
tls_settings.set_servername_callback(move |ssl_ref: &mut SslRef, ssl_alert: &mut SslAlert| certs_for_callback.load().server_name_callback(ssl_ref, ssl_alert)); tls_settings.set_servername_callback(move |ssl_ref: &mut SslRef, ssl_alert: &mut SslAlert| certs_for_callback.load().server_name_callback(ssl_ref, ssl_alert));
tls_settings.set_alpn_select_callback(tls::prefer_h2); tls_settings.set_alpn_select_callback(tls::prefer_h2);
proxy.add_tls_with_settings(&bind_address_tls, None, tls_settings); proxy.add_tls_with_settings(&bind_address_tls, None, tls_settings);
let certs_for_watcher = certificates.clone(); let certs_for_watcher = certificates.clone();
thread::spawn(move || { thread::spawn(move || {
while let Ok(new_configs) = rx.recv() { while let Ok(new_configs) = rx.recv() {
let new_certs = tls::Certificates::new(&new_configs); let new_certs = tls::Certificates::new(&new_configs, grade.as_str());
match new_certs { match new_certs {
Some(new_certs) => { Some(new_certs) => {
certs_for_watcher.store(Arc::new(new_certs)); certs_for_watcher.store(Arc::new(new_certs));

View File

@@ -17,6 +17,7 @@ use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tower_http::services::ServeDir;
#[derive(Deserialize)] #[derive(Deserialize)]
struct InputKey { struct InputKey {
@@ -34,6 +35,7 @@ struct OutToken {
struct AppState { struct AppState {
master_key: String, master_key: String,
config_sender: Sender<Configuration>, config_sender: Sender<Configuration>,
config_api_enabled: bool,
} }
#[allow(unused_mut)] #[allow(unused_mut)]
@@ -41,13 +43,16 @@ pub async fn run_server(config: &APIUpstreamProvider, mut to_return: Sender<Conf
let app_state = AppState { let app_state = AppState {
master_key: config.masterkey.clone(), master_key: config.masterkey.clone(),
config_sender: to_return.clone(), config_sender: to_return.clone(),
config_api_enabled: config.config_api_enabled.clone(),
}; };
let app = Router::new() let app = Router::new()
// .route("/{*wildcard}", get(senderror)) // .route("/{*wildcard}", get(senderror))
// .route("/{*wildcard}", post(senderror)) // .route("/{*wildcard}", post(senderror))
// .route("/{*wildcard}", put(senderror)) // .route("/{*wildcard}", put(senderror))
// .route("/{*wildcard}", head(senderror)) // .route("/{*wildcard}", head(senderror))
// .route("/{*wildcard}", delete(senderror)) // .route("/{*wildcard}", delete(senderror))
// .nest_service("/static", static_files)
.route("/jwt", post(jwt_gen)) .route("/jwt", post(jwt_gen))
.route("/conf", post(conf)) .route("/conf", post(conf))
.route("/metrics", get(metrics)) .route("/metrics", get(metrics))
@@ -65,15 +70,29 @@ pub async fn run_server(config: &APIUpstreamProvider, mut to_return: Sender<Conf
info!("Starting the TLS API server on: {}", value); info!("Starting the TLS API server on: {}", value);
} }
if let (Some(address), Some(folder)) = (&config.file_server_address, &config.file_server_folder) {
let static_files = ServeDir::new(folder);
let static_serve: Router = Router::new().fallback_service(static_files);
let static_listen = TcpListener::bind(address).await.unwrap();
let _ = tokio::spawn(async move { axum::serve(static_listen, static_serve).await.unwrap() });
}
let listener = TcpListener::bind(config.address.clone()).await.unwrap(); let listener = TcpListener::bind(config.address.clone()).await.unwrap();
info!("Starting the API server on: {}", config.address); info!("Starting the API server on: {}", config.address);
axum::serve(listener, app).await.unwrap(); axum::serve(listener, app).await.unwrap();
} }
async fn conf(State(mut st): State<AppState>, Query(params): Query<HashMap<String, String>>, content: String) -> impl IntoResponse { async fn conf(State(mut st): State<AppState>, Query(params): Query<HashMap<String, String>>, content: String) -> impl IntoResponse {
if !st.config_api_enabled {
return Response::builder()
.status(StatusCode::FORBIDDEN)
.body(Body::from("Config remote API is disabled !\n"))
.unwrap();
}
if let Some(s) = params.get("key") { if let Some(s) = params.get("key") {
if s.to_owned() == st.master_key.to_owned() { if s.to_owned() == st.master_key {
if let Some(serverlist) = crate::utils::parceyaml::load_configuration(content.as_str(), "content") { if let Some(serverlist) = crate::utils::parceyaml::load_configuration(content.as_str(), "content").await {
st.config_sender.send(serverlist).await.unwrap(); st.config_sender.send(serverlist).await.unwrap();
return Response::builder().status(StatusCode::OK).body(Body::from("Config, conf file, updated !\n")).unwrap(); return Response::builder().status(StatusCode::OK).body(Body::from("Config, conf file, updated !\n")).unwrap();
} else { } else {