54 Commits

Author SHA1 Message Date
Ara Sadoyan
2b62d1e6de configs update 2025-10-02 10:56:55 +02:00
Ara Sadoyan
8a290e5084 Kubernetes path based routing 2025-10-01 20:18:36 +02:00
Ara Sadoyan
3541b20c80 intermediate minor optimization 2025-10-01 13:47:30 +02:00
Ara Sadoyan
bd5fed9be0 Fix drop privileges, check root 2025-09-28 12:23:53 +02:00
Ara Sadoyan
b916b152ea Changed config file parser at startup, to keep initially dead nodes in list. 2025-09-25 18:32:46 +02:00
Ara Sadoyan
5d4915d6b9 Fixed drop root privileges on ports below 1024 2025-09-19 12:46:17 +02:00
Ara Sadoyan
3ea3996e27 upgrade to pingora 0.6 2025-09-18 14:15:50 +02:00
Ara Sadoyan
dd069b8532 minor fix 2025-09-17 16:51:57 +02:00
Ara Sadoyan
c78245e695 disable HC for upstream. 2025-09-16 12:54:23 +02:00
Ara Sadoyan
66b1a1c399 upstreams pathconfig fix 2025-09-15 15:22:21 +02:00
Ara Sadoyan
bba6dd8514 minor cleanup 2025-09-09 14:51:37 +02:00
Ara Sadoyan
79485ac69d minor cleanup 2025-09-04 18:16:09 +02:00
Ara Sadoyan
61c5625016 A coffee :-) 2025-09-02 14:57:47 +02:00
Ara Sadoyan
57bdc71acd A coffee :-) 2025-09-02 14:56:36 +02:00
Ara Sadoyan
9e09b829a6 README update 2025-09-01 17:02:57 +02:00
Ara Sadoyan
d3602fa578 Added Kubernetes API support, fo ingress controller. 2025-09-01 16:32:30 +02:00
Ara Sadoyan
e304482667 Optimized healthchecks and config file loading 2025-08-20 14:03:09 +02:00
Ara Sadoyan
f8118f9596 TLS grades change 2025-08-05 19:08:58 +02:00
Ara Sadoyan
f654312466 SSL cipher management 2025-07-29 21:25:27 +02:00
Ara Sadoyan
b44f7069a0 Configurable TLS ciphers 2025-07-27 11:15:49 +02:00
Ara Sadoyan
a44979ec82 Configurable TLS ciphers 2025-07-27 11:13:39 +02:00
Ara Sadoyan
ece4fa20af README 2025-07-24 13:50:15 +02:00
Ara Sadoyan
2ad3a059ab Per path rate limiter 2025-07-24 13:34:15 +02:00
Ara Sadoyan
6f012cee69 Code cleanup 2025-07-22 17:40:58 +02:00
Ara Sadoyan
51c88c8f7c Some structural changes and improvements 2025-07-12 16:17:45 +02:00
Ara Sadoyan
f91bc41103 benchmark image 2025-07-10 17:46:05 +02:00
Ara Sadoyan
21e1276ff5 Readme update 2025-07-09 15:22:38 +02:00
Ara Sadoyan
8463cdabbc Added configurable rate limiter 2025-07-09 15:01:20 +02:00
Ara Sadoyan
d0e4b52ce6 Enable/Disable config API from config 2025-07-04 15:06:05 +02:00
Ara Sadoyan
b552d24497 README 2025-07-02 19:00:05 +02:00
Ara Sadoyan
2e33d692bb Added optional minimal file server 2025-07-02 18:29:14 +02:00
Ara Sadoyan
e586967830 Code cleanup, nothing special 2025-06-30 18:24:25 +02:00
Ara Sadoyan
8d4e434d6a Dynamic load of SSL certificates from disk. 2025-06-19 18:32:44 +02:00
Ara Sadoyan
60b7b3aa7a README 2025-06-16 13:42:30 +02:00
Ara Sadoyan
569db8e18d Project rename. Load multiple certificates from folder. 2025-06-16 13:32:05 +02:00
Ara Sadoyan
4126249bcd Project rename. Load multiple certificates from folder. 2025-06-16 13:29:13 +02:00
Ara Sadoyan
0779f97277 README Update 2025-06-09 18:12:25 +02:00
Ara Sadoyan
b047331e6a README Update 2025-06-09 18:11:44 +02:00
Ara Sadoyan
a341fa30db Add TLS to API server 2025-06-09 18:06:16 +02:00
Ara Sadoyan
9d604d62e7 METRICS.md update 2025-06-07 15:51:23 +02:00
Ara Sadoyan
4a21700552 README update 2025-06-07 11:47:29 +02:00
Ara Sadoyan
f0157b6e8f README update 2025-06-07 11:38:07 +02:00
Ara Sadoyan
1370396ae8 README update 2025-06-07 10:56:31 +02:00
Ara Sadoyan
64ef4e14af README update 2025-06-07 10:11:39 +02:00
Ara Sadoyan
ffc2bab79f API server changes, improvements 2025-06-06 19:30:51 +02:00
Ara Sadoyan
8e05794784 Metrics exporter for Prometheus 2025-05-28 21:24:22 +02:00
Ara Sadoyan
423c7afa90 Metrics exporter for Prometheus 2025-05-28 21:23:10 +02:00
Ara Sadoyan
78a084380a Name and config changes 2025-05-28 14:54:01 +02:00
Ara Sadoyan
ada2032732 http to https redirect cleanup 2025-05-26 18:30:42 +02:00
Ara Sadoyan
a89592bd07 http to https redirect cleanup 2025-05-26 16:24:15 +02:00
Ara Sadoyan
2a93bc2cd6 http to https redirect cleanup 2025-05-26 12:42:01 +02:00
Ara Sadoyan
d38588a299 http to https redirect 2025-05-25 11:19:28 +02:00
Ara Sadoyan
3e93920a0d Some type changes 2025-05-21 16:49:37 +02:00
Ara Sadoyan
fce25b8d15 Turned back to OpenSSL, with static link 2025-05-21 15:54:38 +02:00
27 changed files with 2599 additions and 1199 deletions

3
.gitignore vendored
View File

@@ -5,9 +5,12 @@
*.dll
*.exe
*.sh
/docs/
/docs
/target/
*.iml
.idea/
.etc/
*.ipr
*.iws
/out/

1057
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,41 +1,55 @@
[package]
name = "gazan"
version = "0.1.0"
name = "aralez"
version = "0.9.2"
edition = "2021"
[profile.release]
opt-level = 3
lto = true
codegen-units = 1
opt-level = 3
strip = "symbols"
panic = "abort"
strip = true
[dependencies]
tokio = { version = "1.45.0", features = ["full"] }
pingora = { version = "0.5.0", features = ["lb", "rustls"] } # openssl, rustls, boringssl
tokio = { version = "1.45.1", features = ["full"] }
pingora = { version = "0.6.0", features = ["lb", "openssl"] } # openssl, rustls, boringssl
serde = { version = "1.0.219", features = ["derive"] }
dashmap = "7.0.0-rc2"
pingora-core = "0.5.0"
pingora-proxy = "0.5.0"
pingora-http = "0.5.0"
async-trait = "0.1.88"
pingora-core = "0.6.0"
pingora-proxy = "0.6.0"
pingora-http = "0.6.0"
pingora-limits = "0.6.0"
async-trait = "0.1.89"
env_logger = "0.11.8"
log = "0.4.27"
log = "0.4.28"
futures = "0.3.31"
notify = "8.0.0"
notify = "8.2.0"
axum = { version = "0.8.4" }
#reqwest = { version = "0.12.15", features = ["json", "native-tls-alpn"] }
axum-server = { version = "0.7.2", features = ["tls-openssl"] }
reqwest = { version = "0.12.23", features = ["json", "native-tls-alpn"] }
#reqwest = { version = "0.12.15", features = ["json", "rustls-tls"] }
reqwest = { version = "0.12.15", default-features = false, features = ["rustls-tls", "json"] }
#reqwest = { version = "0.12.15", default-features = false, features = ["rustls-tls", "json"] }
serde_yaml = "0.9.34-deprecated"
rand = "0.9.0"
rand = "0.9.2"
base64 = "0.22.1"
jsonwebtoken = "9.3.1"
tonic = "0.13.0"
sha2 = { version = "0.11.0-pre.5", default-features = false }
base16ct = { version = "0.2.0", features = ["alloc"] }
tonic = "0.14.2"
sha2 = { version = "0.11.0-rc.2", default-features = false }
base16ct = { version = "0.3.0", features = ["alloc"] }
urlencoding = "2.1.3"
arc-swap = "1.7.1"
rustls = { version = "0.23.27", features = ["ring"] }
mimalloc = { version = "0.1.46", default-features = false }
mimalloc = { version = "0.1.48", default-features = false }
prometheus = "0.14.0"
lazy_static = "1.5.0"
x509-parser = "0.18.0"
rustls-pemfile = "2.2.0"
tower-http = { version = "0.6.6", features = ["fs"] }
once_cell = "1.21.3"
privdrop = "0.5.6"
ctrlc = "3.5.0"
port_check = "0.3.0"
#moka = { version = "0.12.10", features = ["sync"] }
#rustls = { version = "0.23.27", features = ["ring"] }
#hickory-client = { version = "0.25.2" }
#openssl = "0.10.73"

381
README.md
View File

@@ -1,38 +1,53 @@
![Gazan](https://netangels.net/utils/gazan-white.jpg)
![Aralez](https://netangels.net/utils/aralez-white.jpg)
# Gazan - The beast-mode reverse proxy.
---
Gazan is a Reverse proxy, service mesh based on Cloudflare's Pingora
# Aralez (Արալեզ),
**What Gazan means?**
<ins>Gazan = Գազան = beast / wild animal in Armenian / Often used as a synonym to something great.</ins>.
### **Reverse proxy and service mesh built on top of Cloudflare's Pingora**
Built on Rust, on top of **Cloudflares Pingora engine**, **Gazan** delivers world-class performance, security and scalability — right out of the box.
What Aralez means ?
**Aralez = Արալեզ** <ins>.Named after the legendary Armenian guardian spirit, winged dog-like creature, that descend upon fallen heroes to lick their wounds and resurrect them.</ins>.
Built on Rust, on top of **Cloudflares Pingora engine**, **Aralez** delivers world-class performance, security and scalability — right out of the box.
[![Buy Me A Coffee](https://img.shields.io/badge/☕-Buy%20me%20a%20coffee-orange)](https://www.buymeacoffee.com/sadoyan)
---
## 🔧 Key Features
- **Dynamic Config Reloads** — Upstreams can be updated live via API, no restart required
- **TLS Termination** — Built-in OpenSSL support
- **Upstreams TLS detection** — Gazan will automatically detect if upstreams uses secure connection
- **Authentication** — Supports Basic Auth, API tokens, and JWT verification
- **Dynamic Config Reloads** — Upstreams can be updated live via API, no restart required.
- **TLS Termination** — Built-in OpenSSL support.
- **Automatic load of certificates** — Automatically reads and loads certificates from a folder, without a restart.
- **Upstreams TLS detection** — Aralez will automatically detect if upstreams uses secure connection.
- **Built in rate limiter** — Limit requests to server, by setting up upper limit for requests per seconds, per virtualhost.
- **Global rate limiter** — Set rate limit for all virtualhosts.
- **Per path rate limiter** — Set rate limit for specific paths. Path limits will override global limits.
- **Authentication** — Supports Basic Auth, API tokens, and JWT verification.
- **Basic Auth**
- **API Key** via `x-api-key` header
- **JWT Auth**, with tokens issued by Aralez itself via `/jwt` API
- ⬇️ See below for examples and implementation details.
- **Load Balancing Strategies**
- Round-robin
- Failover with health checks
- Sticky sessions via cookies
- **Unified Port** — Serve HTTP and WebSocket traffic over the same connection
- **Memory Safe** — Created purely on Rust
- **High Performance** — Built with [Pingora](https://github.com/cloudflare/pingora) and tokio for async I/O
- **Unified Port** — Serve HTTP and WebSocket traffic over the same connection.
- **Built in file server** — Build in minimalistic file server for serving static files, should be added as upstreams for public access.
- **Memory Safe** — Created purely on Rust.
- **High Performance** — Built with [Pingora](https://github.com/cloudflare/pingora) and tokio for async I/O.
## 🌍 Highlights
- ⚙️ **Upstream Providers:** Supports `file`-based static upstreams, dynamic service discovery via `Consul`.
- ⚙️ **Upstream Providers:**
- `file` Upstreams are declared in config file.
- `consul` Upstreams are dynamically updated from Hashicorp Consul.
- 🔁 **Hot Reloading:** Modify upstreams on the fly via `upstreams.yaml` — no restart needed.
- 🔮 **Automatic WebSocket Support:** Zero config — connection upgrades are handled seamlessly.
- 🔮 **Automatic GRPC Support:** Zero config, Requires `ssl` to proxy, gRPC is handled seamlessly.
- 🔮 **Upstreams Session Stickiness:** Enable/Disable Sticky sessions.
- 🔐 **TLS Termination:** Fully supports TLS for incoming and upstream traffic.
- 🔮 **Automatic GRPC Support:** Zero config, Requires `ssl` to proxy, gRPC handled seamlessly.
- 🔮 **Upstreams Session Stickiness:** Enable/Disable Sticky sessions globally.
- 🔐 **TLS Termination:** Fully supports TLS for upstreams and downstreams.
- 🛡️ **Built-in Authentication** Basic Auth, JWT, API key.
- 🧠 **Header Injection:** Global and per-route header configuration.
- 🧪 **Health Checks:** Pluggable health check methods for upstreams.
@@ -57,15 +72,32 @@ Built on Rust, on top of **Cloudflares Pingora engine**, **Gazan** delivers w
### 🔧 `main.yaml`
- `proxy_address_http`: `0.0.0.0:6193` (HTTP listener)
- `proxy_address_tls`: `0.0.0.0:6194` (TLS listener, optional)
- `config_address`: `0.0.0.0:3000` (HTTP API for remote config push)
- `upstreams_conf`: `etc/upstreams.yaml` (location of upstreams config)
- `log_level`: `info` (verbosity of logs)
- `hc_method`: `HEAD`, `hc_interval`: `2s` (upstream health checks)
- `user` Optional. Drop privileges to regular user. To bind to privileged ports. Requires to start as root.
- `group` Optional. Drop privileges to regular group
- Other defaults: thread count, keep-alive pool size, etc.
| Key | Example Value | Description |
|----------------------------------|--------------------------------------|----------------------------------------------------------------------------------------------------|
| **threads** | 12 | Number of running daemon threads. Optional, defaults to 1 |
| **runuser** | aralez | Optional, Username for running aralez after dropping root privileges, requires to launch as root |
| **rungroup** | aralez | Optional,Group for running aralez after dropping root privileges, requires to launch as root |
| **daemon** | false | Run in background (boolean) |
| **upstream_keepalive_pool_size** | 500 | Pool size for upstream keepalive connections |
| **pid_file** | /tmp/aralez.pid | Path to PID file |
| **error_log** | /tmp/aralez_err.log | Path to error log file |
| **upgrade_sock** | /tmp/aralez.sock | Path to live upgrade socket file |
| **config_address** | 0.0.0.0:3000 | HTTP API address for pushing upstreams.yaml from remote location |
| **config_tls_address** | 0.0.0.0:3001 | HTTPS API address for pushing upstreams.yaml from remote location |
| **config_tls_certificate** | etc/server.crt | Certificate file path for API. Mandatory if proxy_address_tls is set, else optional |
| **proxy_tls_grade** | (high, medium, unsafe) | Grade of TLS ciphers, for easy configuration. High matches Qualys SSL Labs A+ (defaults to medium) |
| **config_tls_key_file** | etc/key.pem | Private Key file path. Mandatory if proxy_address_tls is set, else optional |
| **proxy_address_http** | 0.0.0.0:6193 | Aralez HTTP bind address |
| **proxy_address_tls** | 0.0.0.0:6194 | Aralez HTTPS bind address (Optional) |
| **proxy_certificates** | etc/certs/ | The directory containing certificate and key files. In a format {NAME}.crt, {NAME}.key. |
| **upstreams_conf** | etc/upstreams.yaml | The location of upstreams file |
| **log_level** | info | Log level , possible values : info, warn, error, debug, trace, off |
| **hc_method** | HEAD | Healthcheck method (HEAD, GET, POST are supported) UPPERCASE |
| **hc_interval** | 2 | Interval for health checks in seconds |
| **master_key** | 5aeff7f9-7b94-447c-af60-e8c488544a3e | Master key for working with API server and JWT Secret generation |
| **file_server_folder** | /some/local/folder | Optional, local folder to serve |
| **file_server_address** | 127.0.0.1:3002 | Optional, Local address for file server. Can set as upstream for public access |
| **config_api_enabled** | true | Boolean to enable/disable remote config push capability |
### 🌐 `upstreams.yaml`
@@ -81,40 +113,69 @@ Built on Rust, on top of **Cloudflares Pingora engine**, **Gazan** delivers w
## 🛠 Installation
Download the prebuilt binary for your architecture from releases section of [GitHub](https://github.com/sadoyan/gazan/releases) repo
Make the binary executable `chmod 755 ./gazan-VERSION` and run.
Download the prebuilt binary for your architecture from releases section of [GitHub](https://github.com/sadoyan/aralez/releases) repo
Make the binary executable `chmod 755 ./aralez-VERSION` and run.
File names:
| File Name | Description |
|--------------------------|---------------------------------------------------------------|
| `gazan-x86_64-musl.gz` | Static Linux x86_64 binary, without any system dependency |
| `gazan-x86_64-glibc.gz` | Dynamic Linux x86_64 binary, with minimal system dependencies |
| `gazan-aarch64-musl.gz` | Static Linux ARM64 binary, without any system dependency |
| `gazan-aarch64-glibc.gz` | Dynamic Linux ARM64 binary, with minimal system dependencies |
| File Name | Description |
|---------------------------|--------------------------------------------------------------------------|
| `aralez-x86_64-musl.gz` | Static Linux x86_64 binary, without any system dependency |
| `aralez-x86_64-glibc.gz` | Dynamic Linux x86_64 binary, with minimal system dependencies |
| `aralez-aarch64-musl.gz` | Static Linux ARM64 binary, without any system dependency |
| `aralez-aarch64-glibc.gz` | Dynamic Linux ARM64 binary, with minimal system dependencies |
| `sadoyan/aralez` | Docker image on Debian 13 slim (https://hub.docker.com/r/sadoyan/aralez) |
**Via docker**
```shell
docker run -d \
-v /local/path/to/config:/etc/aralez:ro \
-p 80:80 \
-p 443:443 \
sadoyan/aralez
```
## 💡 Note
In general **glibc** builds are working faster, but have few, basic, system dependencies for example :
```
linux-vdso.so.1 (0x00007ffeea33b000)
libgcc_s.so.1 => /lib/x86_64-linux-gnu/libgcc_s.so.1 (0x00007f09e7377000)
libm.so.6 => /lib/x86_64-linux-gnu/libm.so.6 (0x00007f09e6320000)
libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007f09e613f000)
/lib64/ld-linux-x86-64.so.2 (0x00007f09e73b1000)
```
These are common to any Linux systems, so the binary should work on almost any Linux system.
**musl** builds are 100% portable, static compiled binaries and have zero system depencecies.
In general musl builds have a little less performance.
The most intensive tests shows 107k-110k requests per second on **Glibc** binaries against 97k-100k **Musl** ones.
## 🔌 Running the Proxy
```bash
./gazan -c path/to/main.yaml
./aralez -c path/to/main.yaml
```
## 🔌 Systemd integration
```bash
cat > /etc/systemd/system/gazan.service <<EOF
cat > /etc/systemd/system/aralez.service <<EOF
[Service]
Type=forking
PIDFile=/run/gazan.pid
ExecStart=/bin/gazan -d -c /etc/gazan.conf
PIDFile=/run/aralez.pid
ExecStart=/bin/aralez -d -c /etc/aralez.conf
ExecReload=kill -QUIT $MAINPID
ExecReload=/bin/gazan -u -d -c /etc/gazan.conf
ExecReload=/bin/aralez -u -d -c /etc/aralez.conf
EOF
```
```bash
systemctl enable gazan.service.
systemctl restart gazan.service.
systemctl enable aralez.service.
systemctl restart aralez.service.
```
## 💡 Example
@@ -123,18 +184,21 @@ A sample `upstreams.yaml` entry:
```yaml
provider: "file"
stickysessions: false
globals:
headers:
- "Access-Control-Allow-Origin:*"
- "Access-Control-Allow-Methods:POST, GET, OPTIONS"
- "Access-Control-Max-Age:86400"
authorization:
- "jwt"
- "910517d9-f9a1-48de-8826-dbadacbd84af-cb6f830e-ab16-47ec-9d8f-0090de732774"
sticky_sessions: false
to_https: false
rate_limit: 10
headers:
- "Access-Control-Allow-Origin:*"
- "Access-Control-Allow-Methods:POST, GET, OPTIONS"
- "Access-Control-Max-Age:86400"
authorization:
type: "jwt"
creds: "910517d9-f9a1-48de-8826-dbadacbd84af-cb6f830e-ab16-47ec-9d8f-0090de732774"
myhost.mydomain.com:
paths:
"/":
rate_limit: 20
to_https: false
headers:
- "X-Some-Thing:Yaaaaaaaaaaaaaaa"
- "X-Proxy-From:Hopaaaaaaaaaaaar"
@@ -142,24 +206,37 @@ myhost.mydomain.com:
- "127.0.0.1:8000"
- "127.0.0.2:8000"
"/foo":
to_https: true
headers:
- "X-Another-Header:Hohohohoho"
servers:
- "127.0.0.4:8443"
- "127.0.0.5:8443"
"/.well-known/acme-challenge":
healthcheck: false
servers:
- "127.0.0.1:8001"
```
**This means:**
- Sticky sessions are disabled globally. This setting applies to all upstreams.
- Sticky sessions are disabled globally. This setting applies to all upstreams. If enabled all requests will be 301 redirected to HTTPS.
- HTTP to HTTPS redirect disabled globally, but can be overridden by `to_https` setting per upstream.
- Requests to each hosted domains will be limited to 10 requests per second per virtualhost.
- Requests limits are calculated per requester ip plus requested virtualhost.
- If the requester exceeds the limit it will receive `429 Too Many Requests` error.
- Optional. Rate limiter will be disabled if the parameter is entirely removed from config.
- Requests to `myhost.mydomain.com/` will be limited to 20 requests per second.
- Requests to `myhost.mydomain.com/` will be proxied to `127.0.0.1` and `127.0.0.2`.
- Plain HTTP to `myhost.mydomain.com/foo` will get 301 redirect to configured TLS port of Aralez.
- Requests to `myhost.mydomain.com/foo` will be proxied to `127.0.0.4` and `127.0.0.5`.
- Requests to `myhost.mydomain.com/.well-known/acme-challenge` will be proxied to `127.0.0.1:8001`, but healthcheks are disabled.
- SSL/TLS for upstreams is detected automatically, no need to set any config parameter.
- Assuming the `127.0.0.5:8443` is SSL protected. The inner traffic will use TLS.
- Self signed certificates are silently accepted.
- Global headers (CORS for this case) will be injected to all upstreams
- Self-signed certificates are silently accepted.
- Global headers (CORS for this case) will be injected to all upstreams.
- Additional headers will be injected into the request for `myhost.mydomain.com`.
- You can choose any path, deep nested paths are supported, the best match is chosen.
- You can choose any path, deep nested paths are supported, the best match chosen.
- All requests to servers will require JWT token authentication (You can comment out the authorization to disable it),
- Firs parameter specifies the mechanism of authorisation `jwt`
- Second is the secret key for validating `jwt` tokens
@@ -185,10 +262,11 @@ To enable TLS for A proxy server: Currently only OpenSSL is supported, working o
## 📡 Remote Config API
You can push new `upstreams.yaml` over HTTP to `config_address` (`:3000` by default). Useful for CI/CD automation or remote config updates.
Push new `upstreams.yaml` over HTTP to `config_address` (`:3000` by default). Useful for CI/CD automation or remote config updates.
URL parameter. `key=MASTERKEY` is required. `MASTERKEY` is the value of `master_key` in the `main.yaml`
```bash
curl -XPOST --data-binary @./etc/upstreams.txt 127.0.0.1:3000/conf
curl -XPOST --data-binary @./etc/upstreams.txt 127.0.0.1:3000/conf?key=${MASTERKEY}
```
---
@@ -199,18 +277,18 @@ curl -XPOST --data-binary @./etc/upstreams.txt 127.0.0.1:3000/conf
- Only one method can be active at a time.
- `basic` : Standard HTTP Basic Authentication requests.
- `apikey` : Authentication via `x-api-key` header, which should match the value in config.
- `jwt`: JWT authentication implemented via `gazantoken=` url parameter. `/some/url?gazantoken=TOKEN`
- `jwt`: JWT authentication implemented via `araleztoken=` url parameter. `/some/url?araleztoken=TOKEN`
- `jwt`: JWT authentication implemented via `Authorization: Bearer <token>` header.
- To obtain JWT token, you should send **generate** request to built in api server's `/jwt` endpoint.
- `masterkey`: should match configured `masterkey` in `main.yaml` and `upstreams.yaml`.
- To obtain JWT a token, you should send **generate** request to built in api server's `/jwt` endpoint.
- `master_key`: should match configured `masterkey` in `main.yaml` and `upstreams.yaml`.
- `owner` : Just a placeholder, can be anything.
- `valid` : Time in minutes during which the generated token will be valid.
**Example JWT token generateion request**
**Example JWT token generation request**
```bash
PAYLOAD='{
"masterkey": "910517d9-f9a1-48de-8826-dbadacbd84af-cb6f830e-ab16-47ec-9d8f-0090de732774",
"master_key": "910517d9-f9a1-48de-8826-dbadacbd84af-cb6f830e-ab16-47ec-9d8f-0090de732774",
"owner": "valod",
"valid": 10
}'
@@ -230,7 +308,7 @@ curl -H "Authorization: Bearer ${TOK}" -H 'Host: myip.mydomain.com' http://127.0
With URL parameter (Very useful if you want to generate and share temporary links)
```bash
curl -H 'Host: myip.mydomain.com' "http://127.0.0.1:6193/?gazantoken=${TOK}`"
curl -H 'Host: myip.mydomain.com' "http://127.0.0.1:6193/?araleztoken=${TOK}`"
```
**Example Request with API Key**
@@ -260,4 +338,181 @@ curl -u username:password -H 'Host: myip.mydomain.com' http://127.0.0.1:6193/
- Transparent, fully automatic WebSocket upgrade support.
- Transparent, fully automatic gRPC proxy.
- Sticky session support.
- HTTP2 ready.
- HTTP2 ready.
📊 Why Choose Aralez? Feature Comparison
| Feature | **Aralez** | **Nginx** | **HAProxy** | **Traefik** |
|----------------------------|----------------------------------------------------------------------|--------------------------|-------------------------|-----------------|
| **Hot Reload** | ✅ Yes (live, API/file) | ⚠️ Reloads config | ⚠️ Reloads config | ✅ Yes (dynamic) |
| **JWT Auth** | ✅ Built-in | ❌ External scripts | ❌ External Lua or agent | ⚠️ With plugins |
| **WebSocket Support** | ✅ Automatic | ⚠️ Manual config | ✅ Yes | ✅ Yes |
| **gRPC Support** | ✅ Automatic (no config) | ⚠️ Manual + HTTP/2 + TLS | ⚠️ Complex setup | ✅ Native |
| **TLS Termination** | ✅ Built-in (OpenSSL) | ✅ Yes | ✅ Yes | ✅ Yes |
| **TLS Upstream Detection** | ✅ Automatic | ❌ | ❌ | ❌ |
| **HTTP/2 Support** | ✅ Automatic | ⚠️ Requires extra config | ⚠️ Requires build flags | ✅ Native |
| **Sticky Sessions** | ✅ Cookie-based | ⚠️ In plus version only | ✅ | ✅ |
| **Prometheus Metrics** | ✅ [Built in](https://github.com/sadoyan/aralez/blob/main/METRICS.md) | ⚠️ With Lua or exporter | ⚠️ With external script | ✅ Native |
| **Built With** | 🦀 Rust | C | C | Go |
## 💡 Simple benchmark by [Oha](https://github.com/hatoo/oha)
⚠️ These benchmarks use :
- 3 async Rust echo servers on a local network with 1Gbit as upstreams.
- A dedicated server for running **Aralez**
- A dedicated server for running **Oha**
- The following upstreams configuration.
- 9 test URLs from simple `/` to nested up to 7 subpaths.
```yaml
myhost.mydomain.com:
paths:
"/":
to_https: false
headers:
- "X-Proxy-From:Aralez"
servers:
- "192.168.211.211:8000"
- "192.168.211.212:8000"
- "192.168.211.213:8000"
"/ping":
to_https: false
headers:
- "X-Some-Thing:Yaaaaaaaaaaaaaaa"
- "X-Proxy-From:Aralez"
servers:
- "192.168.211.211:8000"
- "192.168.211.212:8000"
```
## 💡 Results reflect synthetic performance under optimal conditions.
- CPU : Intel(R) Xeon(R) CPU E3-1270 v6 @ 3.80GHz
- 300 : simultaneous connections
- Duration : 10 Minutes
- Binary : aralez-x86_64-glibc
```
Summary:
Success rate: 100.00%
Total: 600.0027 secs
Slowest: 0.2138 secs
Fastest: 0.0002 secs
Average: 0.0023 secs
Requests/sec: 129777.3838
Total data: 0 B
Size/request: 0 B
Size/sec: 0 B
Response time histogram:
0.000 [1] |
0.022 [77668026] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
0.043 [190362] |
0.064 [7908] |
0.086 [319] |
0.107 [4] |
0.128 [0] |
0.150 [0] |
0.171 [0] |
0.192 [0] |
0.214 [4] |
Response time distribution:
10.00% in 0.0012 secs
25.00% in 0.0016 secs
50.00% in 0.0020 secs
75.00% in 0.0026 secs
90.00% in 0.0033 secs
95.00% in 0.0040 secs
99.00% in 0.0078 secs
99.90% in 0.0278 secs
99.99% in 0.0434 secs
Details (average, fastest, slowest):
DNS+dialup: 0.0161 secs, 0.0002 secs, 0.0316 secs
DNS-lookup: 0.0000 secs, 0.0000 secs, 0.0000 secs
Status code distribution:
[200] 77866624 responses
Error distribution:
[158] aborted due to deadline
```
![Aralez](https://netangels.net/utils/glibc10.png)
- CPU : Intel(R) Xeon(R) CPU E3-1270 v6 @ 3.80GHz
- 300 : simultaneous connections
- Duration : 10 Minutes
- Binary : aralez-x86_64-musl
```
Summary:
Success rate: 100.00%
Total: 600.0021 secs
Slowest: 0.2182 secs
Fastest: 0.0002 secs
Average: 0.0024 secs
Requests/sec: 123870.5820
Total data: 0 B
Size/request: 0 B
Size/sec: 0 B
Response time histogram:
0.000 [1] |
0.022 [74254679] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
0.044 [61400] |
0.066 [5911] |
0.087 [385] |
0.109 [0] |
0.131 [0] |
0.153 [0] |
0.175 [0] |
0.196 [0] |
0.218 [1] |
Response time distribution:
10.00% in 0.0012 secs
25.00% in 0.0016 secs
50.00% in 0.0021 secs
75.00% in 0.0028 secs
90.00% in 0.0037 secs
95.00% in 0.0045 secs
99.00% in 0.0077 secs
99.90% in 0.0214 secs
99.99% in 0.0424 secs
Details (average, fastest, slowest):
DNS+dialup: 0.0066 secs, 0.0002 secs, 0.0210 secs
DNS-lookup: 0.0000 secs, 0.0000 secs, 0.0000 secs
Status code distribution:
[200] 74322377 responses
Error distribution:
[228] aborted due to deadline
```
![Aralez](https://netangels.net/utils/musl10.png)
## 🚀 Aralez, Nginx, Traefik performance benchmark
This benchmark is done on 4 servers. With CPU Intel(R) Xeon(R) E-2174G CPU @ 3.80GHz, 64 GB RAM.
1. Sever runs Aralez, Traefik, Nginx on different ports. Tuned as much as I could .
2. 3x Upstreams servers, running Nginx. Replying with dummy json hardcoded in config file for max performance.
All servers are connected to the same switch with 1GB port in datacenter , not a home lab. The results:
![Aralez](https://raw.githubusercontent.com/sadoyan/aralez/refs/heads/main/assets/bench.png)
The results show requests per second performed by Load balancer. You can see 3 batches with 800 concurrent users.
1. Requests via http1.1 to plain text endpoint.
2. Requests to via http2 to SSL endpoint.
3. Mixed workload with plain http1.1 and htt2 SSL.

BIN
assets/bench.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 160 KiB

View File

@@ -1,19 +1,25 @@
# Main configuration file , applied on startup
threads: 12 # Nubber of daemon threads default setting
#user: pastor # Username for running gazan after dropping root privileges, requires program to start as root
#group: pastor # Group for running gazan after dropping root privileges, requires program to start as root
# Main configuration file, applied on startup
threads: 12 # Number of daemon threads default setting
#runuser: pastor # Username for running aralez after dropping root privileges, requires program to start as root
#rungroup: pastor # Group for running aralez after dropping root privileges, requires program to start as root
daemon: false # Run in background
upstream_keepalive_pool_size: 500 # Pool size for upstream keepalive connections
pid_file: /tmp/gazan.pid # Path to PID file
error_log: /tmp/gazan_err.log # Path to error log
upgrade_sock: /tmp/gazan.sock # Path to socket file
pid_file: /tmp/aralez.pid # Path to PID file
error_log: /tmp/aralez_err.log # Path to error log
upgrade_sock: /tmp/aralez.sock # Path to socket file
config_api_enabled: true # Boolean to enable/disable remote config push capability.
config_address: 0.0.0.0:3000 # HTTP API address for pushing upstreams.yaml from remote location
config_tls_address: 0.0.0.0:3001 # HTTP TLS API address for pushing upstreams.yaml from remote location
config_tls_certificate: /etc/server.crt # Mandatory if config_tls_address is set
config_tls_key_file: /etc/key.pem # Mandatory if config_tls_address is set
proxy_address_http: 0.0.0.0:6193 # Proxy HTTP bind address
proxy_address_tls: 0.0.0.0:6194 # Optional, Proxy TLS bind address
tls_certificate: etc/server.crt # Mandatory if proxy_address_tls is set
tls_key_file: etc/key.pem # Mandatory if proxy_address_tls is set
upstreams_conf: etc/upstreams.yaml # the location of upstreams file
proxy_certificates: /etc/certs # Mandatory if proxy_address_tls set, should contain a certificate and key files strictly in a format {NAME}.crt, {NAME}.key.
proxy_tls_grade: a+ # Grade of TLS suite for proxy (a+, a, b, c, unsafe), matching grades of Qualys SSL Labs
upstreams_conf: /etc/upstreams.yaml # the location of upstreams file
file_server_folder: /opt/storage # Optional, local folder to serve
file_server_address: 127.0.0.1:3002 # Optional, Local address for file server. Can set as upstream for public access.
log_level: info # info, warn, error, debug, trace, off
hc_method: HEAD # Healthcheck method (HEAD, GET, POST are supported) UPPERCASE
hc_interval: 2 #Interval for health checks in seconds
master_key: 910517d9-f9a1-48de-8826-dbadacbd84af-cb6f830e-ab16-47ec-9d8f-0090de732774 # Mater key for working with API server and JWT Secret
master_key: 910517d9-f9a1-48de-8826-dbadacbd84af-cb6f830e-ab16-47ec-9d8f-0090de732774 # Mater key for working with API server and JWT Secret

View File

@@ -11,7 +11,7 @@ upstreams:
"/":
ssl: false
headers:
- "X-Proxy-From:Gazan"
- "X-Proxy-From:Aralez"
servers:
- "192.168.221.213:8000"
- "192.168.221.214:8000"

View File

@@ -1,95 +1,77 @@
# The file is under watch and hot reload , changes are applied immediately, no need to restart or reload
provider: "file" # consul
stickysessions: true
globals:
headers: # Global headers, appended for all upstreams and all paths.
- "Access-Control-Allow-Origin:*"
- "Access-Control-Allow-Methods:POST, GET, OPTIONS"
- "Access-Control-Max-Age:86400"
- "X-Custom-Header:Something Special"
# authorization: # Optional, only one of auth methods below can be active at a time
# - "basic"
# - "gazan:Gazanpass1234"
# - "apikey"
# - "5a28cc4c-ce10-4ff1-824e-743c38835f5c"
# - "jwt"
# - "910517d9-f9a1-48de-8826-dbadacbd84af-cb6f830e-ab16-47ec-9d8f-0090de732774"
consul: # If the provider is consul. Otherwise ignored
# The file under watch and hot reload, changes are applied immediately, no need to restart or reload.
provider: "file" # consul, kubernetes
sticky_sessions: false
to_ssl: false
#rate_limit: 100
headers:
- "Access-Control-Allow-Origin:*"
- "Access-Control-Allow-Methods:POST, GET, OPTIONS"
- "Access-Control-Max-Age:86400"
- "X-Custom-Header:Something Special"
authorization:
type: "jwt"
creds: "910517d9-f9a1-48de-8826-dbadacbd84af-cb6f830e-ab16-47ec-9d8f-0090de732774"
# type: "basic"
# creds: "user:Passw0rd"
# type: "apikey"
# creds: "5ecbf799-1343-4e94-a9b5-e278af5cd313-56b45249-1839-4008-a450-a60dc76d2bae"
consul: # If the provider is consul. Otherwise, ignored.
servers:
- "http://master1:8500"
- "http://192.168.22.1:8500"
- "http://master1.digitai.local:8500"
services: # proxy: The hostname to access proxy server, real : The real service name in Consul
- "http://consul1:8500"
- "http://consul2:8500"
- "http://consul3:8500"
services: # proxy: The hostname to access the proxy server, real : The real service name in Consul database.
- proxy: "proxy-frontend-dev-frontend-srv"
real: "frontend-dev-frontend-srv"
# - proxy: "proxy-gateway-test-gateway-srv"
# real: "gateway-test-gateway-srv"
# - proxy: "proxy-backoffice-dev-backoffice-srv"
# real: "backoffice-dev-backoffice-srv"
token: "8e2db809-845b-45e1-8b47-2c8356a09da0-a4370955-18c2-4d6e-a8f8-ffcc0b47be81" # Consul server access token, If Consul auth is enabled
upstreams: # If provider is files. Otherwise ignored
myip.netangels.net: # Hostname, or header host to access the upstream
paths: # URL path(s) for current upstream, closest match wins
kubernetes:
servers:
- "172.16.0.11:5443" # KUBERNETES_SERVICE_HOST : KUBERNETES_SERVICE_PORT_HTTPS
services:
- proxy: "api-service"
real: "api-service"
path: "/"
- proxy: "api-service"
real: "search-service"
path: "/search"
- proxy: "websocket-service"
real: "websocket-service"
tokenpath: "/tmp/token.txt" # /var/run/secrets/kubernetes.io/serviceaccount/token
upstreams:
myip.mydomain.com:
paths:
rate_limit: 10 # Per path rate limit have higher priority than global rate limit. If not set, the global rate limit will be used
"/":
headers: # Custom headers, set only for this Host and Path
- "X-Proxy-From:Gazan"
to_https: false
headers:
- "X-Proxy-From:Aralez"
servers: # List of upstreams HOST:PORT
- "127.0.0.1:8000"
- "127.0.0.2:8000"
- "127.0.0.3:8000"
- "127.0.0.4:8000"
"/ping":
to_https: true
headers:
- "X-Some-Thing:Yaaaaaaaaaaaaaaa"
- "X-Proxy-From:Gazan"
- "X-Proxy-From:Aralez"
servers:
- "127.0.0.1:8000"
- "127.0.0.2:8000"
"/draw":
servers:
- "192.168.1.1:8000"
polo.netangels.net:
polo.mydomain.com:
paths:
"/":
headers:
- "X-Some-Thing:Yaaaaaaaaaaaaaaa"
servers:
- "192.168.1.1:8000"
- "192.168.1.10:8000"
- "127.0.0.1:8000"
- "127.0.0.2:8000"
- "127.0.0.3:8000"
- "127.0.0.4:8000"
glop.netangels.net:
paths:
"/":
headers:
- "X-Hopar-From:Hopaaaaaaaaaaaar"
servers:
- "192.168.1.10:8000"
- "192.168.1.1:8000"
apt.netangels.net:
paths:
"/":
servers:
- "apt.netangels.net:443"
test.netangels.net:
paths:
"/":
servers:
- "myip.netangels.net:80"
127.0.0.1:
paths:
"/":
servers:
- "192.168.1.5:8080"
127.0.0.2:
paths:
"/":
servers:
- "10.0.55.171:3000"
localpost:
paths:
"/":
servers:
- "127.0.0.1:9000"
"/.well-known/acme-challenge":
healthcheck: false
servers:
- "127.0.0.1:8001"

View File

@@ -1,9 +1,14 @@
pub mod auth;
pub mod consul;
pub mod discovery;
pub mod dnsclient;
mod filewatch;
pub mod healthcheck;
pub mod jwt;
pub mod kuber;
pub mod metrics;
pub mod parceyaml;
pub mod state;
pub mod structs;
pub mod tls;
pub mod tools;

View File

@@ -37,7 +37,7 @@ impl AuthValidator for ApiKeyAuth<'_> {
impl AuthValidator for JwtAuth<'_> {
fn validate(&self, session: &Session) -> bool {
let jwtsecret = self.0;
if let Some(tok) = get_query_param(session, "gazantoken") {
if let Some(tok) = get_query_param(session, "araleztoken") {
return check_jwt(tok.as_str(), jwtsecret);
}

View File

@@ -1,6 +1,5 @@
use crate::utils::parceyaml::load_configuration;
use crate::utils::structs::{Configuration, ServiceMapping, UpstreamsDashMap};
use crate::utils::tools::{clone_dashmap_into, compare_dashmaps};
use crate::utils::structs::{Configuration, InnerMap, ServiceMapping, UpstreamsDashMap};
use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams};
use dashmap::DashMap;
use futures::channel::mpsc::Sender;
use futures::SinkExt;
@@ -11,6 +10,7 @@ use reqwest::header::{HeaderMap, HeaderValue};
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;
#[derive(Debug, Deserialize)]
@@ -27,59 +27,53 @@ struct TaggedAddress {
port: u16,
}
pub async fn start(fp: String, mut toreturn: Sender<Configuration>) {
let config = load_configuration(fp.as_str(), "filepath");
pub async fn start(mut toreturn: Sender<Configuration>, config: Arc<Configuration>) {
let headers = DashMap::new();
match config {
Some(config) => {
if config.typecfg.to_string() != "consul" {
info!("Not running Consul discovery, requested type is: {}", config.typecfg);
return;
}
info!("Consul Discovery is enabled : {}", config.typecfg);
let consul = config.consul.clone();
let prev_upstreams = UpstreamsDashMap::new();
match consul {
Some(consul) => {
let servers = consul.servers.unwrap();
info!("Consul Servers => {:?}", servers);
let end = servers.len() - 1;
info!("Consul Discovery is enabled : {}", config.typecfg);
let consul = config.consul.clone();
let prev_upstreams = UpstreamsDashMap::new();
match consul {
Some(consul) => {
let servers = consul.servers.unwrap();
info!("Consul Servers => {:?}", servers);
let end = servers.len();
loop {
let num = rand::rng().random_range(1..end);
headers.clear();
for (k, v) in config.headers.clone() {
headers.insert(k.to_string(), v);
}
let consul_data = servers.get(num).unwrap().to_string();
let upstreams = consul_request(consul_data, consul.services.clone(), consul.token.clone());
match upstreams.await {
Some(upstreams) => {
if !compare_dashmaps(&upstreams, &prev_upstreams) {
let mut tosend: Configuration = Configuration {
upstreams: Default::default(),
headers: Default::default(),
consul: None,
typecfg: "".to_string(),
extraparams: config.extraparams.clone(),
};
clone_dashmap_into(&upstreams, &prev_upstreams);
clone_dashmap_into(&upstreams, &tosend.upstreams);
tosend.headers = headers.clone();
tosend.extraparams.authentication = config.extraparams.authentication.clone();
tosend.typecfg = config.typecfg.clone();
tosend.consul = config.consul.clone();
toreturn.send(tosend).await.unwrap();
}
}
None => {}
}
sleep(Duration::from_secs(5)).await;
}
loop {
let mut num = 0;
if end > 0 {
num = rand::rng().random_range(0..end);
}
None => {}
headers.clear();
for (k, v) in config.headers.clone() {
headers.insert(k.to_string(), v);
}
let consul_data = servers.get(num).unwrap().to_string();
let upstreams = consul_request(consul_data, consul.services.clone(), consul.token.clone());
match upstreams.await {
Some(upstreams) => {
if !compare_dashmaps(&upstreams, &prev_upstreams) {
let mut tosend: Configuration = Configuration {
upstreams: Default::default(),
headers: Default::default(),
consul: None,
kubernetes: None,
typecfg: "".to_string(),
extraparams: config.extraparams.clone(),
};
clone_dashmap_into(&upstreams, &prev_upstreams);
clone_dashmap_into(&upstreams, &tosend.upstreams);
tosend.headers = headers.clone();
tosend.extraparams.authentication = config.extraparams.authentication.clone();
tosend.typecfg = config.typecfg.clone();
tosend.consul = config.consul.clone();
print_upstreams(&tosend.upstreams);
toreturn.send(tosend).await.unwrap();
}
}
None => {}
}
sleep(Duration::from_secs(5)).await;
}
}
None => {}
@@ -109,7 +103,7 @@ async fn consul_request(url: String, whitelist: Option<Vec<ServiceMapping>>, tok
Some(upstreams)
}
async fn get_by_http(url: String, token: Option<String>) -> Option<DashMap<String, (Vec<(String, u16, bool, bool)>, AtomicUsize)>> {
async fn get_by_http(url: String, token: Option<String>) -> Option<DashMap<String, (Vec<InnerMap>, AtomicUsize)>> {
let client = reqwest::Client::new();
let mut headers = HeaderMap::new();
if let Some(token) = token {
@@ -118,7 +112,7 @@ async fn get_by_http(url: String, token: Option<String>) -> Option<DashMap<Strin
let to = Duration::from_secs(1);
let u = client.get(url).timeout(to).send();
let mut values = Vec::new();
let upstreams: DashMap<String, (Vec<(String, u16, bool, bool)>, AtomicUsize)> = DashMap::new();
let upstreams: DashMap<String, (Vec<InnerMap>, AtomicUsize)> = DashMap::new();
match u.await {
Ok(r) => {
let jason = r.json::<Vec<Service>>().await;
@@ -127,7 +121,15 @@ async fn get_by_http(url: String, token: Option<String>) -> Option<DashMap<Strin
for service in whitelist {
let addr = service.tagged_addresses.get("lan_ipv4").unwrap().address.clone();
let prt = service.tagged_addresses.get("lan_ipv4").unwrap().port.clone();
let to_add = (addr, prt, false, false);
let to_add = InnerMap {
address: addr,
port: prt,
is_ssl: false,
is_http2: false,
to_https: false,
rate_limit: None,
healthcheck: None,
};
values.push(to_add);
}
}

View File

@@ -1,34 +1,46 @@
use crate::utils::consul;
use crate::utils::filewatch;
use crate::utils::structs::Configuration;
use crate::utils::{consul, kuber};
use crate::web::webserver;
use async_trait::async_trait;
use futures::channel::mpsc::Sender;
use std::sync::Arc;
pub struct FromFileProvider {
pub path: String,
}
pub struct APIUpstreamProvider {
pub config_api_enabled: bool,
pub address: String,
pub masterkey: String,
}
pub struct ConsulProvider {
pub path: String,
}
#[async_trait]
pub trait Discovery {
async fn start(&self, tx: Sender<Configuration>);
pub tls_address: Option<String>,
pub tls_certificate: Option<String>,
pub tls_key_file: Option<String>,
pub file_server_address: Option<String>,
pub file_server_folder: Option<String>,
}
#[async_trait]
impl Discovery for APIUpstreamProvider {
async fn start(&self, toreturn: Sender<Configuration>) {
webserver::run_server(self.address.clone(), self.masterkey.clone(), toreturn).await;
webserver::run_server(self, toreturn).await;
}
}
pub struct FromFileProvider {
pub path: String,
}
pub struct ConsulProvider {
pub config: Arc<Configuration>,
}
pub struct KubernetesProvider {
pub config: Arc<Configuration>,
}
#[async_trait]
pub trait Discovery {
async fn start(&self, tx: Sender<Configuration>);
}
#[async_trait]
impl Discovery for FromFileProvider {
async fn start(&self, tx: Sender<Configuration>) {
@@ -39,6 +51,13 @@ impl Discovery for FromFileProvider {
#[async_trait]
impl Discovery for ConsulProvider {
async fn start(&self, tx: Sender<Configuration>) {
tokio::spawn(consul::start(self.path.clone(), tx.clone()));
tokio::spawn(consul::start(tx.clone(), self.config.clone()));
}
}
#[async_trait]
impl Discovery for KubernetesProvider {
async fn start(&self, tx: Sender<Configuration>) {
tokio::spawn(kuber::start(tx.clone(), self.config.clone()));
}
}

158
src/utils/dnsclient.rs Normal file
View File

@@ -0,0 +1,158 @@
/*
use crate::utils::structs::InnerMap;
use dashmap::DashMap;
use hickory_client::client::{Client, ClientHandle};
use hickory_client::proto::rr::{DNSClass, Name, RecordType};
use hickory_client::proto::runtime::TokioRuntimeProvider;
use hickory_client::proto::udp::UdpClientStream;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::atomic::AtomicUsize;
use std::time::Duration;
use tokio::sync::Mutex;
type DnsError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub struct DnsClientPool {
clients: Vec<Mutex<DnsClient>>,
}
struct DnsClient {
client: Client,
}
pub async fn start2(mut toreturn: Sender<Configuration>, config: Arc<Configuration>) {
let k8s = config.kubernetes.clone();
match k8s {
Some(k8s) => {
let dnserver = k8s.servers.unwrap_or(vec!["127.0.0.1:53".to_string()]);
let headers = DashMap::new();
let end = dnserver.len() - 1;
let mut num = 0;
if end > 0 {
num = rand::rng().random_range(0..end);
}
let srv = dnserver.get(num).unwrap().to_string();
let pool = DnsClientPool::new(5, srv.clone()).await;
let u = UpstreamsDashMap::new();
if let Some(whitelist) = k8s.services {
loop {
let upstreams = UpstreamsDashMap::new();
for service in whitelist.iter() {
let ret = pool.query_srv(service.real.as_str(), srv.clone()).await;
match ret {
Ok(r) => {
upstreams.insert(service.proxy.clone(), r);
}
Err(e) => eprintln!("DNS query failed for {:?}: {:?}", service, e),
}
}
if !compare_dashmaps(&u, &upstreams) {
headers.clear();
for (k, v) in config.headers.clone() {
headers.insert(k.to_string(), v);
}
let mut tosend: Configuration = Configuration {
upstreams: Default::default(),
headers: Default::default(),
consul: None,
kubernetes: None,
typecfg: "".to_string(),
extraparams: config.extraparams.clone(),
};
clone_dashmap_into(&upstreams, &u);
clone_dashmap_into(&upstreams, &tosend.upstreams);
tosend.headers = headers.clone();
tosend.extraparams.authentication = config.extraparams.authentication.clone();
tosend.typecfg = config.typecfg.clone();
tosend.consul = config.consul.clone();
print_upstreams(&tosend.upstreams);
toreturn.send(tosend).await.unwrap();
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
None => {}
}
}
impl DnsClient {
pub async fn new(server: String) -> Result<Self, DnsError> {
let server_details = server;
let server: SocketAddr = server_details.parse().expect("Unable to parse socket address");
let conn = UdpClientStream::builder(server, TokioRuntimeProvider::default()).build();
let (client, bg) = Client::connect(conn).await.unwrap();
tokio::spawn(bg);
Ok(Self { client })
}
pub async fn query_srv(&mut self, name: &str) -> Result<DashMap<String, (Vec<InnerMap>, AtomicUsize)>, DnsError> {
let upstreams: DashMap<String, (Vec<InnerMap>, AtomicUsize)> = DashMap::new();
let mut values = Vec::new();
match tokio::time::timeout(Duration::from_secs(5), self.client.query(Name::from_str(name)?, DNSClass::IN, RecordType::SRV)).await {
Ok(Ok(response)) => {
for answer in response.answers() {
if let hickory_client::proto::rr::RData::SRV(srv) = answer.data() {
let to_add = InnerMap {
address: srv.target().to_string(),
port: srv.port(),
is_ssl: false,
is_http2: false,
to_https: false,
rate_limit: None,
};
values.push(to_add);
}
}
upstreams.insert("/".to_string(), (values, AtomicUsize::new(0)));
Ok(upstreams)
}
Ok(Err(e)) => Err(Box::new(e)),
Err(_) => Err("DNS query timed out".into()),
}
}
}
impl DnsClientPool {
pub async fn new(pool_size: usize, server: String) -> Self {
let mut clients = Vec::with_capacity(pool_size);
for _ in 0..pool_size {
if let Ok(client) = DnsClient::new(server.clone()).await {
clients.push(Mutex::new(client));
}
}
Self { clients }
}
pub async fn query_srv(&self, name: &str, server: String) -> Result<DashMap<String, (Vec<InnerMap>, AtomicUsize)>, DnsError> {
// Try to get an available client
for client_mutex in &self.clients {
if let Ok(mut client) = client_mutex.try_lock() {
let vay = client.query_srv(name).await;
match vay {
Ok(_) => return vay,
Err(_) => {
// If query fails, drop this client and create a new one
*client = match DnsClient::new(server).await {
Ok(c) => c,
Err(e) => return Err(e),
};
// Retry with the new client
return client.query_srv(name).await;
}
}
}
}
// If all clients are busy, wait for the first one with a timeout
match tokio::time::timeout(Duration::from_secs(2), self.clients[0].lock()).await {
Ok(mut client) => client.query_srv(name).await,
Err(_) => Err("All DNS clients are busy and timeout reached".into()),
}
}
}
*/

View File

@@ -2,7 +2,7 @@ use crate::utils::parceyaml::load_configuration;
use crate::utils::structs::Configuration;
use futures::channel::mpsc::Sender;
use futures::SinkExt;
use log::{error, info, warn};
use log::error;
use notify::event::ModifyKind;
use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use pingora::prelude::sleep;
@@ -15,19 +15,7 @@ pub async fn start(fp: String, mut toreturn: Sender<Configuration>) {
let file_path = fp.as_str();
let parent_dir = Path::new(file_path).parent().unwrap();
let (local_tx, mut local_rx) = tokio::sync::mpsc::channel::<notify::Result<Event>>(1);
let snd = load_configuration(file_path, "filepath");
match snd {
Some(snd) => {
if snd.typecfg != "file" {
warn!("Disabling file watcher, requested discovery type is: {}", snd.typecfg);
return;
}
info!("Watching for changes in {:?}", parent_dir);
toreturn.send(snd).await.unwrap();
}
None => {}
}
let _watcher_handle = task::spawn_blocking({
let parent_dir = parent_dir.to_path_buf(); // Move directory path into the closure
move || {
@@ -53,7 +41,7 @@ pub async fn start(fp: String, mut toreturn: Sender<Configuration>) {
if start.elapsed() > Duration::from_secs(2) {
start = Instant::now();
// info!("Config File changed :=> {:?}", e);
let snd = load_configuration(file_path, "filepath");
let snd = load_configuration(file_path, "filepath").await;
match snd {
Some(snd) => {
toreturn.send(snd).await.unwrap();

View File

@@ -1,7 +1,7 @@
use crate::utils::structs::{UpstreamsDashMap, UpstreamsIdMap};
use crate::utils::structs::{InnerMap, UpstreamsDashMap, UpstreamsIdMap};
use crate::utils::tools::*;
use dashmap::DashMap;
use log::{error, info, warn};
use log::{error, warn};
use reqwest::{Client, Version};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
@@ -11,112 +11,111 @@ use tonic::transport::Endpoint;
pub async fn hc2(upslist: Arc<UpstreamsDashMap>, fullist: Arc<UpstreamsDashMap>, idlist: Arc<UpstreamsIdMap>, params: (&str, u64)) {
let mut period = interval(Duration::from_secs(params.1));
let mut first_run = 0;
let client = Client::builder().timeout(Duration::from_secs(params.1)).danger_accept_invalid_certs(true).build().unwrap();
loop {
tokio::select! {
_ = period.tick() => {
let totest : UpstreamsDashMap = DashMap::new();
let fclone : UpstreamsDashMap = clone_dashmap(&fullist);
for val in fclone.iter() {
let host = val.key();
let inner = DashMap::new();
let mut _scheme: (String, u16, bool, bool) = ("".to_string(), 0, false, false);
for path_entry in val.value().iter() {
// let inner = DashMap::new();
let path = path_entry.key();
let mut innervec= Vec::new();
for k in path_entry.value().0 .iter().enumerate() {
let (ip, port, _ssl, _version) = k.1;
let mut _link = String::new();
let tls = detect_tls(ip, port).await;
let mut is_h2 = false;
// if tls.1 == Some(Version::HTTP_11) {
// println!(" V1: ==> {:?}", tls.1)
// }else if tls.1 == Some(Version::HTTP_2) {
// is_h2 = true;
// println!(" V2: ==> {:?}", tls.1)
// }
if tls.1 == Some(Version::HTTP_2) {
is_h2 = true;
// println!(" V2: ==> {} ==> {:?}", tls.0, tls.1)
}
match tls.0 {
true => _link = format!("https://{}:{}{}", ip, port, path),
false => _link = format!("http://{}:{}{}", ip, port, path),
}
// if _pref == "https://" {
// _scheme = (ip.to_string(), *port, true);
// }else {
// _scheme = (ip.to_string(), *port, false);
// }
_scheme = (ip.to_string(), *port, tls.0, is_h2);
// let link = format!("{}{}:{}{}", _pref, ip, port, path);
let resp = http_request(_link.as_str(), params.0, "").await;
match resp.0 {
true => {
if resp.1 {
_scheme = (ip.to_string(), *port, tls.0, true);
}
innervec.push(_scheme.clone());
}
false => {
warn!("Dead Upstream : {}", _link);
}
}
}
inner.insert(path.clone().to_owned(), (innervec, AtomicUsize::new(0)));
}
totest.insert(host.clone(), inner);
}
if first_run == 1 {
info!("Performing initial hatchecks and upstreams ssl detection");
clone_idmap_into(&totest, &idlist);
info!("Gazan is up and ready to serve requests");
}
first_run+=1;
if ! compare_dashmaps(&totest, &upslist){
clone_dashmap_into(&totest, &upslist);
clone_idmap_into(&totest, &idlist);
}
populate_upstreams(&upslist, &fullist, &idlist, params, &client).await;
}
}
}
}
#[allow(dead_code)]
async fn http_request(url: &str, method: &str, payload: &str) -> (bool, bool) {
let client = Client::builder().danger_accept_invalid_certs(true).build().unwrap();
let timeout = Duration::from_secs(1);
pub async fn populate_upstreams(upslist: &Arc<UpstreamsDashMap>, fullist: &Arc<UpstreamsDashMap>, idlist: &Arc<UpstreamsIdMap>, params: (&str, u64), client: &Client) {
let totest = build_upstreams(fullist, params.0, client).await;
if !compare_dashmaps(&totest, upslist) {
clone_dashmap_into(&totest, upslist);
clone_idmap_into(&totest, idlist);
}
}
pub async fn initiate_upstreams(fullist: UpstreamsDashMap) -> UpstreamsDashMap {
let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().unwrap();
build_upstreams(&fullist, "HEAD", &client).await
}
async fn build_upstreams(fullist: &UpstreamsDashMap, method: &str, client: &Client) -> UpstreamsDashMap {
let totest: UpstreamsDashMap = DashMap::new();
let fclone = clone_dashmap(fullist);
for val in fclone.iter() {
let host = val.key();
let inner = DashMap::new();
for path_entry in val.value().iter() {
let path = path_entry.key();
let mut innervec = Vec::new();
for (_, upstream) in path_entry.value().0.iter().enumerate() {
let tls = detect_tls(upstream.address.as_str(), &upstream.port, &client).await;
let is_h2 = matches!(tls.1, Some(Version::HTTP_2));
let link = if tls.0 {
format!("https://{}:{}{}", upstream.address, upstream.port, path)
} else {
format!("http://{}:{}{}", upstream.address, upstream.port, path)
};
let mut scheme = InnerMap {
address: upstream.address.clone(),
port: upstream.port,
is_ssl: tls.0,
is_http2: is_h2,
to_https: upstream.to_https,
rate_limit: upstream.rate_limit,
healthcheck: upstream.healthcheck,
};
if scheme.healthcheck.unwrap_or(true) {
let resp = http_request(&link, method, "", &client).await;
if resp.0 {
if resp.1 {
scheme.is_http2 = is_h2; // could be adjusted further
}
innervec.push(scheme);
} else {
warn!("Dead Upstream : {}", link);
}
} else {
innervec.push(scheme);
}
// let resp = http_request(&link, method, "", &client).await;
// if resp.0 {
// if resp.1 {
// scheme.is_http2 = is_h2; // could be adjusted further
// }
// innervec.push(scheme);
// } else {
// warn!("Dead Upstream : {}", link);
// }
}
inner.insert(path.clone(), (innervec, AtomicUsize::new(0)));
}
totest.insert(host.clone(), inner);
}
totest
}
async fn http_request(url: &str, method: &str, payload: &str, client: &Client) -> (bool, bool) {
if !["POST", "GET", "HEAD"].contains(&method) {
error!("Method {} not supported. Only GET|POST|HEAD are supported ", method);
return (false, false);
}
async fn send_request(client: &Client, method: &str, url: &str, payload: &str, timeout: Duration) -> Option<reqwest::Response> {
async fn send_request(client: &Client, method: &str, url: &str, payload: &str) -> Option<reqwest::Response> {
match method {
"POST" => client.post(url).body(payload.to_owned()).timeout(timeout).send().await.ok(),
"GET" => client.get(url).timeout(timeout).send().await.ok(),
"HEAD" => client.head(url).timeout(timeout).send().await.ok(),
"POST" => client.post(url).body(payload.to_owned()).send().await.ok(),
"GET" => client.get(url).send().await.ok(),
"HEAD" => client.head(url).send().await.ok(),
_ => None,
}
}
match send_request(&client, method, url, payload, timeout).await {
match send_request(&client, method, url, payload).await {
Some(response) => {
let status = response.status().as_u16();
((99..499).contains(&status), false)
}
None => {
// let fallback_url = url.replace("https", "http");
// ping_grpc(&fallback_url).await
(ping_grpc(&url).await, true)
}
None => (ping_grpc(&url).await, true),
}
}
@@ -127,10 +126,7 @@ pub async fn ping_grpc(addr: &str) -> bool {
let endpoint = endpoint.timeout(Duration::from_secs(2));
match tokio::time::timeout(Duration::from_secs(3), endpoint.connect()).await {
Ok(Ok(_channel)) => {
// println!("{:?} ==> {:?} ==> {}", endpoint, _channel, addr);
true
}
Ok(Ok(_channel)) => true,
_ => false,
}
} else {
@@ -138,15 +134,24 @@ pub async fn ping_grpc(addr: &str) -> bool {
}
}
async fn detect_tls(ip: &str, port: &u16) -> (bool, Option<Version>) {
let url = format!("https://{}:{}", ip, port);
// let url = format!("{}:{}", ip, port);
let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().unwrap();
match client.get(&url).send().await {
Ok(response) => (true, Some(response.version())),
Err(e) => {
if e.is_builder() || e.is_connect() || e.to_string().contains("tls") {
(false, None)
async fn detect_tls(ip: &str, port: &u16, client: &Client) -> (bool, Option<Version>) {
let https_url = format!("https://{}:{}", ip, port);
match client.get(&https_url).send().await {
Ok(response) => {
// println!("{} => {:?} (HTTPS)", https_url, response.version());
return (true, Some(response.version()));
}
_ => {}
}
let http_url = format!("http://{}:{}", ip, port);
match client.get(&http_url).send().await {
Ok(response) => {
// println!("{} => {:?} (HTTP)", http_url, response.version());
(false, Some(response.version()))
}
Err(_) => {
if ping_grpc(&http_url).await {
(false, Some(Version::HTTP_2))
} else {
(false, None)
}

148
src/utils/kuber.rs Normal file
View File

@@ -0,0 +1,148 @@
// use crate::utils::dnsclient::DnsClientPool;
use crate::utils::structs::{Configuration, InnerMap, UpstreamsDashMap};
use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams};
use dashmap::DashMap;
use futures::channel::mpsc::Sender;
use futures::SinkExt;
use pingora::prelude::sleep;
use rand::Rng;
use reqwest::Client;
use std::env;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
// static KUBERNETES_SERVICE_HOST: &str = "IP_ADDRESS";
// static TOKEN: &str = "TOKEN";
#[derive(Debug, serde::Deserialize)]
struct Endpoints {
subsets: Option<Vec<Subset>>,
}
#[derive(Debug, serde::Deserialize)]
struct Subset {
addresses: Option<Vec<Address>>,
ports: Option<Vec<Port>>,
}
#[derive(Debug, serde::Deserialize)]
struct Address {
ip: String,
}
#[derive(Debug, serde::Deserialize)]
struct Port {
// name: String,
port: u16,
}
pub async fn start(mut toreturn: Sender<Configuration>, config: Arc<Configuration>) {
// println!("{:?}", config);
let upstreams = UpstreamsDashMap::new();
let prev_upstreams = UpstreamsDashMap::new();
loop {
if let Some(kuber) = config.kubernetes.clone() {
let path = kuber.tokenpath.unwrap_or("/var/run/secrets/kubernetes.io/serviceaccount/token".to_string());
let token = read_token(path.as_str()).await;
let servers = kuber.servers.unwrap_or(vec![format!(
"{}:{}",
env::var("KUBERNETES_SERVICE_HOST").unwrap_or("0.0.0.0".to_string()),
env::var("KUBERNETES_SERVICE_PORT_HTTPS").unwrap_or("0".to_string())
)]);
let end = servers.len() - 1;
let mut num = 0;
if end > 0 {
num = rand::rng().random_range(0..end);
}
let server = servers.get(num).unwrap().to_string();
if let Some(svc) = kuber.services {
for i in svc {
let path = i.path.unwrap_or("/".to_string());
let url = format!("https://{}/api/v1/namespaces/staging/endpoints/{}", server, i.real);
let list = get_by_http(&*url, &*token, &*path).await;
// println!("{:?}", list);
if let Some(list) = list {
match upstreams.get(&i.proxy.clone()) {
Some(foo) => {
for (k, v) in list {
foo.value().insert(k, v);
}
}
None => {
upstreams.insert(i.proxy.clone(), list);
}
};
}
}
}
}
// print_upstreams(&upstreams);
if !compare_dashmaps(&upstreams, &prev_upstreams) {
let tosend: Configuration = Configuration {
upstreams: Default::default(),
headers: config.headers.clone(),
consul: config.consul.clone(),
kubernetes: config.kubernetes.clone(),
typecfg: config.typecfg.clone(),
extraparams: config.extraparams.clone(),
};
clone_dashmap_into(&upstreams, &prev_upstreams);
clone_dashmap_into(&upstreams, &tosend.upstreams);
print_upstreams(&tosend.upstreams);
toreturn.send(tosend).await.unwrap();
}
sleep(Duration::from_secs(5)).await;
}
}
pub async fn get_by_http(url: &str, token: &str, path: &str) -> Option<DashMap<String, (Vec<InnerMap>, AtomicUsize)>> {
let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().ok()?;
let resp = client.get(url).bearer_auth(token).send().await.ok()?;
if !resp.status().is_success() {
eprintln!("Kubernetes API returned status: {}", resp.status());
return None;
}
let endpoints: Endpoints = resp.json().await.ok()?;
let upstreams: DashMap<String, (Vec<InnerMap>, AtomicUsize)> = DashMap::new();
if let Some(subsets) = endpoints.subsets {
for subset in subsets {
if let (Some(addresses), Some(ports)) = (subset.addresses, subset.ports) {
let mut inner_vec = Vec::new();
for addr in addresses {
for port in &ports {
let to_add = InnerMap {
address: addr.ip.clone(),
port: port.port.clone(),
is_ssl: false,
is_http2: false,
to_https: false,
rate_limit: None,
healthcheck: None,
};
inner_vec.push(to_add);
}
}
upstreams.insert(path.to_string(), (inner_vec, AtomicUsize::new(0)));
}
}
}
Some(upstreams)
}
async fn read_token(path: &str) -> String {
let mut file = File::open(path).await.unwrap();
let mut contents = String::new();
file.read_to_string(&mut contents).await.unwrap();
contents.trim().to_string()
}

63
src/utils/metrics.rs Normal file
View File

@@ -0,0 +1,63 @@
use pingora_http::Version;
use prometheus::{register_histogram, register_int_counter, register_int_counter_vec, Histogram, IntCounter, IntCounterVec};
use std::time::Duration;
pub struct MetricTypes {
pub method: String,
pub code: String,
pub latency: Duration,
pub version: Version,
}
lazy_static::lazy_static! {
pub static ref REQUEST_COUNT: IntCounter = register_int_counter!(
"aralez_requests_total",
"Total number of requests handled by Aralez"
).unwrap();
pub static ref RESPONSE_CODES: IntCounterVec = register_int_counter_vec!(
"aralez_responses_total",
"Responses grouped by status code",
&["status"]
).unwrap();
pub static ref REQUEST_LATENCY: Histogram = register_histogram!(
"aralez_request_latency_seconds",
"Request latency in seconds",
vec![0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
).unwrap();
pub static ref RESPONSE_LATENCY: Histogram = register_histogram!(
"aralez_response_latency_seconds",
"Response latency in seconds",
vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0]
).unwrap();
pub static ref REQUESTS_BY_METHOD: IntCounterVec = register_int_counter_vec!(
"aralez_requests_by_method_total",
"Number of requests by HTTP method",
&["method"]
).unwrap();
pub static ref REQUESTS_BY_VERSION: IntCounterVec = register_int_counter_vec!(
"aralez_requests_by_version_total",
"Number of requests by HTTP versions",
&["version"]
).unwrap();
pub static ref ERROR_COUNT: IntCounter = register_int_counter!(
"aralez_errors_total",
"Total number of errors"
).unwrap();
}
pub fn calc_metrics(metric_types: &MetricTypes) {
REQUEST_COUNT.inc();
let timer = REQUEST_LATENCY.start_timer();
timer.observe_duration();
let version_str = match &metric_types.version {
&Version::HTTP_11 => "HTTP/1.1",
&Version::HTTP_2 => "HTTP/2.0",
&Version::HTTP_3 => "HTTP/3.0",
&Version::HTTP_10 => "HTTP/1.0",
_ => "Unknown",
};
REQUESTS_BY_VERSION.with_label_values(&[&version_str]).inc();
RESPONSE_CODES.with_label_values(&[&metric_types.code.to_string()]).inc();
REQUESTS_BY_METHOD.with_label_values(&[&metric_types.method]).inc();
RESPONSE_LATENCY.observe(metric_types.latency.as_secs_f64());
}

View File

@@ -1,135 +1,164 @@
use crate::utils::healthcheck;
use crate::utils::state::{is_first_run, mark_not_first_run};
use crate::utils::structs::*;
use crate::utils::tools::{clone_dashmap, clone_dashmap_into, print_upstreams};
use dashmap::DashMap;
use log::{error, info, warn};
use serde_yaml::Error;
use std::collections::HashMap;
use std::fs;
use std::sync::atomic::AtomicUsize;
// use std::sync::mpsc::{channel, Receiver, Sender};
use std::{env, fs};
// use tokio::sync::oneshot::{Receiver, Sender};
pub fn load_configuration(d: &str, kind: &str) -> Option<Configuration> {
let mut toreturn: Configuration = Configuration {
upstreams: Default::default(),
headers: Default::default(),
consul: None,
typecfg: "".to_string(),
extraparams: Extraparams {
stickysessions: false,
authentication: DashMap::new(),
pub async fn load_configuration(d: &str, kind: &str) -> Option<Configuration> {
let yaml_data = match kind {
"filepath" => match fs::read_to_string(d) {
Ok(data) => {
info!("Reading upstreams from {}", d);
data
}
Err(e) => {
error!("Reading: {}: {:?}", d, e);
warn!("Running with empty upstreams list, update it via API");
return None;
}
},
};
toreturn.upstreams = UpstreamsDashMap::new();
toreturn.headers = Headers::new();
let mut yaml_data = d.to_string();
match kind {
"filepath" => {
let _ = match fs::read_to_string(d) {
Ok(data) => {
info!("Reading upstreams from {}", d);
yaml_data = data
}
Err(e) => {
error!("Reading: {}: {:?}", d, e.to_string());
warn!("Running with empty upstreams list, update it via API");
return None;
}
};
}
"content" => {
info!("Reading upstreams from API post body");
d.to_string()
}
_ => error!("Mismatched parameter, only filepath|content is allowed "),
}
_ => {
error!("Mismatched parameter, only filepath|content is allowed");
return None;
}
};
let p: Result<Config, Error> = serde_yaml::from_str(&yaml_data);
match p {
Ok(parsed) => {
let global_headers = DashMap::new();
let mut hl = Vec::new();
if let Some(globals) = &parsed.globals {
for headers in globals.get("headers").iter().by_ref() {
for header in headers.iter() {
if let Some((key, val)) = header.split_once(':') {
hl.push((key.to_string(), val.to_string()));
}
}
}
global_headers.insert("/".to_string(), hl);
toreturn.headers.insert("GLOBAL_HEADERS".to_string(), global_headers);
toreturn.extraparams.stickysessions = parsed.stickysessions;
let cfg = DashMap::new();
if let Some(k) = globals.get("authorization") {
cfg.insert("authorization".to_string(), k.to_owned());
toreturn.extraparams.authentication = cfg;
} else {
toreturn.extraparams.authentication = DashMap::new();
}
}
match parsed.provider.as_str() {
"file" => {
toreturn.typecfg = "file".to_string();
if let Some(upstream) = parsed.upstreams {
for (hostname, host_config) in upstream {
let path_map = DashMap::new();
let header_list = DashMap::new();
for (path, path_config) in host_config.paths {
let mut server_list = Vec::new();
let mut hl = Vec::new();
if let Some(headers) = &path_config.headers {
for header in headers.iter().by_ref() {
if let Some((key, val)) = header.split_once(':') {
hl.push((key.to_string(), val.to_string()));
}
}
}
header_list.insert(path.clone(), hl);
for server in path_config.servers {
if let Some((ip, port_str)) = server.split_once(':') {
if let Ok(port) = port_str.parse::<u16>() {
// server_list.push((ip.to_string(), port, path_config.ssl));
server_list.push((ip.to_string(), port, true, false));
}
}
}
path_map.insert(path, (server_list, AtomicUsize::new(0)));
}
toreturn.headers.insert(hostname.clone(), header_list);
toreturn.upstreams.insert(hostname, path_map);
}
}
Some(toreturn)
}
"consul" => {
toreturn.typecfg = "consul".to_string();
let consul = parsed.consul;
match consul {
Some(consul) => {
toreturn.consul = Some(consul);
Some(toreturn)
}
None => None,
}
}
"kubernetes" => None,
_ => {
warn!("Unknown provider {}", parsed.provider);
None
}
}
}
let parsed: Config = match serde_yaml::from_str(&yaml_data) {
Ok(cfg) => cfg,
Err(e) => {
error!("Failed to parse upstreams file: {}", e);
return None;
}
};
let mut toreturn = Configuration::default();
populate_headers_and_auth(&mut toreturn, &parsed).await;
toreturn.typecfg = parsed.provider.clone();
match parsed.provider.as_str() {
"file" => {
populate_file_upstreams(&mut toreturn, &parsed).await;
Some(toreturn)
}
"consul" => {
toreturn.consul = parsed.consul;
toreturn.consul.is_some().then_some(toreturn)
}
"kubernetes" => {
toreturn.kubernetes = parsed.kubernetes;
toreturn.kubernetes.is_some().then_some(toreturn)
}
_ => {
warn!("Unknown provider {}", parsed.provider);
None
}
}
}
async fn populate_headers_and_auth(config: &mut Configuration, parsed: &Config) {
if let Some(headers) = &parsed.headers {
let mut hl = Vec::new();
for header in headers {
if let Some((key, val)) = header.split_once(':') {
hl.push((key.trim().to_string(), val.trim().to_string()));
}
}
let global_headers = DashMap::new();
global_headers.insert("/".to_string(), hl);
config.headers.insert("GLOBAL_HEADERS".to_string(), global_headers);
}
config.extraparams.sticky_sessions = parsed.sticky_sessions;
config.extraparams.to_https = parsed.to_https;
config.extraparams.rate_limit = parsed.rate_limit;
if let Some(rate) = &parsed.rate_limit {
info!("Applied Global Rate Limit : {} request per second", rate);
}
if let Some(auth) = &parsed.authorization {
let name = auth.get("type").unwrap_or(&"".to_string()).to_string();
let creds = auth.get("creds").unwrap_or(&"".to_string()).to_string();
config.extraparams.authentication.insert("authorization".to_string(), vec![name, creds]);
} else {
config.extraparams.authentication = DashMap::new();
}
}
async fn populate_file_upstreams(config: &mut Configuration, parsed: &Config) {
let imtdashmap = UpstreamsDashMap::new();
if let Some(upstreams) = &parsed.upstreams {
for (hostname, host_config) in upstreams {
let path_map = DashMap::new();
let header_list = DashMap::new();
for (path, path_config) in &host_config.paths {
if let Some(rate) = &path_config.rate_limit {
info!("Applied Rate Limit for {} : {} request per second", hostname, rate);
}
let mut server_list = Vec::new();
let mut hl = Vec::new();
if let Some(headers) = &path_config.headers {
for header in headers {
if let Some((key, val)) = header.split_once(':') {
hl.push((key.trim().to_string(), val.trim().to_string()));
}
}
}
header_list.insert(path.clone(), hl);
for server in &path_config.servers {
if let Some((ip, port_str)) = server.split_once(':') {
if let Ok(port) = port_str.parse::<u16>() {
server_list.push(InnerMap {
address: ip.trim().to_string(),
port,
is_ssl: true,
is_http2: false,
to_https: path_config.to_https.unwrap_or(false),
rate_limit: path_config.rate_limit,
healthcheck: path_config.healthcheck,
});
}
}
}
path_map.insert(path.clone(), (server_list, AtomicUsize::new(0)));
}
config.headers.insert(hostname.clone(), header_list);
imtdashmap.insert(hostname.clone(), path_map);
}
if is_first_run() {
clone_dashmap_into(&imtdashmap, &config.upstreams);
mark_not_first_run();
} else {
let y = clone_dashmap(&imtdashmap);
let r = healthcheck::initiate_upstreams(y).await;
clone_dashmap_into(&r, &config.upstreams);
}
info!("Upstream Config:");
print_upstreams(&config.upstreams);
}
}
pub fn parce_main_config(path: &str) -> AppConfig {
info!("Parsing configuration");
let data = fs::read_to_string(path).unwrap();
let reply = DashMap::new();
let cfg: HashMap<String, String> = serde_yaml::from_str(&*data).expect("Failed to parse main config file");
let mut cfo: AppConfig = serde_yaml::from_str(&*data).expect("Failed to parse main config file");
log_builder(&cfo);
cfo.hc_method = cfo.hc_method.to_uppercase();
for (k, v) in cfg {
reply.insert(k.to_string(), v.to_string());
@@ -139,5 +168,59 @@ pub fn parce_main_config(path: &str) -> AppConfig {
cfo.local_server = Option::from((ip.to_string(), port));
}
}
if let Some(tlsport_cfg) = cfo.proxy_address_tls.clone() {
if let Some((_, port_str)) = tlsport_cfg.split_once(':') {
if let Ok(port) = port_str.parse::<u16>() {
cfo.proxy_port_tls = Some(port);
}
}
};
cfo.proxy_tls_grade = parce_tls_grades(cfo.proxy_tls_grade.clone());
cfo
}
fn parce_tls_grades(what: Option<String>) -> Option<String> {
match what {
Some(g) => match g.to_ascii_lowercase().as_str() {
"high" => {
// info!("TLS grade set to: [ HIGH ]");
Some("high".to_string())
}
"medium" => {
// info!("TLS grade set to: [ MEDIUM ]");
Some("medium".to_string())
}
"unsafe" => {
// info!("TLS grade set to: [ UNSAFE ]");
Some("unsafe".to_string())
}
_ => {
warn!("Error parsing TLS grade, defaulting to: `medium`");
Some("medium".to_string())
}
},
None => {
warn!("TLS grade not set, defaulting to: medium");
Some("b".to_string())
}
}
}
fn log_builder(conf: &AppConfig) {
let log_level = conf.log_level.clone();
unsafe {
match log_level.as_str() {
"info" => env::set_var("RUST_LOG", "info"),
"error" => env::set_var("RUST_LOG", "error"),
"warn" => env::set_var("RUST_LOG", "warn"),
"debug" => env::set_var("RUST_LOG", "debug"),
"trace" => env::set_var("RUST_LOG", "trace"),
"off" => env::set_var("RUST_LOG", "off"),
_ => {
println!("Error reading log level, defaulting to: INFO");
env::set_var("RUST_LOG", "info")
}
}
}
env_logger::builder().init();
}

30
src/utils/state.rs Normal file
View File

@@ -0,0 +1,30 @@
use once_cell::sync::Lazy;
use std::sync::RwLock;
#[derive(Debug)]
pub struct SharedState {
pub first_run: bool,
}
pub static GLOBAL_STATE: Lazy<RwLock<SharedState>> = Lazy::new(|| RwLock::new(SharedState { first_run: true }));
pub fn mark_not_first_run() {
let mut state = GLOBAL_STATE.write().unwrap();
state.first_run = false;
}
pub fn is_first_run() -> bool {
let state = GLOBAL_STATE.read().unwrap();
state.first_run
}
/*
impl SharedState {
pub fn mark_first_run(&mut self) {
self.first_run = false;
}
pub fn is_first_run(&self) -> bool {
self.first_run
}
}
*/

View File

@@ -3,67 +3,129 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;
pub type UpstreamsDashMap = DashMap<String, DashMap<String, (Vec<(String, u16, bool, bool)>, AtomicUsize)>>;
pub type UpstreamsIdMap = DashMap<String, (String, u16, bool, bool)>;
pub type UpstreamsDashMap = DashMap<String, DashMap<String, (Vec<InnerMap>, AtomicUsize)>>;
pub type UpstreamsIdMap = DashMap<String, InnerMap>;
pub type Headers = DashMap<String, DashMap<String, Vec<(String, String)>>>;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ServiceMapping {
pub proxy: String,
pub real: String,
pub path: Option<String>,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub struct Extraparams {
pub stickysessions: bool,
pub sticky_sessions: bool,
pub to_https: Option<bool>,
pub authentication: DashMap<String, Vec<String>>,
pub rate_limit: Option<isize>,
}
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct Kubernetes {
pub servers: Option<Vec<String>>,
pub services: Option<Vec<ServiceMapping>>,
pub tokenpath: Option<String>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct Consul {
pub servers: Option<Vec<String>>,
pub services: Option<Vec<ServiceMapping>>,
pub token: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Config {
pub provider: String,
pub stickysessions: bool,
pub sticky_sessions: bool,
pub to_https: Option<bool>,
#[serde(default)]
pub upstreams: Option<HashMap<String, HostConfig>>,
#[serde(default)]
pub globals: Option<HashMap<String, Vec<String>>>,
#[serde(default)]
pub headers: Option<Vec<String>>,
#[serde(default)]
pub authorization: Option<HashMap<String, String>>,
#[serde(default)]
pub consul: Option<Consul>,
#[serde(default)]
pub kubernetes: Option<Kubernetes>,
#[serde(default)]
pub rate_limit: Option<isize>,
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct HostConfig {
pub paths: HashMap<String, PathConfig>,
pub rate_limit: Option<isize>,
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PathConfig {
pub servers: Vec<String>,
pub to_https: Option<bool>,
pub headers: Option<Vec<String>>,
pub rate_limit: Option<isize>,
pub healthcheck: Option<bool>,
}
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct Configuration {
pub upstreams: UpstreamsDashMap,
pub headers: Headers,
pub consul: Option<Consul>,
pub kubernetes: Option<Kubernetes>,
pub typecfg: String,
pub extraparams: Extraparams,
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct AppConfig {
pub hc_interval: u16,
pub hc_method: String,
pub upstreams_conf: String,
pub log_level: String,
pub master_key: String,
pub config_address: String,
pub proxy_address_http: String,
pub master_key: String,
pub config_api_enabled: bool,
pub config_tls_address: Option<String>,
pub config_tls_certificate: Option<String>,
pub config_tls_key_file: Option<String>,
pub proxy_address_tls: Option<String>,
pub tls_certificate: Option<String>,
pub tls_key_file: Option<String>,
pub proxy_port_tls: Option<u16>,
pub local_server: Option<(String, u16)>,
pub proxy_certificates: Option<String>,
pub proxy_tls_grade: Option<String>,
pub file_server_address: Option<String>,
pub file_server_folder: Option<String>,
pub runuser: Option<String>,
pub rungroup: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct InnerMap {
pub address: String,
pub port: u16,
pub is_ssl: bool,
pub is_http2: bool,
pub to_https: bool,
pub rate_limit: Option<isize>,
pub healthcheck: Option<bool>,
}
#[allow(dead_code)]
impl InnerMap {
pub fn new() -> Self {
Self {
address: Default::default(),
port: Default::default(),
is_ssl: Default::default(),
is_http2: Default::default(),
to_https: Default::default(),
rate_limit: Default::default(),
healthcheck: Default::default(),
}
}
}

254
src/utils/tls.rs Normal file
View File

@@ -0,0 +1,254 @@
use dashmap::DashMap;
use log::{error, info, warn};
use pingora::tls::ssl::{select_next_proto, AlpnError, NameType, SniError, SslAlert, SslContext, SslFiletype, SslMethod, SslRef, SslVersion};
use pingora_core::listeners::tls::TlsSettings;
use rustls_pemfile::{read_one, Item};
use serde::Deserialize;
use std::collections::HashSet;
use std::fs::File;
use std::io::BufReader;
use x509_parser::extensions::GeneralName;
use x509_parser::nom::Err as NomErr;
use x509_parser::prelude::*;
#[derive(Clone, Deserialize, Debug)]
pub struct CertificateConfig {
pub cert_path: String,
pub key_path: String,
}
#[derive(Debug)]
struct CertificateInfo {
common_names: Vec<String>,
alt_names: Vec<String>,
ssl_context: SslContext,
#[allow(dead_code)]
cert_path: String, // Only used for logging
#[allow(dead_code)]
key_path: String, // Only used for logging
}
#[derive(Debug)]
pub struct Certificates {
configs: Vec<CertificateInfo>,
name_map: DashMap<String, SslContext>,
pub default_cert_path: String,
pub default_key_path: String,
}
impl Certificates {
pub fn new(configs: &Vec<CertificateConfig>, _grade: &str) -> Option<Self> {
let default_cert = configs.first().expect("At least one TLS certificate required");
let mut cert_infos = Vec::new();
let name_map: DashMap<String, SslContext> = DashMap::new();
for config in configs {
let cert_info = load_cert_info(&config.cert_path, &config.key_path, _grade);
match cert_info {
Some(cert) => {
for name in &cert.common_names {
name_map.insert(name.clone(), cert.ssl_context.clone());
}
for name in &cert.alt_names {
name_map.insert(name.clone(), cert.ssl_context.clone());
}
cert_infos.push(cert)
}
None => {
error!("Unable to load certificate info | public: {}, private: {}", &config.cert_path, &config.key_path);
return None;
}
}
}
Some(Self {
name_map: name_map,
configs: cert_infos,
default_cert_path: default_cert.cert_path.clone(),
default_key_path: default_cert.key_path.clone(),
})
}
fn find_ssl_context(&self, server_name: &str) -> Option<SslContext> {
if let Some(ctx) = self.name_map.get(server_name) {
return Some(ctx.clone());
}
for config in &self.configs {
for name in &config.common_names {
if name.starts_with("*.") && server_name.ends_with(&name[1..]) {
return Some(config.ssl_context.clone());
}
}
for name in &config.alt_names {
if name.starts_with("*.") && server_name.ends_with(&name[1..]) {
return Some(config.ssl_context.clone());
}
}
}
None
}
pub fn server_name_callback(&self, ssl_ref: &mut SslRef, ssl_alert: &mut SslAlert) -> Result<(), SniError> {
let server_name = ssl_ref.servername(NameType::HOST_NAME);
log::debug!("TLS connect: server_name = {:?}, ssl_ref = {:?}, ssl_alert = {:?}", server_name, ssl_ref, ssl_alert);
// let start_time = Instant::now();
if let Some(name) = server_name {
match self.find_ssl_context(name) {
Some(ctx) => {
ssl_ref.set_ssl_context(&*ctx).map_err(|_| SniError::ALERT_FATAL)?;
}
None => {
log::debug!("No matching server name found");
}
}
}
// println!("Context ==> {:?} <==", start_time.elapsed());
Ok(())
}
}
fn load_cert_info(cert_path: &str, key_path: &str, _grade: &str) -> Option<CertificateInfo> {
let mut common_names = HashSet::new();
let mut alt_names = HashSet::new();
let file = File::open(cert_path);
match file {
Err(e) => {
log::error!("Failed to open certificate file: {:?}", e);
return None;
}
Ok(file) => {
let mut reader = BufReader::new(file);
match read_one(&mut reader) {
Err(e) => {
log::error!("Failed to decode PEM from certificate file: {:?}", e);
return None;
}
Ok(leaf) => match leaf {
Some(Item::X509Certificate(cert)) => match X509Certificate::from_der(&cert) {
Err(NomErr::Error(e)) | Err(NomErr::Failure(e)) => {
log::error!("Failed to parse certificate: {:?}", e);
return None;
}
Err(_) => {
log::error!("Unknown error while parsing certificate");
return None;
}
Ok((_, x509)) => {
let subject = x509.subject();
for attr in subject.iter_common_name() {
if let Ok(cn) = attr.as_str() {
common_names.insert(cn.to_string());
}
}
if let Ok(Some(san)) = x509.subject_alternative_name() {
for name in san.value.general_names.iter() {
if let GeneralName::DNSName(dns) = name {
let dns_string = dns.to_string();
if !common_names.contains(&dns_string) {
alt_names.insert(dns_string);
}
}
}
}
}
},
_ => {
log::error!("Failed to read certificate");
return None;
}
},
}
}
}
if let Ok(ssl_context) = create_ssl_context(cert_path, key_path) {
Some(CertificateInfo {
cert_path: cert_path.to_string(),
key_path: key_path.to_string(),
common_names: common_names.into_iter().collect(),
alt_names: alt_names.into_iter().collect(),
ssl_context,
})
} else {
log::error!("Failed to create SSL context from cert paths");
None
}
}
fn create_ssl_context(cert_path: &str, key_path: &str) -> Result<SslContext, Box<dyn std::error::Error>> {
let mut ctx = SslContext::builder(SslMethod::tls())?;
ctx.set_certificate_chain_file(cert_path)?;
ctx.set_private_key_file(key_path, SslFiletype::PEM)?;
ctx.set_alpn_select_callback(prefer_h2);
let built = ctx.build();
Ok(built)
}
#[derive(Debug)]
pub struct CipherSuite {
pub high: &'static str,
pub medium: &'static str,
pub legacy: &'static str,
}
const CIPHERS: CipherSuite = CipherSuite {
high: "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:TLS_AES_128_GCM_SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305",
medium: "ECDHE-RSA-AES128-SHA:ECDHE-ECDSA-AES128-SHA:AES128-GCM-SHA256",
legacy: "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH",
};
#[derive(Debug)]
pub enum TlsGrade {
HIGH,
MEDIUM,
LEGACY,
}
impl TlsGrade {
pub fn from_str(s: &str) -> Option<Self> {
match s.to_ascii_lowercase().as_str() {
"high" => Some(TlsGrade::HIGH),
"medium" => Some(TlsGrade::MEDIUM),
"unsafe" => Some(TlsGrade::LEGACY),
_ => None,
}
}
}
pub fn prefer_h2<'a>(_ssl: &mut SslRef, alpn_in: &'a [u8]) -> Result<&'a [u8], AlpnError> {
match select_next_proto("\x02h2\x08http/1.1".as_bytes(), alpn_in) {
Some(p) => Ok(p),
_ => Err(AlpnError::NOACK),
}
}
pub fn set_tsl_grade(tls_settings: &mut TlsSettings, grade: &str) {
let config_grade = TlsGrade::from_str(grade);
match config_grade {
Some(TlsGrade::HIGH) => {
let _ = tls_settings.set_min_proto_version(Some(SslVersion::TLS1_2));
// let _ = tls_settings.set_max_proto_version(Some(SslVersion::TLS1_3));
let _ = tls_settings.set_cipher_list(CIPHERS.high);
let _ = tls_settings.set_ciphersuites(CIPHERS.high);
info!("TLS grade: {:?}, => HIGH", tls_settings.options());
}
Some(TlsGrade::MEDIUM) => {
let _ = tls_settings.set_min_proto_version(Some(SslVersion::TLS1));
let _ = tls_settings.set_cipher_list(CIPHERS.medium);
let _ = tls_settings.set_ciphersuites(CIPHERS.medium);
info!("TLS grade: {:?}, => MEDIUM", tls_settings.options());
}
Some(TlsGrade::LEGACY) => {
let _ = tls_settings.set_min_proto_version(Some(SslVersion::SSL3));
let _ = tls_settings.set_cipher_list(CIPHERS.legacy);
let _ = tls_settings.set_ciphersuites(CIPHERS.legacy);
warn!("TLS grade: {:?}, => UNSAFE", tls_settings.options());
}
None => {
// Defaults to MEDIUM
let _ = tls_settings.set_min_proto_version(Some(SslVersion::TLS1));
let _ = tls_settings.set_cipher_list(CIPHERS.medium);
let _ = tls_settings.set_ciphersuites(CIPHERS.medium);
warn!("TLS grade is not detected defaulting top MEDIUM");
}
}
}

View File

@@ -1,10 +1,22 @@
use crate::utils::structs::{UpstreamsDashMap, UpstreamsIdMap};
use crate::utils::structs::{InnerMap, UpstreamsDashMap, UpstreamsIdMap};
use crate::utils::tls;
use crate::utils::tls::CertificateConfig;
use dashmap::DashMap;
use log::{error, info};
use notify::{event::ModifyKind, Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use port_check::is_port_reachable;
use privdrop::PrivDrop;
use sha2::{Digest, Sha256};
use std::any::type_name;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::fmt::Write;
use std::net::SocketAddr;
use std::os::unix::fs::MetadataExt;
use std::str::FromStr;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Sender};
use std::time::{Duration, Instant};
use std::{fs, process, thread, time};
#[allow(dead_code)]
pub fn print_upstreams(upstreams: &UpstreamsDashMap) {
@@ -14,10 +26,12 @@ pub fn print_upstreams(upstreams: &UpstreamsDashMap) {
for path_entry in host_entry.value().iter() {
let path = path_entry.key();
println!(" Path: {}", path);
for (ip, port, ssl, vers) in path_entry.value().0.clone() {
println!(" ===> IP: {}, Port: {}, SSL: {}, H2: {}", ip, port, ssl, vers);
println!(" Path: {}", path);
for f in path_entry.value().0.clone() {
println!(
" IP: {}, Port: {}, SSL: {}, H2: {}, To HTTPS: {}",
f.address, f.port, f.is_ssl, f.is_http2, f.to_https
);
}
}
}
@@ -133,16 +147,119 @@ pub fn clone_idmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsIdMap) {
let new_vec = vec.clone();
for x in vec.iter() {
let mut id = String::new();
write!(&mut id, "{}:{}:{}", x.0, x.1, x.2).unwrap();
write!(&mut id, "{}:{}:{}", x.address, x.port, x.is_ssl).unwrap();
let mut hasher = Sha256::new();
hasher.update(id.clone().into_bytes());
let hash = hasher.finalize();
let hex_hash = base16ct::lower::encode_string(&hash);
let hh = hex_hash[0..50].to_string();
cloned.insert(id, (hh.clone(), 0000, false, false));
let to_add = InnerMap {
address: hh.clone(),
port: 0,
is_ssl: false,
is_http2: false,
to_https: false,
rate_limit: None,
healthcheck: None,
};
cloned.insert(id, to_add);
cloned.insert(hh, x.to_owned());
}
new_inner_map.insert(path.clone(), new_vec);
}
}
}
pub fn listdir(dir: String) -> Vec<tls::CertificateConfig> {
let mut f = HashMap::new();
let mut certificate_configs: Vec<tls::CertificateConfig> = vec![];
let paths = fs::read_dir(dir).unwrap();
for path in paths {
let path_str = path.unwrap().path().to_str().unwrap().to_owned();
if path_str.ends_with(".crt") {
let name = path_str.replace(".crt", "");
let mut inner = vec![];
let domain = name.split("/").collect::<Vec<&str>>();
inner.push(name.clone() + ".crt");
inner.push(name.clone() + ".key");
f.insert(domain[domain.len() - 1].to_owned(), inner);
let y = CertificateConfig {
cert_path: name.clone() + ".crt",
key_path: name.clone() + ".key",
};
certificate_configs.push(y);
}
}
for (_, v) in f.iter() {
let y = CertificateConfig {
cert_path: v[0].clone(),
key_path: v[1].clone(),
};
certificate_configs.push(y);
}
certificate_configs
}
pub fn watch_folder(path: String, sender: Sender<Vec<CertificateConfig>>) -> notify::Result<()> {
let (tx, rx) = channel();
let mut watcher = RecommendedWatcher::new(tx, Config::default())?;
watcher.watch(path.as_ref(), RecursiveMode::Recursive)?;
info!("Watching for certificates in : {}", path);
let certificate_configs = listdir(path.clone());
sender.send(certificate_configs)?;
let mut start = Instant::now();
loop {
match rx.recv_timeout(Duration::from_secs(1)) {
Ok(Ok(event)) => match &event.kind {
EventKind::Modify(ModifyKind::Data(_)) | EventKind::Create(_) | EventKind::Remove(_) => {
if start.elapsed() > Duration::from_secs(1) {
start = Instant::now();
let certificate_configs = listdir(path.clone());
sender.send(certificate_configs)?;
info!("Certificate changed: {:?}, {:?}", event.kind, event.paths);
}
}
_ => {}
},
Ok(Err(e)) => error!("Watch error: {:?}", e),
Err(_) => {}
}
}
}
pub fn drop_priv(user: String, group: String, http_addr: String, tls_addr: Option<String>) {
thread::sleep(time::Duration::from_millis(10));
loop {
thread::sleep(time::Duration::from_millis(10));
if is_port_reachable(http_addr.clone()) {
break;
}
}
if let Some(tls_addr) = tls_addr {
loop {
thread::sleep(time::Duration::from_millis(10));
if is_port_reachable(tls_addr.clone()) {
break;
}
}
}
info!("Dropping ROOT privileges to: {}:{}", user, group);
if let Err(e) = PrivDrop::default().user(user).group(group).apply() {
error!("Failed to drop privileges: {}", e);
process::exit(1)
}
}
pub fn check_priv(addr: &str) {
let port = SocketAddr::from_str(addr).map(|sa| sa.port()).unwrap();
match port < 1024 {
true => {
let meta = std::fs::metadata("/proc/self").map(|m| m.uid()).unwrap();
if meta != 0 {
error!("Running on privileged port requires to start as ROOT");
process::exit(1)
}
}
false => {}
}
}

View File

@@ -1,4 +1,5 @@
use crate::utils::discovery::{APIUpstreamProvider, ConsulProvider, Discovery, FromFileProvider};
use crate::utils::discovery::{APIUpstreamProvider, ConsulProvider, Discovery, FromFileProvider, KubernetesProvider};
use crate::utils::parceyaml::load_configuration;
use crate::utils::structs::Configuration;
use crate::utils::tools::*;
use crate::utils::*;
@@ -6,8 +7,8 @@ use crate::web::proxyhttp::LB;
use async_trait::async_trait;
use dashmap::DashMap;
use futures::channel::mpsc;
use futures::StreamExt;
use log::info;
use futures::{SinkExt, StreamExt};
use log::{error, info};
use pingora_core::server::ShutdownWatch;
use pingora_core::services::background::BackgroundService;
use std::sync::Arc;
@@ -15,27 +16,50 @@ use std::sync::Arc;
#[async_trait]
impl BackgroundService for LB {
async fn start(&self, mut shutdown: ShutdownWatch) {
info!("Starting background service");
let (tx, mut rx) = mpsc::channel::<Configuration>(0);
info!("Starting background service"); // tx: Sender<Configuration>
let (mut tx, mut rx) = mpsc::channel::<Configuration>(1);
let tx_api = tx.clone();
let config = load_configuration(self.config.upstreams_conf.clone().as_str(), "filepath")
.await
.expect("Failed to load configuration");
let tx_file = tx.clone();
let tx_consul = tx.clone();
let file_load = FromFileProvider {
path: self.config.upstreams_conf.clone(),
};
let consul_load = ConsulProvider {
path: self.config.upstreams_conf.clone(),
};
let _ = tokio::spawn(async move { file_load.start(tx_file).await });
let _ = tokio::spawn(async move { consul_load.start(tx_consul).await });
match config.typecfg.as_str() {
"file" => {
info!("Running File discovery, requested type is: {}", config.typecfg);
tx.send(config).await.unwrap();
let file_load = FromFileProvider {
path: self.config.upstreams_conf.clone(),
};
let _ = tokio::spawn(async move { file_load.start(tx).await });
}
"kubernetes" => {
info!("Running Kubernetes discovery, requested type is: {}", config.typecfg);
let cf = Arc::from(config);
let kuber_load = KubernetesProvider { config: cf.clone() };
let _ = tokio::spawn(async move { kuber_load.start(tx).await });
}
"consul" => {
info!("Running Consul discovery, requested type is: {}", config.typecfg);
let cf = Arc::from(config);
let consul_load = ConsulProvider { config: cf.clone() };
let _ = tokio::spawn(async move { consul_load.start(tx).await });
}
_ => {
error!("Unknown discovery type: {}", config.typecfg);
}
}
let api_load = APIUpstreamProvider {
address: self.config.config_address.clone(),
masterkey: self.config.master_key.clone(),
config_api_enabled: self.config.config_api_enabled.clone(),
tls_address: self.config.config_tls_address.clone(),
tls_certificate: self.config.config_tls_certificate.clone(),
tls_key_file: self.config.config_tls_key_file.clone(),
file_server_address: self.config.file_server_address.clone(),
file_server_folder: self.config.file_server_folder.clone(),
};
let tx_api = tx.clone();
// let tx_api = tx.clone();
let _ = tokio::spawn(async move { api_load.start(tx_api).await });
let uu = self.ump_upst.clone();
@@ -56,8 +80,10 @@ impl BackgroundService for LB {
clone_dashmap_into(&ss.upstreams, &self.ump_upst);
let current = self.extraparams.load_full();
let mut new = (*current).clone();
new.stickysessions = ss.extraparams.stickysessions;
new.sticky_sessions = ss.extraparams.sticky_sessions;
new.to_https = ss.extraparams.to_https;
new.authentication = ss.extraparams.authentication.clone();
new.rate_limit = ss.extraparams.rate_limit;
self.extraparams.store(Arc::new(new));
self.headers.clear();
@@ -79,8 +105,8 @@ impl BackgroundService for LB {
}
}
}
info!("Upstreams list is changed, updating to:");
print_upstreams(&self.ump_full);
// info!("Upstreams list is changed, updating to:");
// print_upstreams(&self.ump_full);
}
None => {}
}

View File

@@ -1,15 +1,16 @@
use crate::utils::structs::InnerMap;
use crate::web::proxyhttp::LB;
use async_trait::async_trait;
use std::sync::atomic::Ordering;
#[async_trait]
pub trait GetHost {
fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<(String, u16, bool, bool)>;
fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<InnerMap>;
fn get_header(&self, peer: &str, path: &str) -> Option<Vec<(String, String)>>;
}
#[async_trait]
impl GetHost for LB {
fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<(String, u16, bool, bool)> {
fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<InnerMap> {
if let Some(b) = backend_id {
if let Some(bb) = self.ump_byid.get(b) {
// println!("BIB :===> {:?}", Some(bb.value()));
@@ -19,7 +20,7 @@ impl GetHost for LB {
let host_entry = self.ump_upst.get(peer)?;
let mut current_path = path.to_string();
let mut best_match: Option<(String, u16, bool, bool)> = None;
let mut best_match: Option<InnerMap> = None;
loop {
if let Some(entry) = host_entry.get(&current_path) {
let (servers, index) = entry.value();
@@ -44,7 +45,7 @@ impl GetHost for LB {
}
}
}
// println!("BMT :===> {:?}", best_match);
// println!("Best Match :===> {:?}", best_match);
best_match
}
fn get_header(&self, peer: &str, path: &str) -> Option<Vec<(String, String)>> {

View File

@@ -1,17 +1,26 @@
use crate::utils::auth::authenticate;
use crate::utils::structs::{AppConfig, Extraparams, Headers, UpstreamsDashMap, UpstreamsIdMap};
use crate::utils::metrics::*;
use crate::utils::structs::{AppConfig, Extraparams, Headers, InnerMap, UpstreamsDashMap, UpstreamsIdMap};
use crate::web::gethosts::GetHost;
use arc_swap::ArcSwap;
use async_trait::async_trait;
use log::{debug, warn};
use pingora::http::RequestHeader;
use axum::body::Bytes;
use log::{debug, error, warn};
use once_cell::sync::Lazy;
use pingora::http::{RequestHeader, ResponseHeader, StatusCode};
use pingora::prelude::*;
use pingora::ErrorSource::Upstream;
use pingora_core::listeners::ALPN;
use pingora_core::prelude::HttpPeer;
use pingora_http::ResponseHeader;
use pingora_limits::rate::Rate;
use pingora_proxy::{ProxyHttp, Session};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
static RATE_LIMITER: Lazy<Rate> = Lazy::new(|| Rate::new(Duration::from_secs(1)));
#[derive(Clone)]
pub struct LB {
pub ump_upst: Arc<UpstreamsDashMap>,
pub ump_full: Arc<UpstreamsDashMap>,
@@ -23,119 +32,187 @@ pub struct LB {
pub struct Context {
backend_id: String,
to_https: bool,
redirect_to: String,
start_time: Instant,
hostname: Option<String>,
upstream_peer: Option<InnerMap>,
extraparams: arc_swap::Guard<Arc<Extraparams>>,
}
#[async_trait]
impl ProxyHttp for LB {
// type CTX = ();
// fn new_ctx(&self) -> Self::CTX {}
type CTX = Context;
fn new_ctx(&self) -> Self::CTX {
Context { backend_id: String::new() }
Context {
backend_id: String::new(),
to_https: false,
redirect_to: String::new(),
start_time: Instant::now(),
hostname: None,
upstream_peer: None,
extraparams: self.extraparams.load(),
}
}
async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
if let Some(auth) = self.extraparams.load().authentication.get("authorization") {
let ep = _ctx.extraparams.clone();
if let Some(auth) = ep.authentication.get("authorization") {
let authenticated = authenticate(&auth.value(), &session);
if !authenticated {
let _ = session.respond_error(401).await;
warn!("Forbidden: {:?}, {}", session.client_addr(), session.req_header().uri.path().to_string());
warn!("Forbidden: {:?}, {}", session.client_addr(), session.req_header().uri.path());
return Ok(true);
}
};
// if session.req_header().uri.path().starts_with("/denied") {
// let _ = session.respond_error(403).await;
// warn!("Forbidden: {:?}, {}", session.client_addr(), session.req_header().uri.path().to_string());
// return Ok(true);
// };
Ok(false)
}
async fn upstream_peer(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<Box<HttpPeer>> {
let host_name = return_header_host(&session);
match host_name {
Some(hostname) => {
// session.req_header_mut().headers.insert("X-Host-Name", host.to_string().parse().unwrap());
let mut backend_id = None;
if self.extraparams.load().stickysessions {
if let Some(cookies) = session.req_header().headers.get("cookie") {
if let Ok(cookie_str) = cookies.to_str() {
for cookie in cookie_str.split(';') {
let trimmed = cookie.trim();
if let Some(value) = trimmed.strip_prefix("backend_id=") {
backend_id = Some(value);
break;
}
let hostname = return_header_host(&session);
_ctx.hostname = hostname;
let mut backend_id = None;
if ep.sticky_sessions {
if let Some(cookies) = session.req_header().headers.get("cookie") {
if let Ok(cookie_str) = cookies.to_str() {
for cookie in cookie_str.split(';') {
let trimmed = cookie.trim();
if let Some(value) = trimmed.strip_prefix("backend_id=") {
backend_id = Some(value);
break;
}
}
}
}
}
match _ctx.hostname.as_ref() {
None => return Ok(false),
Some(host) => {
// let optioninnermap = self.get_host(host.as_str(), host.as_str(), backend_id);
let optioninnermap = self.get_host(host.as_str(), session.req_header().uri.path(), backend_id);
match optioninnermap {
None => return Ok(false),
Some(ref innermap) => {
if let Some(rate) = innermap.rate_limit.or(ep.rate_limit) {
// let rate_key = session.client_addr().and_then(|addr| addr.as_inet()).map(|inet| inet.ip().to_string()).unwrap_or_else(|| host.to_string());
let rate_key = session.client_addr().and_then(|addr| addr.as_inet()).map(|inet| inet.ip());
let curr_window_requests = RATE_LIMITER.observe(&rate_key, 1);
if curr_window_requests > rate {
let mut header = ResponseHeader::build(429, None).unwrap();
header.insert_header("X-Rate-Limit-Limit", rate.to_string()).unwrap();
header.insert_header("X-Rate-Limit-Remaining", "0").unwrap();
header.insert_header("X-Rate-Limit-Reset", "1").unwrap();
session.set_keepalive(None);
session.write_response_header(Box::new(header), true).await?;
debug!("Rate limited: {:?}, {}", rate_key, rate);
return Ok(true);
}
}
}
}
let ddr = self.get_host(hostname, hostname, backend_id);
match ddr {
Some((address, port, ssl, is_h2)) => {
let mut peer = Box::new(HttpPeer::new((address.clone(), port.clone()), ssl, String::new()));
_ctx.upstream_peer = optioninnermap;
}
}
Ok(false)
}
async fn upstream_peer(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<Box<HttpPeer>> {
// let host_name = return_header_host(&session);
match ctx.hostname.as_ref() {
Some(hostname) => {
match ctx.upstream_peer.as_ref() {
// Some((address, port, ssl, is_h2, to_https)) => {
Some(innermap) => {
let mut peer = Box::new(HttpPeer::new((innermap.address.clone(), innermap.port.clone()), innermap.is_ssl, String::new()));
// if session.is_http2() {
if is_h2 {
if innermap.is_http2 {
peer.options.alpn = ALPN::H2;
}
if ssl {
peer.sni = hostname.to_string();
if innermap.is_ssl {
peer.sni = hostname.clone();
peer.options.verify_cert = false;
peer.options.verify_hostname = false;
}
// println!(" ==> {} ==> {} => {} => {:?}", hostname, address.as_str(), peer.options.alpn, is_h2);
_ctx.backend_id = format!("{}:{}:{}", address.clone(), port.clone(), ssl);
if ctx.to_https || innermap.to_https {
if let Some(stream) = session.stream() {
if stream.get_ssl().is_none() {
if let Some(addr) = session.server_addr() {
if let Some((host, _)) = addr.to_string().split_once(':') {
let uri = session.req_header().uri.path_and_query().map_or("/", |pq| pq.as_str());
let port = self.config.proxy_port_tls.unwrap_or(403);
ctx.to_https = true;
ctx.redirect_to = format!("https://{}:{}{}", host, port, uri);
}
}
}
}
}
ctx.backend_id = format!("{}:{}:{}", innermap.address.clone(), innermap.port.clone(), innermap.is_ssl);
Ok(peer)
}
None => {
warn!("Upstream not found. Host: {:?}, Path: {}", hostname, session.req_header().uri);
Ok(return_no_host(&self.config.local_server))
if let Err(e) = session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await {
error!("Failed to send error response: {:?}", e);
}
Err(Box::new(Error {
etype: HTTPStatus(502),
esource: Upstream,
retry: RetryType::Decided(false),
cause: None,
context: Option::from(ImmutStr::Static("Upstream not found")),
}))
}
}
}
None => {
warn!("Upstream not found. Host: {:?}, Path: {}", host_name, session.req_header().uri);
Ok(return_no_host(&self.config.local_server))
// session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await.expect("Failed to send error");
if let Err(e) = session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await {
error!("Failed to send error response: {:?}", e);
}
Err(Box::new(Error {
etype: HTTPStatus(502),
esource: Upstream,
retry: RetryType::Decided(false),
cause: None,
context: None,
}))
}
}
}
async fn upstream_request_filter(&self, _session: &mut Session, _upstream_request: &mut RequestHeader, _ctx: &mut Self::CTX) -> Result<()> {
let clientip = _session.client_addr();
match clientip {
Some(ip) => {
let inet = ip.as_inet();
match inet {
Some(addr) => {
_upstream_request
.insert_header("X-Forwarded-For", addr.to_string().split(':').collect::<Vec<&str>>()[0])
.unwrap();
}
None => warn!("Malformed Client IP: {:?}", inet),
}
}
None => {
warn!("Cannot detect client IP");
}
async fn upstream_request_filter(&self, _session: &mut Session, upstream_request: &mut RequestHeader, ctx: &mut Self::CTX) -> Result<()> {
if let Some(hostname) = ctx.hostname.as_ref() {
upstream_request.insert_header("Host", hostname)?;
}
if let Some(peer) = ctx.upstream_peer.as_ref() {
upstream_request.insert_header("X-Forwarded-For", peer.address.as_str())?;
}
Ok(())
}
async fn response_filter(&self, _session: &mut Session, _upstream_response: &mut ResponseHeader, _ctx: &mut Self::CTX) -> Result<()> {
// async fn request_body_filter(&self, _session: &mut Session, _body: &mut Option<Bytes>, _end_of_stream: bool, _ctx: &mut Self::CTX) -> Result<()>
// where
// Self::CTX: Send + Sync,
// {
// Ok(())
// }
async fn response_filter(&self, session: &mut Session, _upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX) -> Result<()> {
// _upstream_response.insert_header("X-Proxied-From", "Fooooooooooooooo").unwrap();
if self.extraparams.load().stickysessions {
let backend_id = _ctx.backend_id.clone();
if ctx.extraparams.sticky_sessions {
let backend_id = ctx.backend_id.clone();
if let Some(bid) = self.ump_byid.get(&backend_id) {
// let _ = _upstream_response.insert_header("set-cookie", format!("backend {}", bid.0));
let _ = _upstream_response.insert_header("set-cookie", format!("backend_id={}; Path=/; Max-Age=600; HttpOnly; SameSite=Lax", bid.0));
let _ = _upstream_response.insert_header("set-cookie", format!("backend_id={}; Path=/; Max-Age=600; HttpOnly; SameSite=Lax", bid.address));
}
}
let host_name = return_header_host(&_session);
match host_name {
if ctx.to_https {
let mut redirect_response = ResponseHeader::build(StatusCode::MOVED_PERMANENTLY, None)?;
redirect_response.insert_header("Location", ctx.redirect_to.clone())?;
redirect_response.insert_header("Content-Length", "0")?;
session.write_response_header(Box::new(redirect_response), false).await?;
}
match ctx.hostname.as_ref() {
Some(host) => {
let path = _session.req_header().uri.path();
let path = session.req_header().uri.path();
let host_header = host;
let split_header = host_header.split_once(':');
match split_header {
@@ -159,35 +236,36 @@ impl ProxyHttp for LB {
}
None => {}
}
session.set_keepalive(Some(300));
Ok(())
}
async fn logging(&self, session: &mut Session, _e: Option<&pingora::Error>, ctx: &mut Self::CTX) {
let response_code = session.response_written().map_or(0, |resp| resp.status.as_u16());
debug!("{}, response code: {response_code}", self.request_summary(session, ctx));
let m = &MetricTypes {
method: session.req_header().method.to_string(),
code: session.response_written().map(|resp| resp.status.as_str().to_owned()).unwrap_or("0".to_string()),
latency: ctx.start_time.elapsed(),
version: session.req_header().version,
};
calc_metrics(m);
}
}
fn return_header_host(session: &Session) -> Option<&str> {
fn return_header_host(session: &Session) -> Option<String> {
if session.is_http2() {
match session.req_header().uri.host() {
Some(host) => Option::from(host),
Some(host) => Option::from(host.to_string()),
None => None,
}
} else {
match session.req_header().headers.get("host") {
Some(host) => {
let header_host = host.to_str().unwrap().splitn(2, ':').collect::<Vec<&str>>();
Option::from(header_host[0])
Option::from(header_host[0].to_string())
}
None => None,
}
}
}
fn return_no_host(inp: &Option<(String, u16)>) -> Box<HttpPeer> {
match inp {
Some(t) => Box::new(HttpPeer::new(t, false, String::new())),
None => Box::new(HttpPeer::new(("0.0.0.0", 0), false, String::new())),
}
}

View File

@@ -1,16 +1,22 @@
// use rustls::crypto::ring::default_provider;
use crate::utils::structs::Extraparams;
use crate::utils::tls;
use crate::utils::tls::CertificateConfig;
use crate::utils::tools::*;
use crate::web::proxyhttp::LB;
use arc_swap::ArcSwap;
use ctrlc;
use dashmap::DashMap;
use log::info;
use pingora::tls::ssl::{SslAlert, SslRef};
use pingora_core::listeners::tls::TlsSettings;
use pingora_core::prelude::{background_service, Opt};
use pingora_core::server::Server;
use rustls::crypto::ring::default_provider;
use std::env;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::thread;
pub fn run() {
default_provider().install_default().expect("Failed to install rustls crypto provider");
// default_provider().install_default().expect("Failed to install rustls crypto provider");
let parameters = Some(Opt::parse_args()).unwrap();
let file = parameters.conf.clone().unwrap();
let maincfg = crate::utils::parceyaml::parce_main_config(file.as_str());
@@ -22,66 +28,73 @@ pub fn run() {
let ff_config = Arc::new(DashMap::new());
let im_config = Arc::new(DashMap::new());
let hh_config = Arc::new(DashMap::new());
let ec_config = Arc::new(ArcSwap::from_pointee(Extraparams {
stickysessions: false,
sticky_sessions: false,
to_https: None,
authentication: DashMap::new(),
rate_limit: None,
}));
let cfg = Arc::new(maincfg);
let lb = LB {
ump_upst: uf_config.clone(),
ump_full: ff_config.clone(),
ump_byid: im_config.clone(),
ump_upst: uf_config,
ump_full: ff_config,
ump_byid: im_config,
config: cfg.clone(),
headers: hh_config.clone(),
extraparams: ec_config.clone(),
};
let bg = LB {
ump_upst: uf_config.clone(),
ump_full: ff_config.clone(),
ump_byid: im_config.clone(),
config: cfg.clone(),
headers: hh_config.clone(),
extraparams: ec_config.clone(),
headers: hh_config,
extraparams: ec_config,
};
// env_logger::Env::new();
// env_logger::init();
let grade = cfg.proxy_tls_grade.clone().unwrap_or("medium".to_string());
info!("TLS grade set to: [ {} ]", grade);
let log_level = cfg.log_level.clone();
match log_level.as_str() {
"info" => env::set_var("RUST_LOG", "info"),
"error" => env::set_var("RUST_LOG", "error"),
"warn" => env::set_var("RUST_LOG", "warn"),
"debug" => env::set_var("RUST_LOG", "debug"),
"trace" => env::set_var("RUST_LOG", "trace"),
"off" => env::set_var("RUST_LOG", "off"),
_ => {
println!("Error reading log level, defaulting to: INFO");
env::set_var("RUST_LOG", "info")
}
}
env_logger::builder()
// .format_timestamp(None)
// .format_module_path(false)
// .format_source_path(false)
// .format_target(false)
.init();
let bg_srvc = background_service("bgsrvc", bg);
let mut proxy = pingora_proxy::http_proxy_service(&server.configuration, lb);
let bg_srvc = background_service("bgsrvc", lb.clone());
let mut proxy = pingora_proxy::http_proxy_service(&server.configuration, lb.clone());
let bind_address_http = cfg.proxy_address_http.clone();
let bind_address_tls = cfg.proxy_address_tls.clone();
check_priv(bind_address_http.as_str());
match bind_address_tls {
Some(bind_address_tls) => {
info!("Running TLS listener on :{}", bind_address_tls);
let cert_path = cfg.tls_certificate.clone().unwrap();
let key_path = cfg.tls_key_file.clone().unwrap();
let mut tls_settings = pingora_core::listeners::tls::TlsSettings::intermediate(&cert_path, &key_path).unwrap();
tls_settings.enable_h2();
check_priv(bind_address_tls.as_str());
let (tx, rx): (Sender<Vec<CertificateConfig>>, Receiver<Vec<CertificateConfig>>) = channel();
let certs_path = cfg.proxy_certificates.clone().unwrap();
thread::spawn(move || {
watch_folder(certs_path, tx).unwrap();
});
let certificate_configs = rx.recv().unwrap();
let first_set = tls::Certificates::new(&certificate_configs, grade.as_str()).unwrap_or_else(|| panic!("Unable to load initial certificate info"));
let certificates = Arc::new(ArcSwap::from_pointee(first_set));
let certs_for_callback = certificates.clone();
let certs_for_watcher = certificates.clone();
let new_certs = tls::Certificates::new(&certificate_configs, grade.as_str());
certs_for_watcher.store(Arc::new(new_certs.unwrap()));
let mut tls_settings =
TlsSettings::intermediate(&certs_for_callback.load().default_cert_path, &certs_for_callback.load().default_key_path).expect("unable to load or parse cert/key");
tls::set_tsl_grade(&mut tls_settings, grade.as_str());
tls_settings.set_servername_callback(move |ssl_ref: &mut SslRef, ssl_alert: &mut SslAlert| certs_for_callback.load().server_name_callback(ssl_ref, ssl_alert));
tls_settings.set_alpn_select_callback(tls::prefer_h2);
proxy.add_tls_with_settings(&bind_address_tls, None, tls_settings);
let certs_for_watcher = certificates.clone();
thread::spawn(move || {
while let Ok(new_configs) = rx.recv() {
let new_certs = tls::Certificates::new(&new_configs, grade.as_str());
match new_certs {
Some(new_certs) => {
certs_for_watcher.store(Arc::new(new_certs));
}
None => {}
};
}
});
}
None => {}
}
@@ -89,8 +102,15 @@ pub fn run() {
proxy.add_tcp(bind_address_http.as_str());
server.add_service(proxy);
server.add_service(bg_srvc);
// let mut prometheus_service_http = Service::prometheus_http_service();
// prometheus_service_http.add_tcp("0.0.0.0:1234");
// server.add_service(prometheus_service_http);
server.run_forever();
thread::spawn(move || server.run_forever());
if let (Some(user), Some(group)) = (cfg.rungroup.clone(), cfg.runuser.clone()) {
drop_priv(user, group, cfg.proxy_address_http.clone(), cfg.proxy_address_tls.clone());
}
let (tx, rx) = channel();
ctrlc::set_handler(move || tx.send(()).expect("Could not send signal on channel.")).expect("Error setting Ctrl-C handler");
rx.recv().expect("Could not receive from channel.");
info!("Signal received ! Exiting...");
}

View File

@@ -1,21 +1,27 @@
use crate::utils::discovery::APIUpstreamProvider;
use crate::utils::structs::Configuration;
use axum::body::Body;
use axum::extract::State;
use axum::extract::{Query, State};
use axum::http::{Response, StatusCode};
use axum::response::IntoResponse;
use axum::routing::{delete, get, head, post, put};
use axum::routing::{get, post};
use axum::{Json, Router};
use axum_server::tls_openssl::OpenSSLConfig;
use futures::channel::mpsc::Sender;
use futures::SinkExt;
use jsonwebtoken::{encode, EncodingKey, Header};
use log::{error, info, warn};
use prometheus::{gather, Encoder, TextEncoder};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::net::TcpListener;
use tower_http::services::ServeDir;
#[derive(Deserialize)]
struct InputKey {
masterkey: String,
master_key: String,
owner: String,
valid: u64,
}
@@ -25,51 +31,84 @@ struct OutToken {
token: String,
}
#[allow(unused_mut)]
pub async fn run_server(bindaddress: String, masterkey: String, mut toreturn: Sender<Configuration>) {
let mut tr = toreturn.clone();
let app = Router::new()
.route("/{*wildcard}", get(senderror))
.route("/{*wildcard}", post(senderror))
.route("/{*wildcard}", put(senderror))
.route("/{*wildcard}", head(senderror))
.route("/{*wildcard}", delete(senderror))
.route("/jwt", post(jwt_gen))
.with_state(masterkey.clone())
.route(
"/conf",
post(|up: String| async move {
let serverlist = crate::utils::parceyaml::load_configuration(up.as_str(), "content");
#[derive(Clone)]
struct AppState {
master_key: String,
config_sender: Sender<Configuration>,
config_api_enabled: bool,
}
match serverlist {
Some(serverlist) => {
let _ = tr.send(serverlist).await.unwrap();
Response::builder().status(StatusCode::CREATED).body(Body::from("Config, conf file, updated!\n")).unwrap()
}
None => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from("Failed to parce config file!\n"))
.unwrap(),
}
})
.with_state("state"),
);
let listener = TcpListener::bind(bindaddress.clone()).await.unwrap();
info!("Starting the API server on: {}", bindaddress);
#[allow(unused_mut)]
pub async fn run_server(config: &APIUpstreamProvider, mut to_return: Sender<Configuration>) {
let app_state = AppState {
master_key: config.masterkey.clone(),
config_sender: to_return.clone(),
config_api_enabled: config.config_api_enabled.clone(),
};
let app = Router::new()
// .route("/{*wildcard}", get(senderror))
// .route("/{*wildcard}", post(senderror))
// .route("/{*wildcard}", put(senderror))
// .route("/{*wildcard}", head(senderror))
// .route("/{*wildcard}", delete(senderror))
// .nest_service("/static", static_files)
.route("/jwt", post(jwt_gen))
.route("/conf", post(conf))
.route("/metrics", get(metrics))
.with_state(app_state);
if let Some(value) = &config.tls_address {
let cf = OpenSSLConfig::from_pem_file(config.tls_certificate.clone().unwrap(), config.tls_key_file.clone().unwrap()).unwrap();
let addr: SocketAddr = value.parse().expect("Unable to parse socket address");
let tls_app = app.clone();
tokio::spawn(async move {
if let Err(e) = axum_server::bind_openssl(addr, cf).serve(tls_app.into_make_service()).await {
eprintln!("TLS server failed: {}", e);
}
});
info!("Starting the TLS API server on: {}", value);
}
if let (Some(address), Some(folder)) = (&config.file_server_address, &config.file_server_folder) {
let static_files = ServeDir::new(folder);
let static_serve: Router = Router::new().fallback_service(static_files);
let static_listen = TcpListener::bind(address).await.unwrap();
let _ = tokio::spawn(async move { axum::serve(static_listen, static_serve).await.unwrap() });
}
let listener = TcpListener::bind(config.address.clone()).await.unwrap();
info!("Starting the API server on: {}", config.address);
axum::serve(listener, app).await.unwrap();
}
#[allow(dead_code)]
async fn senderror() -> impl IntoResponse {
Response::builder().status(StatusCode::BAD_GATEWAY).body(Body::from("No live upstream found!\n")).unwrap()
async fn conf(State(mut st): State<AppState>, Query(params): Query<HashMap<String, String>>, content: String) -> impl IntoResponse {
if !st.config_api_enabled {
return Response::builder()
.status(StatusCode::FORBIDDEN)
.body(Body::from("Config remote API is disabled !\n"))
.unwrap();
}
if let Some(s) = params.get("key") {
if s.to_owned() == st.master_key {
if let Some(serverlist) = crate::utils::parceyaml::load_configuration(content.as_str(), "content").await {
st.config_sender.send(serverlist).await.unwrap();
return Response::builder().status(StatusCode::OK).body(Body::from("Config, conf file, updated !\n")).unwrap();
} else {
return Response::builder().status(StatusCode::BAD_GATEWAY).body(Body::from("Failed to parse config!\n")).unwrap();
};
}
}
Response::builder().status(StatusCode::FORBIDDEN).body(Body::from("Access Denied !\n")).unwrap()
}
async fn jwt_gen(State(masterkey): State<String>, Json(payload): Json<InputKey>) -> (StatusCode, Json<OutToken>) {
if payload.masterkey == masterkey {
async fn jwt_gen(State(state): State<AppState>, Json(payload): Json<InputKey>) -> (StatusCode, Json<OutToken>) {
if payload.master_key == state.master_key {
let now = SystemTime::now() + Duration::from_secs(payload.valid * 60);
let a = now.duration_since(UNIX_EPOCH).unwrap().as_secs();
let claim = crate::utils::jwt::Claims { user: payload.owner, exp: a };
match encode(&Header::default(), &claim, &EncodingKey::from_secret(payload.masterkey.as_ref())) {
match encode(&Header::default(), &claim, &EncodingKey::from_secret(payload.master_key.as_ref())) {
Ok(t) => {
let tok = OutToken { token: t };
info!("Generating token: {:?}", tok);
@@ -89,3 +128,28 @@ async fn jwt_gen(State(masterkey): State<String>, Json(payload): Json<InputKey>)
(StatusCode::FORBIDDEN, Json(tok))
}
}
async fn metrics() -> impl IntoResponse {
let metric_families = gather();
let encoder = TextEncoder::new();
let mut buffer = Vec::new();
if let Err(e) = encoder.encode(&metric_families, &mut buffer) {
// encoding error fallback
return Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("Failed to encode metrics: {}", e)))
.unwrap();
}
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", encoder.format_type())
.body(Body::from(buffer))
.unwrap()
}
// #[allow(dead_code)]
// async fn senderror() -> impl IntoResponse {
// Response::builder().status(StatusCode::BAD_GATEWAY).body(Body::from("No live upstream found!\n")).unwrap()
// }