28 Commits

Author SHA1 Message Date
Ara Sadoyan
2b437c65fb Performance improvement. String removal from hot paths. 2026-01-27 16:19:51 +01:00
Ara Sadoyan
38055ae94e added new metric aralez_requests_by_upstream 2026-01-25 18:08:15 +01:00
Ara Sadoyan
703de9e909 updates on API server https://sadoyan.github.io/aralez-docs/assets/api/ 2026-01-22 16:50:51 +01:00
Ara Sadoyan
2c8b01295c Minor subfunction removal 2026-01-21 20:01:16 +01:00
Ara Sadoyan
baebe1c00f Async apply of config via API 2026-01-20 19:16:27 +01:00
Ara Sadoyan
6c1d3c5ef8 Error handling on API server 2026-01-09 18:44:36 +01:00
Ara Sadoyan
2d1a827007 Removed unneeded loop 2025-12-14 12:09:11 +01:00
Ara Sadoyan
a2a5250711 Performance improvements on data types . 2025-12-11 15:21:34 +01:00
Ara Sadoyan
985e923342 to https redirect bug fix 2025-12-11 13:37:40 +01:00
Ara Sadoyan
0fc79c022f perf: optimize header handling and concurrent access patterns 2025-12-10 19:09:04 +01:00
Ara Sadoyan
a43bccdfb8 minor, performance improvements 2025-11-28 13:13:15 +01:00
Ara Sadoyan
5b87391fbb some more type changes, performance improvements 2025-11-27 18:47:04 +01:00
Ara Sadoyan
c68a4ad83d Type changes, performance improvements 2025-11-27 18:03:34 +01:00
Ara Sadoyan
8ba8d32df1 Performance improvements, type changes 2025-11-26 12:12:41 +01:00
Ara Sadoyan
7a839065e6 update on kubernetes web client 2025-11-24 17:57:44 +01:00
Ara Sadoyan
74821654f3 Added support to send custom headers to upstream servers. 2025-11-22 23:18:06 +01:00
Ara Sadoyan
78c83b802f Merge Consul & Kubernetes discovery 2025-10-26 15:26:09 +01:00
Ara Sadoyan
012505b77e Cleaning up the code 2025-10-24 15:27:15 +02:00
Ara Sadoyan
21c4cb0901 Update README.md 2025-10-18 11:49:51 +02:00
Ara Sadoyan
86dd3d3402 README update 2025-10-18 11:48:48 +02:00
Ara Sadoyan
d6b345202b README update 2025-10-17 17:03:45 +02:00
Ara Sadoyan
5209d787e4 README update 2025-10-17 16:44:57 +02:00
Ara Sadoyan
02de5f1c21 Merge remote-tracking branch 'origin/main' 2025-10-16 19:05:15 +02:00
Ara Sadoyan
9519280026 Path filter, and rate limiter for Consul 2025-10-16 19:04:46 +02:00
Ara Sadoyan
e87c60cf4f unifying kubernetes and file provider configs 2025-10-15 19:13:33 +02:00
Ara Sadoyan
25693a7058 Path filtering and rate limit for kubernetes 2025-10-15 13:42:05 +02:00
Ara Sadoyan
3b0b385ec7 Create FUNDING.yml 2025-10-03 11:02:21 +02:00
Ara Sadoyan
5359c2e8e9 Create LICENSE 2025-10-02 11:14:40 +02:00
24 changed files with 1694 additions and 822 deletions

15
.github/FUNDING.yml vendored Normal file
View File

@@ -0,0 +1,15 @@
# These are supported funding model platforms
github: # Replace with up to 4 GitHub Sponsors-enabled usernames e.g., [user1, user2]
patreon: # Replace with a single Patreon username
open_collective: # Replace with a single Open Collective username
ko_fi: # Replace with a single Ko-fi username
tidelift: # Replace with a single Tidelift platform-name/package-name e.g., npm/babel
community_bridge: # Replace with a single Community Bridge project-name e.g., cloud-foundry
liberapay: # Replace with a single Liberapay username
issuehunt: # Replace with a single IssueHunt username
lfx_crowdfunding: # Replace with a single LFX Crowdfunding project-name e.g., cloud-foundry
polar: # Replace with a single Polar username
buy_me_a_coffee: sadoyan
thanks_dev: # Replace with a single thanks.dev username
custom: # Replace with up to 4 custom sponsorship URLs e.g., ['link1', 'link2']

719
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -11,9 +11,9 @@ panic = "abort"
strip = true strip = true
[dependencies] [dependencies]
tokio = { version = "1.45.1", features = ["full"] } tokio = { version = "1.48.0", features = ["full"] }
pingora = { version = "0.6.0", features = ["lb", "openssl"] } # openssl, rustls, boringssl pingora = { version = "0.6.0", features = ["lb", "openssl"] } # openssl, rustls, boringssl
serde = { version = "1.0.219", features = ["derive"] } serde = { version = "1.0.228", features = ["derive"] }
dashmap = "7.0.0-rc2" dashmap = "7.0.0-rc2"
pingora-core = "0.6.0" pingora-core = "0.6.0"
pingora-proxy = "0.6.0" pingora-proxy = "0.6.0"
@@ -21,22 +21,22 @@ pingora-http = "0.6.0"
pingora-limits = "0.6.0" pingora-limits = "0.6.0"
async-trait = "0.1.89" async-trait = "0.1.89"
env_logger = "0.11.8" env_logger = "0.11.8"
log = "0.4.28" log = "0.4.29"
futures = "0.3.31" futures = "0.3.31"
notify = "8.2.0" notify = "9.0.0-rc.1"
axum = { version = "0.8.4" } axum = { version = "0.8.8" }
axum-server = { version = "0.7.2", features = ["tls-openssl"] } axum-server = { version = "0.8.0", features = ["tls-openssl"] }
reqwest = { version = "0.12.23", features = ["json", "native-tls-alpn"] } reqwest = { version = "0.13.1", features = ["json", "stream"] }
#reqwest = { version = "0.12.15", features = ["json", "rustls-tls"] } #reqwest = { version = "0.12.15", features = ["json", "rustls-tls"] }
#reqwest = { version = "0.12.15", default-features = false, features = ["rustls-tls", "json"] } #reqwest = { version = "0.12.15", default-features = false, features = ["rustls-tls", "json"] }
serde_yaml = "0.9.34-deprecated" serde_yaml = "0.9.34-deprecated"
rand = "0.9.2" rand = "0.10.0-rc.5"
base64 = "0.22.1" base64 = "0.22.1"
jsonwebtoken = "9.3.1" jsonwebtoken = { version = "10.3.0", features = ["aws_lc_rs"] }
tonic = "0.14.2" tonic = "0.14.2"
sha2 = { version = "0.11.0-rc.2", default-features = false } sha2 = { version = "0.11.0-rc.3", default-features = false }
base16ct = { version = "0.3.0", features = ["alloc"] } base16ct = { version = "1.0.0", features = ["alloc"] }
urlencoding = "2.1.3" urlencoding = "2.1.3"
arc-swap = "1.7.1" arc-swap = "1.7.1"
mimalloc = { version = "0.1.48", default-features = false } mimalloc = { version = "0.1.48", default-features = false }
@@ -44,12 +44,14 @@ prometheus = "0.14.0"
lazy_static = "1.5.0" lazy_static = "1.5.0"
x509-parser = "0.18.0" x509-parser = "0.18.0"
rustls-pemfile = "2.2.0" rustls-pemfile = "2.2.0"
tower-http = { version = "0.6.6", features = ["fs"] } tower-http = { version = "0.6.8", features = ["fs"] }
once_cell = "1.21.3" once_cell = "1.21.3"
privdrop = "0.5.6" privdrop = "0.5.6"
ctrlc = "3.5.0" ctrlc = "3.5.1"
port_check = "0.3.0" port_check = "0.3.0"
#moka = { version = "0.12.10", features = ["sync"] } serde_json = "1.0.149"
http = "1.4.0"
#moka = { version = "0.12.12", features = ["sync"] }
#rustls = { version = "0.23.27", features = ["ring"] } #rustls = { version = "0.23.27", features = ["ring"] }
#hickory-client = { version = "0.25.2" } #hickory-client = { version = "0.25.2" }
#openssl = "0.10.73" #openssl = "0.10.73"

201
LICENSE Normal file
View File

@@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@@ -4,8 +4,12 @@
# Aralez (Արալեզ), # Aralez (Արալեզ),
### **Reverse proxy and service mesh built on top of Cloudflare's Pingora** ### **Reverse proxy built on top of Cloudflare's Pingora**
Aralez is a high-performance Rust reverse proxy with zero-configuration automatic protocol handling, TLS, and upstream management,
featuring Consul and Kubernetes integration for dynamic pod discovery and health-checked routing, acting as a lightweight ingress-style proxy.
---
What Aralez means ? What Aralez means ?
**Aralez = Արալեզ** <ins>.Named after the legendary Armenian guardian spirit, winged dog-like creature, that descend upon fallen heroes to lick their wounds and resurrect them.</ins>. **Aralez = Արալեզ** <ins>.Named after the legendary Armenian guardian spirit, winged dog-like creature, that descend upon fallen heroes to lick their wounds and resurrect them.</ins>.
@@ -187,7 +191,10 @@ provider: "file"
sticky_sessions: false sticky_sessions: false
to_https: false to_https: false
rate_limit: 10 rate_limit: 10
headers: server_headers:
- "X-Forwarded-Proto:https"
- "X-Forwarded-Port:443"
client_headers:
- "Access-Control-Allow-Origin:*" - "Access-Control-Allow-Origin:*"
- "Access-Control-Allow-Methods:POST, GET, OPTIONS" - "Access-Control-Allow-Methods:POST, GET, OPTIONS"
- "Access-Control-Max-Age:86400" - "Access-Control-Max-Age:86400"
@@ -199,7 +206,10 @@ myhost.mydomain.com:
"/": "/":
rate_limit: 20 rate_limit: 20
to_https: false to_https: false
headers: server_headers:
- "X-Something-Else:Foobar"
- "X-Another-Header:Hohohohoho"
client_headers:
- "X-Some-Thing:Yaaaaaaaaaaaaaaa" - "X-Some-Thing:Yaaaaaaaaaaaaaaa"
- "X-Proxy-From:Hopaaaaaaaaaaaar" - "X-Proxy-From:Hopaaaaaaaaaaaar"
servers: servers:
@@ -207,7 +217,7 @@ myhost.mydomain.com:
- "127.0.0.2:8000" - "127.0.0.2:8000"
"/foo": "/foo":
to_https: true to_https: true
headers: client_headers:
- "X-Another-Header:Hohohohoho" - "X-Another-Header:Hohohohoho"
servers: servers:
- "127.0.0.4:8443" - "127.0.0.4:8443"
@@ -222,6 +232,8 @@ myhost.mydomain.com:
- Sticky sessions are disabled globally. This setting applies to all upstreams. If enabled all requests will be 301 redirected to HTTPS. - Sticky sessions are disabled globally. This setting applies to all upstreams. If enabled all requests will be 301 redirected to HTTPS.
- HTTP to HTTPS redirect disabled globally, but can be overridden by `to_https` setting per upstream. - HTTP to HTTPS redirect disabled globally, but can be overridden by `to_https` setting per upstream.
- All upstreams will receive custom headers: `X-Forwarded-Proto:https` and `X-Forwarded-Port:443`
- Additionally, myhost.mydomain.com with path `/` will receive custom headers: `X-Another-Header:Hohohohoho` and `X-Something-Else:Foobar`
- Requests to each hosted domains will be limited to 10 requests per second per virtualhost. - Requests to each hosted domains will be limited to 10 requests per second per virtualhost.
- Requests limits are calculated per requester ip plus requested virtualhost. - Requests limits are calculated per requester ip plus requested virtualhost.
- If the requester exceeds the limit it will receive `429 Too Many Requests` error. - If the requester exceeds the limit it will receive `429 Too Many Requests` error.
@@ -340,20 +352,33 @@ curl -u username:password -H 'Host: myip.mydomain.com' http://127.0.0.1:6193/
- Sticky session support. - Sticky session support.
- HTTP2 ready. - HTTP2 ready.
📊 Why Choose Aralez? Feature Comparison ### 🧩 Summary Table: Feature Comparison
| Feature | **Aralez** | **Nginx** | **HAProxy** | **Traefik** | | Feature / Proxy | **Aralez** | **Nginx** | **HAProxy** | **Traefik** | **Caddy** | **Envoy** |
|----------------------------|----------------------------------------------------------------------|--------------------------|-------------------------|-----------------| |----------------------------------|:-----------------:|:---------------------------:|:-----------------:|:--------------------------------:|:---------------:|:---------------:|
| **Hot Reload** | ✅ Yes (live, API/file) | ⚠️ Reloads config | ⚠️ Reloads config | ✅ Yes (dynamic) | | **Hot Reload (Zero Downtime)** | **Automatic** | ⚙️ Manual (graceful reload) | ⚙️ Manual | ✅ Automatic | ✅ Automatic | Automatic |
| **JWT Auth** | ✅ Built-in | ❌ External scripts | ❌ External Lua or agent | ⚠️ With plugins | | **Auto Cert Reload (from disk)** | ✅ **Automatic** | ❌ No | ❌ No | ✅ Automatic (Let's Encrypt only) | ✅ Automatic | ⚙️ Manual |
| **WebSocket Support** | ✅ Automatic | ⚠️ Manual config | ✅ Yes | ✅ Yes | | **Auth: Basic / API Key / JWT** | **Built-in** | ⚙️ Basic only | ⚙️ Basic only | ✅ Config-based | ✅ Config-based | ✅ Config-based |
| **gRPC Support** | ✅ Automatic (no config) | ⚠️ Manual + HTTP/2 + TLS | ⚠️ Complex setup | ✅ Native | | **TLS / HTTP2 Termination** | ✅ **Automatic** | ⚙️ Manual config | ⚙️ Manual config | ✅ Automatic | ✅ Automatic | Automatic |
| **TLS Termination** | ✅ Built-in (OpenSSL) | ✅ Yes | ✅ Yes | ✅ Yes | | **Built-in A+ TLS Grades** | **Automatic** | ⚙️ Manual tuning | ⚙️ Manual | ⚙️ Manual | ✅ Automatic | ⚙️ Manual |
| **TLS Upstream Detection** | ✅ Automatic | | ❌ | ❌ | | **gRPC Proxy** | ✅ **Zero-Config** | ⚙️ Manual setup | ⚙️ Manual | ⚙️ Needs config | ⚙️ Needs config | ⚙️ Needs config |
| **HTTP/2 Support** | ✅ Automatic | ⚠️ Requires extra config | ⚠️ Requires build flags | ✅ Native | | **SSL Proxy** | ✅ **Zero-Config** | ⚙️ Manual | ⚙️ Manual | ✅ Automatic | ✅ Automatic | ✅ Automatic |
| **Sticky Sessions** | ✅ Cookie-based | In plus version only | | | | **HTTP/2 Proxy** | ✅ **Zero-Config** | ⚙️ Manual enable | Manual enable | ✅ Automatic | ✅ Automatic | ✅ Automatic |
| **Prometheus Metrics** | ✅ [Built in](https://github.com/sadoyan/aralez/blob/main/METRICS.md) | ⚠️ With Lua or exporter | ⚠️ With external script | ✅ Native | | **WebSocket Proxy** | ✅ **Zero-Config** | ⚙️ Manual upgrade | ⚙️ Manual upgrade | ✅ Automatic | ✅ Automatic | ✅ Automatic |
| **Built With** | 🦀 Rust | C | C | Go | | **Sticky Sessions** | **Built-in** | ⚙️ Config-based | ⚙️ Config-based | ✅ Automatic | ⚙️ Limited | ✅ Config-based |
| **Prometheus Metrics** | ✅ **Built-in** | ⚙️ External exporter | ✅ Built-in | ✅ Built-in | ✅ Built-in | ✅ Built-in |
| **Consul Integration** | ✅ **Yes** | ❌ No | ⚙️ Via DNS only | ✅ Yes | ❌ No | ✅ Yes |
| **Kubernetes Integration** | ✅ **Yes** | ⚙️ Needs ingress setup | ⚙️ External | ✅ Yes | ⚙️ Limited | ✅ Yes |
| **Request Limiter** | ✅ **Yes** | ✅ Config-based | ✅ Config-based | ✅ Config-based | ✅ Config-based | ✅ Config-based |
| **Serve Static Files** | ✅ **Yes** | ✅ Yes | ⚙️ Basic | ✅ Automatic | ✅ Automatic | ❌ No |
| **Upstream Health Checks** | ✅ **Automatic** | ⚙️ Manual config | ⚙️ Manual config | ✅ Automatic | ✅ Automatic | ✅ Automatic |
| **Built With** | 🦀 **Rust** | C | C | Go | Go | C++ |
---
**Automatic / Zero-Config** Works immediately, no setup required
⚙️ **Manual / Config-based** Requires explicit configuration or modules
**No** Not supported
## 💡 Simple benchmark by [Oha](https://github.com/hatoo/oha) ## 💡 Simple benchmark by [Oha](https://github.com/hatoo/oha)

View File

@@ -1,58 +1,82 @@
# The file under watch and hot reload, changes are applied immediately, no need to restart or reload. # The file under watch and hot reload, changes are applied immediately, no need to restart or reload.
provider: "file" # consul, kubernetes provider: "file" # "file" "consul" "kubernetes"
sticky_sessions: false sticky_sessions: false
to_ssl: false to_https: false
#rate_limit: 100 rate_limit: 100
headers: server_headers:
- "X-Forwarded-Proto:https"
- "X-Forwarded-Port:443"
client_headers:
- "Access-Control-Allow-Origin:*" - "Access-Control-Allow-Origin:*"
- "Access-Control-Allow-Methods:POST, GET, OPTIONS" - "Access-Control-Allow-Methods:POST, GET, OPTIONS"
- "Access-Control-Max-Age:86400" - "Access-Control-Max-Age:86400"
- "X-Custom-Header:Something Special" #authorization:
authorization: # type: "jwt"
type: "jwt" # creds: "910517d9-f9a1-48de-8826-dbadacbd84af-cb6f830e-ab16-47ec-9d8f-0090de732774"
creds: "910517d9-f9a1-48de-8826-dbadacbd84af-cb6f830e-ab16-47ec-9d8f-0090de732774"
# type: "basic" # type: "basic"
# creds: "user:Passw0rd" # creds: "username:Pa$$w0rd"
# type: "apikey" # type: "apikey"
# creds: "5ecbf799-1343-4e94-a9b5-e278af5cd313-56b45249-1839-4008-a450-a60dc76d2bae" # creds: "5ecbf799-1343-4e94-a9b5-e278af5cd313-56b45249-1839-4008-a450-a60dc76d2bae"
consul: # If the provider is consul. Otherwise, ignored. consul:
servers: servers:
- "http://consul1:8500" - "http://192.168.1.199:8500"
- "http://consul2:8500" - "http://192.168.1.200:8500"
- "http://consul3:8500" - "http://192.168.1.201:8500"
services: # proxy: The hostname to access the proxy server, real : The real service name in Consul database. services: # hostname: The hostname to access the proxy server, upstream : The real service name in Consul database.
- proxy: "proxy-frontend-dev-frontend-srv" - hostname: "webapi-service"
real: "frontend-dev-frontend-srv" upstream: "webapi-service-health"
path: "/one"
client_headers:
- "X-Some-Thing:Yaaaaaaaaaaaaaaa"
- "X-Proxy-From:Aralez"
rate_limit: 1
to_https: false
- hostname: "webapi-service"
upstream: "webapi-service-health"
path: "/"
token: "8e2db809-845b-45e1-8b47-2c8356a09da0-a4370955-18c2-4d6e-a8f8-ffcc0b47be81" # Consul server access token, If Consul auth is enabled token: "8e2db809-845b-45e1-8b47-2c8356a09da0-a4370955-18c2-4d6e-a8f8-ffcc0b47be81" # Consul server access token, If Consul auth is enabled
kubernetes: kubernetes:
servers: servers:
- "172.16.0.11:5443" # KUBERNETES_SERVICE_HOST : KUBERNETES_SERVICE_PORT_HTTPS - "192.168.1.55:443" #For testing only, overrides with KUBERNETES_SERVICE_HOST : KUBERNETES_SERVICE_PORT_HTTPS env variables.
services: services:
- proxy: "api-service" - hostname: "webapi-service"
real: "api-service"
path: "/" path: "/"
- proxy: "api-service" upstream: "webapi-service"
real: "search-service" - hostname: "webapi-service"
path: "/search" upstream: "vt-console-service"
- proxy: "websocket-service" path: "/one"
real: "websocket-service" client_headers:
tokenpath: "/tmp/token.txt" # /var/run/secrets/kubernetes.io/serviceaccount/token - "X-Some-Thing:Yaaaaaaaaaaaaaaa"
- "X-Proxy-From:Aralez"
rate_limit: 100
to_https: false
- hostname: "webapi-service"
upstream: "vt-rambulik-service"
path: "/two"
- hostname: "websocket-service"
upstream: "websocket-service"
path: "/"
tokenpath: "/path/to/kubetoken.txt" #If not set, will default to /var/run/secrets/kubernetes.io/serviceaccount/token
upstreams: upstreams:
myip.mydomain.com: myip.mydomain.com:
paths: paths:
rate_limit: 10 # Per path rate limit have higher priority than global rate limit. If not set, the global rate limit will be used
"/": "/":
rate_limit: 200
to_https: false to_https: false
headers: client_headers:
- "X-Proxy-From:Aralez" - "X-Proxy-From:Aralez"
servers: # List of upstreams HOST:PORT servers:
- "127.0.0.1:8000" - "127.0.0.1:8000"
- "127.0.0.2:8000" - "127.0.0.2:8000"
- "127.0.0.3:8000" - "127.0.0.3:8000"
- "127.0.0.4:8000" - "127.0.0.4:8000"
- "127.0.0.5:8000"
"/ping": "/ping":
to_https: true to_https: false
headers: server_headers:
- "X-Forwarded-Proto:https"
- "X-Forwarded-Port:443"
client_headers:
- "X-Some-Thing:Yaaaaaaaaaaaaaaa" - "X-Some-Thing:Yaaaaaaaaaaaaaaa"
- "X-Proxy-From:Aralez" - "X-Proxy-From:Aralez"
servers: servers:
@@ -64,14 +88,28 @@ upstreams:
polo.mydomain.com: polo.mydomain.com:
paths: paths:
"/": "/":
headers: to_https: false
client_headers:
- "X-Some-Thing:Yaaaaaaaaaaaaaaa" - "X-Some-Thing:Yaaaaaaaaaaaaaaa"
servers: servers:
- "192.168.1.1:8000"
- "192.168.1.10:8000"
- "127.0.0.1:8000" - "127.0.0.1:8000"
- "127.0.0.2:8000" - "127.0.0.2:8000"
- "127.0.0.3:8000" - "127.0.0.3:8000"
- "127.0.0.4:8000" - "127.0.0.4:8000"
"/.well-known/acme-challenge": apt.mydomain.com:
healthcheck: false paths:
servers: "/":
- "127.0.0.1:8001" servers:
- "192.168.1.10:443"
"/.well-known/acme-challenge":
healthcheck: false
servers:
- "127.0.0.1:8001"
localpost:
paths:
"/":
to_https: false
servers:
- "127.0.0.1:9000"

View File

@@ -1,14 +1,15 @@
pub mod auth; pub mod auth;
pub mod consul;
pub mod discovery; pub mod discovery;
pub mod dnsclient; pub mod dnsclient;
mod filewatch; mod filewatch;
pub mod healthcheck; pub mod healthcheck;
pub mod httpclient;
pub mod jwt; pub mod jwt;
pub mod kuber; pub mod kuberconsul;
pub mod metrics; pub mod metrics;
pub mod parceyaml; pub mod parceyaml;
pub mod state; pub mod state;
pub mod structs; pub mod structs;
pub mod tls; pub mod tls;
pub mod tools; pub mod tools;
// pub mod watchksecret;

View File

@@ -3,6 +3,7 @@ use base64::engine::general_purpose::STANDARD;
use base64::Engine; use base64::Engine;
use pingora_proxy::Session; use pingora_proxy::Session;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc;
use urlencoding::decode; use urlencoding::decode;
trait AuthValidator { trait AuthValidator {
@@ -40,16 +41,6 @@ impl AuthValidator for JwtAuth<'_> {
if let Some(tok) = get_query_param(session, "araleztoken") { if let Some(tok) = get_query_param(session, "araleztoken") {
return check_jwt(tok.as_str(), jwtsecret); return check_jwt(tok.as_str(), jwtsecret);
} }
// if let Some(header) = session.get_header("authorization") {
// let h = header.to_str().ok().unwrap().split(" ").collect::<Vec<_>>();
// match h.len() {
// n => {
// return check_jwt(h[n - 1], jwtsecret);
// }
// }
// }
if let Some(auth_header) = session.get_header("authorization") { if let Some(auth_header) = session.get_header("authorization") {
if let Ok(header_str) = auth_header.to_str() { if let Ok(header_str) = auth_header.to_str() {
if let Some((scheme, token)) = header_str.split_once(' ') { if let Some((scheme, token)) = header_str.split_once(' ') {
@@ -66,18 +57,18 @@ fn validate(auth: &dyn AuthValidator, session: &Session) -> bool {
auth.validate(session) auth.validate(session)
} }
pub fn authenticate(c: &[String], session: &Session) -> bool { pub fn authenticate(c: &[Arc<str>], session: &Session) -> bool {
match c[0].as_str() { match &*c[0] {
"basic" => { "basic" => {
let auth = BasicAuth(c[1].as_str().into()); let auth = BasicAuth(&*c[1]);
validate(&auth, session) validate(&auth, session)
} }
"apikey" => { "apikey" => {
let auth = ApiKeyAuth(c[1].as_str().into()); let auth = ApiKeyAuth(&*c[1]);
validate(&auth, session) validate(&auth, session)
} }
"jwt" => { "jwt" => {
let auth = JwtAuth(c[1].as_str().into()); let auth = JwtAuth(&*c[1]);
validate(&auth, session) validate(&auth, session)
} }
_ => { _ => {

View File

@@ -1,143 +0,0 @@
use crate::utils::structs::{Configuration, InnerMap, ServiceMapping, UpstreamsDashMap};
use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams};
use dashmap::DashMap;
use futures::channel::mpsc::Sender;
use futures::SinkExt;
use log::{info, warn};
use pingora::prelude::sleep;
use rand::Rng;
use reqwest::header::{HeaderMap, HeaderValue};
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;
/// Subset of a Consul `/v1/catalog/service/<name>` catalog entry that
/// discovery needs: only the tagged-addresses block is deserialized,
/// everything else in the JSON payload is ignored.
#[derive(Debug, Deserialize)]
struct Service {
    // Keyed by tag name (presumably "lan" / "wan" — TODO confirm against
    // the Consul catalog API response), each value carrying host + port.
    #[serde(rename = "ServiceTaggedAddresses")]
    tagged_addresses: HashMap<String, TaggedAddress>,
}
// A single tagged address (host + port) as reported by the Consul catalog.
#[derive(Debug, Deserialize)]
struct TaggedAddress {
#[serde(rename = "Address")]
address: String,
#[serde(rename = "Port")]
port: u16,
}
pub async fn start(mut toreturn: Sender<Configuration>, config: Arc<Configuration>) {
let headers = DashMap::new();
info!("Consul Discovery is enabled : {}", config.typecfg);
let consul = config.consul.clone();
let prev_upstreams = UpstreamsDashMap::new();
match consul {
Some(consul) => {
let servers = consul.servers.unwrap();
info!("Consul Servers => {:?}", servers);
let end = servers.len() - 1;
loop {
let mut num = 0;
if end > 0 {
num = rand::rng().random_range(0..end);
}
headers.clear();
for (k, v) in config.headers.clone() {
headers.insert(k.to_string(), v);
}
let consul_data = servers.get(num).unwrap().to_string();
let upstreams = consul_request(consul_data, consul.services.clone(), consul.token.clone());
match upstreams.await {
Some(upstreams) => {
if !compare_dashmaps(&upstreams, &prev_upstreams) {
let mut tosend: Configuration = Configuration {
upstreams: Default::default(),
headers: Default::default(),
consul: None,
kubernetes: None,
typecfg: "".to_string(),
extraparams: config.extraparams.clone(),
};
clone_dashmap_into(&upstreams, &prev_upstreams);
clone_dashmap_into(&upstreams, &tosend.upstreams);
tosend.headers = headers.clone();
tosend.extraparams.authentication = config.extraparams.authentication.clone();
tosend.typecfg = config.typecfg.clone();
tosend.consul = config.consul.clone();
print_upstreams(&tosend.upstreams);
toreturn.send(tosend).await.unwrap();
}
}
None => {}
}
sleep(Duration::from_secs(5)).await;
}
}
None => {}
}
}
/// Queries the Consul catalog at `url` for every whitelisted service and
/// assembles the results into an upstream map keyed by proxy name.
///
/// Always yields `Some`; services that cannot be resolved are logged and
/// skipped rather than failing the whole poll.
async fn consul_request(url: String, whitelist: Option<Vec<ServiceMapping>>, token: Option<String>) -> Option<UpstreamsDashMap> {
    let upstreams = UpstreamsDashMap::new();
    let base = format!("{}/v1/catalog/service/", url);
    if let Some(services) = whitelist {
        for svc in services.iter() {
            let endpoint = format!("{}{}", base, svc.real);
            match get_by_http(endpoint, token.clone()).await {
                Some(resolved) => {
                    upstreams.insert(svc.proxy.clone(), resolved);
                }
                None => warn!("Whitelist not found for {}", svc.proxy),
            }
        }
    }
    Some(upstreams)
}
/// Fetches service instances from a Consul catalog endpoint and converts them
/// into upstream entries stored under the "/" path.
///
/// Returns `None` when the request fails or the body cannot be parsed.
async fn get_by_http(url: String, token: Option<String>) -> Option<DashMap<String, (Vec<InnerMap>, AtomicUsize)>> {
    let client = reqwest::Client::new();
    let mut headers = HeaderMap::new();
    if let Some(token) = token {
        // HeaderValue::from_str only fails on non-printable bytes; skip the
        // header in that case instead of panicking.
        if let Ok(value) = HeaderValue::from_str(&token) {
            headers.insert("X-Consul-Token", value);
        }
    }
    let to = Duration::from_secs(1);
    // BUG FIX: the header map was previously built but never attached to the
    // request, so the Consul ACL token was silently dropped.
    let request = client.get(url).headers(headers).timeout(to).send();
    let mut values = Vec::new();
    let upstreams: DashMap<String, (Vec<InnerMap>, AtomicUsize)> = DashMap::new();
    let response = request.await.ok()?;
    let services = response.json::<Vec<Service>>().await.ok()?;
    for service in services {
        // BUG FIX: skip instances without a LAN address instead of panicking
        // on `unwrap()`; also avoids the double map lookup.
        let Some(tagged) = service.tagged_addresses.get("lan_ipv4") else {
            continue;
        };
        values.push(InnerMap {
            address: tagged.address.clone(),
            port: tagged.port,
            is_ssl: false,
            is_http2: false,
            to_https: false,
            rate_limit: None,
            healthcheck: None,
        });
    }
    upstreams.insert("/".to_string(), (values, AtomicUsize::new(0)));
    Some(upstreams)
}

View File

@@ -1,6 +1,6 @@
use crate::utils::filewatch; use crate::utils::filewatch;
use crate::utils::structs::Configuration; use crate::utils::kuberconsul::{ConsulDiscovery, KubernetesDiscovery, ServiceDiscovery};
use crate::utils::{consul, kuber}; use crate::utils::structs::{Configuration, UpstreamsDashMap};
use crate::web::webserver; use crate::web::webserver;
use async_trait::async_trait; use async_trait::async_trait;
use futures::channel::mpsc::Sender; use futures::channel::mpsc::Sender;
@@ -15,13 +15,8 @@ pub struct APIUpstreamProvider {
pub tls_key_file: Option<String>, pub tls_key_file: Option<String>,
pub file_server_address: Option<String>, pub file_server_address: Option<String>,
pub file_server_folder: Option<String>, pub file_server_folder: Option<String>,
} pub current_upstreams: Arc<UpstreamsDashMap>,
pub full_upstreams: Arc<UpstreamsDashMap>,
#[async_trait]
impl Discovery for APIUpstreamProvider {
async fn start(&self, toreturn: Sender<Configuration>) {
webserver::run_server(self, toreturn).await;
}
} }
pub struct FromFileProvider { pub struct FromFileProvider {
@@ -41,6 +36,13 @@ pub trait Discovery {
async fn start(&self, tx: Sender<Configuration>); async fn start(&self, tx: Sender<Configuration>);
} }
#[async_trait]
impl Discovery for APIUpstreamProvider {
async fn start(&self, toreturn: Sender<Configuration>) {
webserver::run_server(self, toreturn, self.current_upstreams.clone(), self.full_upstreams.clone()).await;
}
}
#[async_trait] #[async_trait]
impl Discovery for FromFileProvider { impl Discovery for FromFileProvider {
async fn start(&self, tx: Sender<Configuration>) { async fn start(&self, tx: Sender<Configuration>) {
@@ -51,13 +53,13 @@ impl Discovery for FromFileProvider {
#[async_trait] #[async_trait]
impl Discovery for ConsulProvider { impl Discovery for ConsulProvider {
async fn start(&self, tx: Sender<Configuration>) { async fn start(&self, tx: Sender<Configuration>) {
tokio::spawn(consul::start(tx.clone(), self.config.clone())); tokio::spawn(ConsulDiscovery.fetch_upstreams(self.config.clone(), tx));
} }
} }
#[async_trait] #[async_trait]
impl Discovery for KubernetesProvider { impl Discovery for KubernetesProvider {
async fn start(&self, tx: Sender<Configuration>) { async fn start(&self, tx: Sender<Configuration>) {
tokio::spawn(kuber::start(tx.clone(), self.config.clone())); tokio::spawn(KubernetesDiscovery.fetch_upstreams(self.config.clone(), tx));
} }
} }

View File

@@ -41,7 +41,7 @@ pub async fn start(fp: String, mut toreturn: Sender<Configuration>) {
if start.elapsed() > Duration::from_secs(2) { if start.elapsed() > Duration::from_secs(2) {
start = Instant::now(); start = Instant::now();
// info!("Config File changed :=> {:?}", e); // info!("Config File changed :=> {:?}", e);
let snd = load_configuration(file_path, "filepath").await; let snd = load_configuration(file_path, "filepath").await.0;
match snd { match snd {
Some(snd) => { Some(snd) => {
toreturn.send(snd).await.unwrap(); toreturn.send(snd).await.unwrap();

View File

@@ -15,12 +15,18 @@ pub async fn hc2(upslist: Arc<UpstreamsDashMap>, fullist: Arc<UpstreamsDashMap>,
loop { loop {
tokio::select! { tokio::select! {
_ = period.tick() => { _ = period.tick() => {
populate_upstreams(&upslist, &fullist, &idlist, params, &client).await; // populate_upstreams(&upslist, &fullist, &idlist, params, &client).await;
let totest = build_upstreams(&fullist, params.0, &client).await;
if !compare_dashmaps(&totest, &upslist) {
clone_dashmap_into(&totest, &upslist);
clone_idmap_into(&totest, &idlist);
}
} }
} }
} }
} }
/*
pub async fn populate_upstreams(upslist: &Arc<UpstreamsDashMap>, fullist: &Arc<UpstreamsDashMap>, idlist: &Arc<UpstreamsIdMap>, params: (&str, u64), client: &Client) { pub async fn populate_upstreams(upslist: &Arc<UpstreamsDashMap>, fullist: &Arc<UpstreamsDashMap>, idlist: &Arc<UpstreamsIdMap>, params: (&str, u64), client: &Client) {
let totest = build_upstreams(fullist, params.0, client).await; let totest = build_upstreams(fullist, params.0, client).await;
if !compare_dashmaps(&totest, upslist) { if !compare_dashmaps(&totest, upslist) {
@@ -28,6 +34,7 @@ pub async fn populate_upstreams(upslist: &Arc<UpstreamsDashMap>, fullist: &Arc<U
clone_idmap_into(&totest, idlist); clone_idmap_into(&totest, idlist);
} }
} }
*/
pub async fn initiate_upstreams(fullist: UpstreamsDashMap) -> UpstreamsDashMap { pub async fn initiate_upstreams(fullist: UpstreamsDashMap) -> UpstreamsDashMap {
let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().unwrap(); let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().unwrap();
@@ -46,7 +53,7 @@ async fn build_upstreams(fullist: &UpstreamsDashMap, method: &str, client: &Clie
let mut innervec = Vec::new(); let mut innervec = Vec::new();
for (_, upstream) in path_entry.value().0.iter().enumerate() { for (_, upstream) in path_entry.value().0.iter().enumerate() {
let tls = detect_tls(upstream.address.as_str(), &upstream.port, &client).await; let tls = detect_tls(&upstream.address.to_string(), &upstream.port, &client).await;
let is_h2 = matches!(tls.1, Some(Version::HTTP_2)); let is_h2 = matches!(tls.1, Some(Version::HTTP_2));
let link = if tls.0 { let link = if tls.0 {
@@ -71,23 +78,13 @@ async fn build_upstreams(fullist: &UpstreamsDashMap, method: &str, client: &Clie
if resp.1 { if resp.1 {
scheme.is_http2 = is_h2; // could be adjusted further scheme.is_http2 = is_h2; // could be adjusted further
} }
innervec.push(scheme); innervec.push(Arc::from(scheme));
} else { } else {
warn!("Dead Upstream : {}", link); warn!("Dead Upstream : {}", link);
} }
} else { } else {
innervec.push(scheme); innervec.push(Arc::from(scheme));
} }
// let resp = http_request(&link, method, "", &client).await;
// if resp.0 {
// if resp.1 {
// scheme.is_http2 = is_h2; // could be adjusted further
// }
// innervec.push(scheme);
// } else {
// warn!("Dead Upstream : {}", link);
// }
} }
inner.insert(path.clone(), (innervec, AtomicUsize::new(0))); inner.insert(path.clone(), (innervec, AtomicUsize::new(0)));
} }

80
src/utils/httpclient.rs Normal file
View File

@@ -0,0 +1,80 @@
use crate::utils::kuberconsul::{match_path, ConsulService, KubeEndpoints};
use crate::utils::structs::{InnerMap, ServiceMapping};
use axum::http::{HeaderMap, HeaderValue};
use dashmap::DashMap;
use reqwest::Client;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;
/// Queries a Consul catalog endpoint and converts every service instance's
/// `lan_ipv4` tagged address into an upstream entry, keyed by the service's
/// configured path via `match_path`.
///
/// Returns `None` on client-build failure, transport errors, non-2xx
/// responses, or undecodable bodies.
pub async fn for_consul(url: String, token: Option<String>, conf: &ServiceMapping) -> Option<DashMap<Arc<str>, (Vec<Arc<InnerMap>>, AtomicUsize)>> {
    let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().ok()?;
    let to = Duration::from_secs(1);
    // BUG FIX: the X-Consul-Token header was previously assembled into a local
    // HeaderMap that was never attached to the request, so ACL-protected
    // catalogs could not be read. Attach it directly on the builder.
    let mut request = client.get(url).timeout(to);
    if let Some(token) = token {
        request = request.header("X-Consul-Token", token);
    }
    let resp = request.send().await.ok()?;
    if !resp.status().is_success() {
        eprintln!("Consul API returned status: {}", resp.status());
        return None;
    }
    let mut inner_vec = Vec::new();
    let upstreams: DashMap<Arc<str>, (Vec<Arc<InnerMap>>, AtomicUsize)> = DashMap::new();
    let endpoints: Vec<ConsulService> = resp.json().await.ok()?;
    for service in endpoints {
        // BUG FIX: skip instances without a LAN address instead of panicking
        // on `unwrap()`; single lookup instead of two.
        let Some(tagged) = service.tagged_addresses.get("lan_ipv4") else {
            continue;
        };
        // Malformed addresses are skipped rather than crashing the discovery loop.
        let Ok(address) = tagged.address.parse() else {
            continue;
        };
        inner_vec.push(Arc::from(InnerMap {
            address,
            port: tagged.port,
            is_ssl: false,
            is_http2: false,
            to_https: conf.to_https.unwrap_or(false),
            rate_limit: conf.rate_limit,
            healthcheck: None,
        }));
    }
    // `inner_vec` is not used again — hand it over without the extra clone.
    match_path(conf, &upstreams, inner_vec);
    Some(upstreams)
}
/// Queries a Kubernetes `endpoints` object and converts every address/port
/// combination into upstream entries, keyed by the service's configured path
/// via `match_path`.
///
/// Returns `None` on client-build failure, transport errors, non-2xx
/// responses, or undecodable bodies.
pub async fn for_kuber(url: &str, token: &str, conf: &ServiceMapping) -> Option<DashMap<Arc<str>, (Vec<Arc<InnerMap>>, AtomicUsize)>> {
    let to = Duration::from_secs(10);
    let client = Client::builder().timeout(to).danger_accept_invalid_certs(true).build().ok()?;
    let resp = client.get(url).timeout(to).bearer_auth(token).send().await.ok()?;
    if !resp.status().is_success() {
        eprintln!("Kubernetes API returned status: {}", resp.status());
        return None;
    }
    let endpoints: KubeEndpoints = resp.json().await.ok()?;
    let upstreams: DashMap<Arc<str>, (Vec<Arc<InnerMap>>, AtomicUsize)> = DashMap::new();
    if let Some(subsets) = endpoints.subsets {
        for subset in subsets {
            let (Some(addresses), Some(ports)) = (subset.addresses, subset.ports) else {
                continue;
            };
            let mut inner_vec = Vec::new();
            for addr in addresses {
                for port in &ports {
                    // BUG FIX: skip unparseable IPs instead of panicking on
                    // `.parse().unwrap()`; `port` is Copy, so no clone needed.
                    let Ok(address) = addr.ip.parse() else { continue };
                    inner_vec.push(Arc::from(InnerMap {
                        address,
                        port: port.port,
                        is_ssl: false,
                        is_http2: false,
                        to_https: conf.to_https.unwrap_or(false),
                        rate_limit: conf.rate_limit,
                        healthcheck: None,
                    }));
                }
            }
            // The vec is rebuilt next iteration — move it, no clone required.
            match_path(conf, &upstreams, inner_vec);
        }
    }
    Some(upstreams)
}

View File

@@ -1,148 +0,0 @@
// use crate::utils::dnsclient::DnsClientPool;
use crate::utils::structs::{Configuration, InnerMap, UpstreamsDashMap};
use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams};
use dashmap::DashMap;
use futures::channel::mpsc::Sender;
use futures::SinkExt;
use pingora::prelude::sleep;
use rand::Rng;
use reqwest::Client;
use std::env;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
// static KUBERNETES_SERVICE_HOST: &str = "IP_ADDRESS";
// static TOKEN: &str = "TOKEN";
// Top-level shape of a Kubernetes Endpoints API object; only `subsets` is used.
#[derive(Debug, serde::Deserialize)]
struct Endpoints {
subsets: Option<Vec<Subset>>,
}
// One endpoint subset: a set of addresses that all expose the same ports.
#[derive(Debug, serde::Deserialize)]
struct Subset {
addresses: Option<Vec<Address>>,
ports: Option<Vec<Port>>,
}
// A single pod/endpoint IP address within a subset.
#[derive(Debug, serde::Deserialize)]
struct Address {
ip: String,
}
// A single exposed port within a subset.
#[derive(Debug, serde::Deserialize)]
struct Port {
// name: String,
port: u16,
}
/// Kubernetes discovery loop: polls each configured service's `endpoints`
/// object every 5 seconds and pushes a new `Configuration` through `toreturn`
/// whenever the discovered upstream set changes. Never returns.
pub async fn start(mut toreturn: Sender<Configuration>, config: Arc<Configuration>) {
    let prev_upstreams = UpstreamsDashMap::new();
    loop {
        // BUG FIX: the map was previously created ONCE outside the loop, so
        // endpoints removed from the cluster were never dropped from the
        // proxy's view. Rebuild it from scratch every cycle.
        let upstreams = UpstreamsDashMap::new();
        if let Some(kuber) = config.kubernetes.clone() {
            let path = kuber.tokenpath.unwrap_or("/var/run/secrets/kubernetes.io/serviceaccount/token".to_string());
            let token = read_token(path.as_str()).await;
            // Fall back to the in-cluster env vars when no servers are configured.
            let servers = kuber.servers.unwrap_or(vec![format!(
                "{}:{}",
                env::var("KUBERNETES_SERVICE_HOST").unwrap_or("0.0.0.0".to_string()),
                env::var("KUBERNETES_SERVICE_PORT_HTTPS").unwrap_or("0".to_string())
            )]);
            // Pick a random API server over the FULL index range.
            // BUG FIX: the old `random_range(0..len - 1)` could never select
            // the last configured server.
            let num = if servers.len() > 1 { rand::rng().random_range(0..servers.len()) } else { 0 };
            let server = servers.get(num).unwrap().to_string();
            if let Some(svc) = kuber.services {
                for i in svc {
                    let path = i.path.unwrap_or("/".to_string());
                    let url = format!("https://{}/api/v1/namespaces/staging/endpoints/{}", server, i.real);
                    let list = get_by_http(&*url, &*token, &*path).await;
                    if let Some(list) = list {
                        match upstreams.get(&i.proxy) {
                            Some(existing) => {
                                // Merge into the path map already present for this proxy host.
                                for (k, v) in list {
                                    existing.value().insert(k, v);
                                }
                            }
                            None => {
                                upstreams.insert(i.proxy.clone(), list);
                            }
                        };
                    }
                }
            }
        }
        // Only notify the proxy when the discovered set changed.
        if !compare_dashmaps(&upstreams, &prev_upstreams) {
            let tosend: Configuration = Configuration {
                upstreams: Default::default(),
                headers: config.headers.clone(),
                consul: config.consul.clone(),
                kubernetes: config.kubernetes.clone(),
                typecfg: config.typecfg.clone(),
                extraparams: config.extraparams.clone(),
            };
            clone_dashmap_into(&upstreams, &prev_upstreams);
            clone_dashmap_into(&upstreams, &tosend.upstreams);
            print_upstreams(&tosend.upstreams);
            toreturn.send(tosend).await.unwrap();
        }
        sleep(Duration::from_secs(5)).await;
    }
}
/// Fetches a Kubernetes `endpoints` object and flattens ALL subsets'
/// address/port pairs into a single upstream list stored under `path`.
///
/// Returns `None` on client-build failure, transport errors, non-2xx
/// responses, or undecodable bodies.
pub async fn get_by_http(url: &str, token: &str, path: &str) -> Option<DashMap<String, (Vec<InnerMap>, AtomicUsize)>> {
    let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().ok()?;
    let resp = client.get(url).bearer_auth(token).send().await.ok()?;
    if !resp.status().is_success() {
        eprintln!("Kubernetes API returned status: {}", resp.status());
        return None;
    }
    let endpoints: Endpoints = resp.json().await.ok()?;
    let upstreams: DashMap<String, (Vec<InnerMap>, AtomicUsize)> = DashMap::new();
    // BUG FIX: collect every subset into ONE list before inserting. The old
    // code inserted per subset under the same `path` key, so each subset
    // overwrote the previous one and only the last subset survived.
    let mut inner_vec = Vec::new();
    if let Some(subsets) = endpoints.subsets {
        for subset in subsets {
            if let (Some(addresses), Some(ports)) = (subset.addresses, subset.ports) {
                for addr in addresses {
                    for port in &ports {
                        inner_vec.push(InnerMap {
                            address: addr.ip.clone(),
                            // u16 is Copy — no clone needed.
                            port: port.port,
                            is_ssl: false,
                            is_http2: false,
                            to_https: false,
                            rate_limit: None,
                            healthcheck: None,
                        });
                    }
                }
            }
        }
    }
    if !inner_vec.is_empty() {
        upstreams.insert(path.to_string(), (inner_vec, AtomicUsize::new(0)));
    }
    Some(upstreams)
}
/// Reads a service-account bearer token from `path`, stripping surrounding
/// whitespace. Panics when the file cannot be opened or read.
async fn read_token(path: &str) -> String {
    let mut buf = String::new();
    File::open(path).await.unwrap().read_to_string(&mut buf).await.unwrap();
    buf.trim().to_string()
}

232
src/utils/kuberconsul.rs Normal file
View File

@@ -0,0 +1,232 @@
use crate::utils::httpclient;
use crate::utils::parceyaml::build_headers;
use crate::utils::structs::{Configuration, InnerMap, ServiceMapping, UpstreamsDashMap};
use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams};
use async_trait::async_trait;
use dashmap::DashMap;
use futures::channel::mpsc::Sender;
use futures::SinkExt;
use pingora::prelude::sleep;
use rand::Rng;
use serde::Deserialize;
use std::collections::HashMap;
use std::env;
use std::fs;
use std::path::Path;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
// #[derive(Debug, Deserialize)]
// pub struct KubeEndpointsList {
// pub items: Vec<KubeEndpoints>,
// }
// Top-level shape of a Kubernetes Endpoints API object; only `subsets` is used.
#[derive(Debug, serde::Deserialize)]
pub struct KubeEndpoints {
pub subsets: Option<Vec<KubeSubset>>,
}
// One endpoint subset: a set of addresses that all expose the same ports.
#[derive(Debug, serde::Deserialize)]
pub struct KubeSubset {
pub addresses: Option<Vec<KubeAddress>>,
pub ports: Option<Vec<KubePort>>,
}
// A single pod/endpoint IP address within a subset.
#[derive(Debug, serde::Deserialize)]
pub struct KubeAddress {
pub ip: String,
}
// A single exposed port within a subset.
#[derive(Debug, serde::Deserialize)]
pub struct KubePort {
pub port: u16,
}
// Minimal deserialization target for one entry of Consul's
// `/v1/catalog/service/<name>` response; only the tagged-address map is used.
#[derive(Debug, Deserialize)]
pub struct ConsulService {
// Keyed by address tag; the discovery code reads the "lan_ipv4" entry.
#[serde(rename = "ServiceTaggedAddresses")]
pub tagged_addresses: HashMap<String, ConsulTaggedAddress>,
}
// A single tagged address (host + port) as reported by the Consul catalog.
#[derive(Debug, Deserialize)]
pub struct ConsulTaggedAddress {
#[serde(rename = "Address")]
pub address: String,
#[serde(rename = "Port")]
pub port: u16,
}
/// Merges a freshly discovered path map into `upstreams` under the service's
/// hostname. When the hostname already exists, each path entry is merged into
/// the existing map; otherwise the whole map is inserted as-is. A `None`
/// discovery result is a no-op.
pub fn list_to_upstreams(lt: Option<DashMap<Arc<str>, (Vec<Arc<InnerMap>>, AtomicUsize)>>, upstreams: &UpstreamsDashMap, i: &ServiceMapping) {
    let Some(list) = lt else { return };
    // Borrow the hostname directly — the previous `&*i.hostname.clone()`
    // allocated a clone only to immediately deref it.
    match upstreams.get(&*i.hostname) {
        Some(upstr) => {
            // `into_iter` already yields owned `Arc<str>` keys, so the old
            // `Arc::from(k.to_owned())` re-wrap was a pointless extra clone.
            for (k, v) in list {
                upstr.value().insert(k, v);
            }
        }
        None => {
            upstreams.insert(Arc::from(i.hostname.clone()), list);
        }
    }
}
/// Stores `values` in `upstreams`, keyed by the service's configured path,
/// falling back to "/" when no path is set.
pub fn match_path(conf: &ServiceMapping, upstreams: &DashMap<Arc<str>, (Vec<Arc<InnerMap>>, AtomicUsize)>, values: Vec<Arc<InnerMap>>) {
    let key: Arc<str> = match conf.path {
        Some(ref p) => Arc::from(p.as_str()),
        None => Arc::from("/"),
    };
    upstreams.insert(key, (values, AtomicUsize::new(0)));
}
/// Loads the Kubernetes service-account token from `path` and trims
/// surrounding whitespace. Panics when the file is missing or unreadable.
async fn read_token(path: &str) -> String {
    let mut raw = String::new();
    let mut token_file = File::open(path).await.unwrap();
    token_file.read_to_string(&mut raw).await.unwrap();
    raw.trim().to_string()
}
/// Common interface implemented by the Consul and Kubernetes discovery loops;
/// implementations poll their backend and push updated `Configuration`s into
/// `toreturn` whenever the discovered upstream set changes.
#[async_trait]
pub trait ServiceDiscovery {
/// Runs the discovery loop; implementations are expected to loop forever.
async fn fetch_upstreams(&self, config: Arc<Configuration>, toreturn: Sender<Configuration>);
}
/// Marker type whose `ServiceDiscovery` impl polls the Kubernetes endpoints API.
pub struct KubernetesDiscovery;
/// Marker type whose `ServiceDiscovery` impl polls the Consul catalog API.
pub struct ConsulDiscovery;
#[async_trait]
impl ServiceDiscovery for KubernetesDiscovery {
    /// Kubernetes discovery loop: resolves the API server, token and
    /// namespace once, then polls each configured service's `endpoints`
    /// object every 5 seconds, pushing a new `Configuration` whenever the
    /// discovered upstream set changes. Returns immediately when no
    /// `kubernetes` section is configured; otherwise never returns.
    async fn fetch_upstreams(&self, config: Arc<Configuration>, mut toreturn: Sender<Configuration>) {
        let prev_upstreams = UpstreamsDashMap::new();
        if let Some(kuber) = config.kubernetes.clone() {
            // Fall back to the in-cluster env vars when no servers are
            // configured; `unwrap_or_else` avoids the eager format!/Vec build.
            let servers = kuber.servers.unwrap_or_else(|| {
                vec![format!(
                    "{}:{}",
                    env::var("KUBERNETES_SERVICE_HOST").unwrap_or("0.0.0.0".to_string()),
                    env::var("KUBERNETES_SERVICE_PORT_HTTPS").unwrap_or("0".to_string())
                )]
            });
            // Pick a random API server over the FULL index range.
            // BUG FIX: the old `random_range(0..len - 1)` could never select
            // the last configured server.
            let num = if servers.len() > 1 { rand::rng().random_range(0..servers.len()) } else { 0 };
            let server = servers.get(num).unwrap().to_string();
            let path = kuber.tokenpath.unwrap_or("/var/run/secrets/kubernetes.io/serviceaccount/token".to_string());
            let namespace = get_current_namespace().unwrap_or_else(|| "default".to_string());
            let token = read_token(path.as_str()).await;
            loop {
                // Fresh map per cycle so endpoints removed from the cluster disappear.
                let upstreams = UpstreamsDashMap::new();
                if let Some(kuber) = config.kubernetes.clone() {
                    if let Some(svc) = kuber.services {
                        for service in svc {
                            // Per-service client headers, keyed by path ("/" default).
                            let header_list: DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>> = DashMap::new();
                            let mut hl = Vec::new();
                            build_headers(&service.client_headers, config.as_ref(), &mut hl);
                            if !hl.is_empty() {
                                match service.path.clone() {
                                    Some(path) => {
                                        header_list.insert(Arc::from(path.as_str()), hl);
                                    }
                                    None => {
                                        header_list.insert(Arc::from("/"), hl);
                                    }
                                }
                                config.client_headers.insert(Arc::from(service.hostname.clone()), header_list);
                            }
                            let url = format!("https://{}/api/v1/namespaces/{}/endpoints/{}", server, namespace, service.hostname);
                            let list = httpclient::for_kuber(&*url, &*token, &service).await;
                            list_to_upstreams(list, &upstreams, &service);
                        }
                    }
                    // Only notify the proxy when the discovered set changed.
                    if let Some(lt) = clone_compare(&upstreams, &prev_upstreams, &config).await {
                        toreturn.send(lt).await.unwrap();
                    }
                }
                sleep(Duration::from_secs(5)).await;
            }
        }
    }
}
/// Determines the namespace this pod runs in: first from the mounted
/// service-account namespace file, then from the `POD_NAMESPACE` environment
/// variable; `None` when neither source is available.
fn get_current_namespace() -> Option<String> {
    let ns_path = "/var/run/secrets/kubernetes.io/serviceaccount/namespace";
    match fs::read_to_string(ns_path) {
        Ok(contents) => Some(contents.trim().to_string()),
        // Missing or unreadable file — fall back to the environment.
        Err(_) => std::env::var("POD_NAMESPACE").ok(),
    }
}
#[async_trait]
impl ServiceDiscovery for ConsulDiscovery {
    /// Consul discovery loop: every 5 seconds queries the catalog on a random
    /// configured server and pushes a new `Configuration` when the upstream
    /// set differs from the last one sent. Never returns.
    async fn fetch_upstreams(&self, config: Arc<Configuration>, mut toreturn: Sender<Configuration>) {
        let prev_upstreams = UpstreamsDashMap::new();
        loop {
            let upstreams = UpstreamsDashMap::new();
            if let Some(consul) = config.consul.clone() {
                // Fall back to env-provided address when no servers are
                // configured; `unwrap_or_else` avoids the eager format! call.
                let servers = consul.servers.unwrap_or_else(|| {
                    vec![format!(
                        "{}:{}",
                        env::var("CONSUL_SERVICE_HOST").unwrap_or("0.0.0.0".to_string()),
                        env::var("CONSUL_SERVICE_PORT").unwrap_or("0".to_string())
                    )]
                });
                // Pick a random server over the FULL index range.
                // BUG FIX: the old `random_range(0..len - 1)` could never
                // select the last configured server.
                let num = if servers.len() > 1 { rand::rng().random_range(0..servers.len()) } else { 0 };
                let consul_data = servers.get(num).unwrap().to_string();
                let ss = consul_data + "/v1/catalog/service/";
                if let Some(svc) = consul.services {
                    for i in svc {
                        // Per-service client headers, keyed by path ("/" default).
                        let header_list = DashMap::new();
                        let mut hl = Vec::new();
                        build_headers(&i.client_headers, config.as_ref(), &mut hl);
                        if !hl.is_empty() {
                            match i.path.clone() {
                                Some(path) => {
                                    header_list.insert(Arc::from(path.as_str()), hl);
                                }
                                None => {
                                    header_list.insert(Arc::from("/"), hl);
                                }
                            }
                            config.client_headers.insert(Arc::from(i.hostname.clone()), header_list);
                        }
                        let pref = ss.clone() + &i.upstream;
                        let list = httpclient::for_consul(pref, consul.token.clone(), &i).await;
                        list_to_upstreams(list, &upstreams, &i);
                    }
                }
            }
            // Only notify the proxy when the discovered set changed.
            if let Some(lt) = clone_compare(&upstreams, &prev_upstreams, &config).await {
                toreturn.send(lt).await.unwrap();
            }
            sleep(Duration::from_secs(5)).await;
        }
    }
}
/// Compares freshly discovered upstreams with the previously sent set and,
/// when they differ, produces a `Configuration` snapshot ready to be pushed
/// to the proxy, also updating `prev_upstreams` to the new state.
/// Returns `None` when nothing changed.
async fn clone_compare(upstreams: &UpstreamsDashMap, prev_upstreams: &UpstreamsDashMap, config: &Arc<Configuration>) -> Option<Configuration> {
    if compare_dashmaps(upstreams, prev_upstreams) {
        return None;
    }
    let tosend = Configuration {
        upstreams: Default::default(),
        client_headers: config.client_headers.clone(),
        server_headers: config.server_headers.clone(),
        consul: config.consul.clone(),
        kubernetes: config.kubernetes.clone(),
        typecfg: config.typecfg.clone(),
        extraparams: config.extraparams.clone(),
    };
    clone_dashmap_into(upstreams, prev_upstreams);
    clone_dashmap_into(upstreams, &tosend.upstreams);
    print_upstreams(&tosend.upstreams);
    Some(tosend)
}

View File

@@ -1,9 +1,12 @@
use http::method::Method;
use pingora_http::Version; use pingora_http::Version;
use prometheus::{register_histogram, register_int_counter, register_int_counter_vec, Histogram, IntCounter, IntCounterVec}; use prometheus::{register_histogram, register_int_counter, register_int_counter_vec, Histogram, IntCounter, IntCounterVec};
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
pub struct MetricTypes { pub struct MetricTypes {
pub method: String, pub method: Method,
pub upstream: Arc<str>,
pub code: String, pub code: String,
pub latency: Duration, pub latency: Duration,
pub version: Version, pub version: Version,
@@ -33,6 +36,11 @@ lazy_static::lazy_static! {
"Number of requests by HTTP method", "Number of requests by HTTP method",
&["method"] &["method"]
).unwrap(); ).unwrap();
pub static ref REQUESTS_BY_UPSTREAM: IntCounterVec = register_int_counter_vec!(
"aralez_requests_by_upstream",
"Number of requests by UPSTREAM server",
&["method"]
).unwrap();
pub static ref REQUESTS_BY_VERSION: IntCounterVec = register_int_counter_vec!( pub static ref REQUESTS_BY_VERSION: IntCounterVec = register_int_counter_vec!(
"aralez_requests_by_version_total", "aralez_requests_by_version_total",
"Number of requests by HTTP versions", "Number of requests by HTTP versions",
@@ -57,7 +65,8 @@ pub fn calc_metrics(metric_types: &MetricTypes) {
_ => "Unknown", _ => "Unknown",
}; };
REQUESTS_BY_VERSION.with_label_values(&[&version_str]).inc(); REQUESTS_BY_VERSION.with_label_values(&[&version_str]).inc();
RESPONSE_CODES.with_label_values(&[&metric_types.code.to_string()]).inc(); RESPONSE_CODES.with_label_values(&[&metric_types.code]).inc();
REQUESTS_BY_METHOD.with_label_values(&[&metric_types.method]).inc(); REQUESTS_BY_METHOD.with_label_values(&[&metric_types.method]).inc();
REQUESTS_BY_UPSTREAM.with_label_values(&[metric_types.upstream.as_ref()]).inc();
RESPONSE_LATENCY.observe(metric_types.latency.as_secs_f64()); RESPONSE_LATENCY.observe(metric_types.latency.as_secs_f64());
} }

View File

@@ -6,11 +6,11 @@ use dashmap::DashMap;
use log::{error, info, warn}; use log::{error, info, warn};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
// use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::Arc;
use std::{env, fs}; use std::{env, fs};
// use tokio::sync::oneshot::{Receiver, Sender}; // use tokio::sync::oneshot::{Receiver, Sender};
pub async fn load_configuration(d: &str, kind: &str) -> Option<Configuration> { pub async fn load_configuration(d: &str, kind: &str) -> (Option<Configuration>, String) {
let yaml_data = match kind { let yaml_data = match kind {
"filepath" => match fs::read_to_string(d) { "filepath" => match fs::read_to_string(d) {
Ok(data) => { Ok(data) => {
@@ -20,7 +20,7 @@ pub async fn load_configuration(d: &str, kind: &str) -> Option<Configuration> {
Err(e) => { Err(e) => {
error!("Reading: {}: {:?}", d, e); error!("Reading: {}: {:?}", d, e);
warn!("Running with empty upstreams list, update it via API"); warn!("Running with empty upstreams list, update it via API");
return None; return (None, e.to_string());
} }
}, },
"content" => { "content" => {
@@ -29,7 +29,7 @@ pub async fn load_configuration(d: &str, kind: &str) -> Option<Configuration> {
} }
_ => { _ => {
error!("Mismatched parameter, only filepath|content is allowed"); error!("Mismatched parameter, only filepath|content is allowed");
return None; return (None, "Mismatched parameter, only filepath|content is allowed".to_string());
} }
}; };
@@ -37,10 +37,9 @@ pub async fn load_configuration(d: &str, kind: &str) -> Option<Configuration> {
Ok(cfg) => cfg, Ok(cfg) => cfg,
Err(e) => { Err(e) => {
error!("Failed to parse upstreams file: {}", e); error!("Failed to parse upstreams file: {}", e);
return None; return (None, e.to_string());
} }
}; };
let mut toreturn = Configuration::default(); let mut toreturn = Configuration::default();
populate_headers_and_auth(&mut toreturn, &parsed).await; populate_headers_and_auth(&mut toreturn, &parsed).await;
@@ -49,36 +48,49 @@ pub async fn load_configuration(d: &str, kind: &str) -> Option<Configuration> {
match parsed.provider.as_str() { match parsed.provider.as_str() {
"file" => { "file" => {
populate_file_upstreams(&mut toreturn, &parsed).await; populate_file_upstreams(&mut toreturn, &parsed).await;
Some(toreturn) (Some(toreturn), "Ok".to_string())
} }
"consul" => { "consul" => {
toreturn.consul = parsed.consul; toreturn.consul = parsed.consul;
toreturn.consul.is_some().then_some(toreturn) (toreturn.consul.is_some().then_some(toreturn), "Ok".to_string())
} }
"kubernetes" => { "kubernetes" => {
toreturn.kubernetes = parsed.kubernetes; toreturn.kubernetes = parsed.kubernetes;
toreturn.kubernetes.is_some().then_some(toreturn) (toreturn.kubernetes.is_some().then_some(toreturn), "Ok".to_string())
} }
_ => { _ => {
warn!("Unknown provider {}", parsed.provider); warn!("Unknown provider {}", parsed.provider);
None (None, "Unknown provider".to_string())
} }
} }
} }
async fn populate_headers_and_auth(config: &mut Configuration, parsed: &Config) { async fn populate_headers_and_auth(config: &mut Configuration, parsed: &Config) {
if let Some(headers) = &parsed.headers { let mut ch: Vec<(Arc<str>, Arc<str>)> = Vec::new();
let mut hl = Vec::new(); ch.push((Arc::from("Server"), Arc::from("Aralez")));
if let Some(headers) = &parsed.client_headers {
for header in headers { for header in headers {
if let Some((key, val)) = header.split_once(':') { if let Some((key, val)) = header.split_once(':') {
hl.push((key.trim().to_string(), val.trim().to_string())); ch.push((Arc::from(key), Arc::from(val)));
} }
} }
let global_headers = DashMap::new();
global_headers.insert("/".to_string(), hl);
config.headers.insert("GLOBAL_HEADERS".to_string(), global_headers);
} }
let global_headers: DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>> = DashMap::new();
global_headers.insert(Arc::from("/"), ch);
config.client_headers.insert(Arc::from("GLOBAL_CLIENT_HEADERS"), global_headers);
let mut sh: Vec<(Arc<str>, Arc<str>)> = Vec::new();
sh.push((Arc::from("X-Proxy-Server"), Arc::from("Aralez")));
if let Some(headers) = &parsed.server_headers {
for header in headers {
if let Some((key, val)) = header.split_once(':') {
sh.push((Arc::from(key.trim()), Arc::from(val.trim())));
}
}
}
let server_global_headers: DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>> = DashMap::new();
server_global_headers.insert(Arc::from("/"), sh);
config.server_headers.insert(Arc::from("GLOBAL_SERVER_HEADERS"), server_global_headers);
config.extraparams.sticky_sessions = parsed.sticky_sessions; config.extraparams.sticky_sessions = parsed.sticky_sessions;
config.extraparams.to_https = parsed.to_https; config.extraparams.to_https = parsed.to_https;
@@ -91,7 +103,10 @@ async fn populate_headers_and_auth(config: &mut Configuration, parsed: &Config)
if let Some(auth) = &parsed.authorization { if let Some(auth) = &parsed.authorization {
let name = auth.get("type").unwrap_or(&"".to_string()).to_string(); let name = auth.get("type").unwrap_or(&"".to_string()).to_string();
let creds = auth.get("creds").unwrap_or(&"".to_string()).to_string(); let creds = auth.get("creds").unwrap_or(&"".to_string()).to_string();
config.extraparams.authentication.insert("authorization".to_string(), vec![name, creds]); config
.extraparams
.authentication
.insert(Arc::from("authorization"), vec![Arc::from(name), Arc::from(creds)]);
} else { } else {
config.extraparams.authentication = DashMap::new(); config.extraparams.authentication = DashMap::new();
} }
@@ -102,43 +117,41 @@ async fn populate_file_upstreams(config: &mut Configuration, parsed: &Config) {
if let Some(upstreams) = &parsed.upstreams { if let Some(upstreams) = &parsed.upstreams {
for (hostname, host_config) in upstreams { for (hostname, host_config) in upstreams {
let path_map = DashMap::new(); let path_map = DashMap::new();
let header_list = DashMap::new(); let client_header_list = DashMap::new();
let server_header_list = DashMap::new();
for (path, path_config) in &host_config.paths { for (path, path_config) in &host_config.paths {
if let Some(rate) = &path_config.rate_limit { if let Some(rate) = &path_config.rate_limit {
info!("Applied Rate Limit for {} : {} request per second", hostname, rate); info!("Applied Rate Limit for {} : {} request per second", hostname, rate);
} }
let mut hl: Vec<(Arc<str>, Arc<str>)> = Vec::new();
let mut sl: Vec<(Arc<str>, Arc<str>)> = Vec::new();
build_headers(&path_config.client_headers, config, &mut hl);
build_headers(&path_config.server_headers, config, &mut sl);
client_header_list.insert(Arc::from(path.as_str()), hl);
server_header_list.insert(Arc::from(path.as_str()), sl);
let mut server_list = Vec::new(); let mut server_list = Vec::new();
let mut hl = Vec::new();
if let Some(headers) = &path_config.headers {
for header in headers {
if let Some((key, val)) = header.split_once(':') {
hl.push((key.trim().to_string(), val.trim().to_string()));
}
}
}
header_list.insert(path.clone(), hl);
for server in &path_config.servers { for server in &path_config.servers {
if let Some((ip, port_str)) = server.split_once(':') { if let Some((ip, port_str)) = server.split_once(':') {
if let Ok(port) = port_str.parse::<u16>() { if let Ok(port) = port_str.parse::<u16>() {
server_list.push(InnerMap { server_list.push(Arc::from(InnerMap {
address: ip.trim().to_string(), address: ip.trim().parse().unwrap(),
port, port,
is_ssl: true, is_ssl: true,
is_http2: false, is_http2: false,
to_https: path_config.to_https.unwrap_or(false), to_https: path_config.to_https.unwrap_or(false),
rate_limit: path_config.rate_limit, rate_limit: path_config.rate_limit,
healthcheck: path_config.healthcheck, healthcheck: path_config.healthcheck,
}); }));
} }
} }
} }
path_map.insert(path.clone(), (server_list, AtomicUsize::new(0))); path_map.insert(Arc::from(path.clone()), (server_list, AtomicUsize::new(0)));
} }
config.headers.insert(hostname.clone(), header_list); config.client_headers.insert(Arc::from(hostname.clone()), client_header_list);
imtdashmap.insert(hostname.clone(), path_map); config.server_headers.insert(Arc::from(hostname.clone()), server_header_list);
imtdashmap.insert(Arc::from(hostname.clone()), path_map);
} }
if is_first_run() { if is_first_run() {
@@ -224,3 +237,20 @@ fn log_builder(conf: &AppConfig) {
} }
env_logger::builder().init(); env_logger::builder().init();
} }
pub fn build_headers(path_config: &Option<Vec<String>>, _config: &Configuration, hl: &mut Vec<(Arc<str>, Arc<str>)>) {
if let Some(headers) = &path_config {
for header in headers {
if let Some((key, val)) = header.split_once(':') {
hl.push((Arc::from(key.trim()), Arc::from(val.trim())));
}
}
// if let Some(push) = config.client_headers.get("GLOBAL_HEADERS") {
// for k in push.iter() {
// for x in k.value() {
// hl.push(x.to_owned());
// }
// }
// }
}
}

View File

@@ -1,25 +1,31 @@
use dashmap::DashMap; use dashmap::DashMap;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
pub type UpstreamsDashMap = DashMap<String, DashMap<String, (Vec<InnerMap>, AtomicUsize)>>; pub type UpstreamsDashMap = DashMap<Arc<str>, DashMap<Arc<str>, (Vec<Arc<InnerMap>>, AtomicUsize)>>;
pub type UpstreamsIdMap = DashMap<String, InnerMap>; pub type UpstreamsIdMap = DashMap<Arc<str>, Arc<InnerMap>>;
pub type Headers = DashMap<String, DashMap<String, Vec<(String, String)>>>; pub type Headers = DashMap<Arc<str>, DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>>>;
#[derive(Debug, Default, Clone, Serialize, Deserialize)] #[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ServiceMapping { pub struct ServiceMapping {
pub proxy: String, pub upstream: String,
pub real: String, pub hostname: String,
pub path: Option<String>, pub path: Option<String>,
pub to_https: Option<bool>,
pub rate_limit: Option<isize>,
pub client_headers: Option<Vec<String>>,
pub server_headers: Option<Vec<String>>,
} }
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
pub struct Extraparams { pub struct Extraparams {
pub sticky_sessions: bool, pub sticky_sessions: bool,
pub to_https: Option<bool>, pub to_https: Option<bool>,
pub authentication: DashMap<String, Vec<String>>, pub authentication: DashMap<Arc<str>, Vec<Arc<str>>>,
pub rate_limit: Option<isize>, pub rate_limit: Option<isize>,
} }
#[derive(Clone, Default, Debug, Serialize, Deserialize)] #[derive(Clone, Default, Debug, Serialize, Deserialize)]
@@ -45,7 +51,9 @@ pub struct Config {
#[serde(default)] #[serde(default)]
pub globals: Option<HashMap<String, Vec<String>>>, pub globals: Option<HashMap<String, Vec<String>>>,
#[serde(default)] #[serde(default)]
pub headers: Option<Vec<String>>, pub client_headers: Option<Vec<String>>,
#[serde(default)]
pub server_headers: Option<Vec<String>>,
#[serde(default)] #[serde(default)]
pub authorization: Option<HashMap<String, String>>, pub authorization: Option<HashMap<String, String>>,
#[serde(default)] #[serde(default)]
@@ -66,14 +74,16 @@ pub struct HostConfig {
pub struct PathConfig { pub struct PathConfig {
pub servers: Vec<String>, pub servers: Vec<String>,
pub to_https: Option<bool>, pub to_https: Option<bool>,
pub headers: Option<Vec<String>>, pub client_headers: Option<Vec<String>>,
pub server_headers: Option<Vec<String>>,
pub rate_limit: Option<isize>, pub rate_limit: Option<isize>,
pub healthcheck: Option<bool>, pub healthcheck: Option<bool>,
} }
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct Configuration { pub struct Configuration {
pub upstreams: UpstreamsDashMap, pub upstreams: UpstreamsDashMap,
pub headers: Headers, pub client_headers: Headers,
pub server_headers: Headers,
pub consul: Option<Consul>, pub consul: Option<Consul>,
pub kubernetes: Option<Kubernetes>, pub kubernetes: Option<Kubernetes>,
pub typecfg: String, pub typecfg: String,
@@ -104,9 +114,9 @@ pub struct AppConfig {
pub rungroup: Option<String>, pub rungroup: Option<String>,
} }
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub struct InnerMap { pub struct InnerMap {
pub address: String, pub address: IpAddr,
pub port: u16, pub port: u16,
pub is_ssl: bool, pub is_ssl: bool,
pub is_http2: bool, pub is_http2: bool,
@@ -119,7 +129,7 @@ pub struct InnerMap {
impl InnerMap { impl InnerMap {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
address: Default::default(), address: "127.0.0.1".parse().unwrap(),
port: Default::default(), port: Default::default(),
is_ssl: Default::default(), is_ssl: Default::default(),
is_http2: Default::default(), is_http2: Default::default(),
@@ -129,3 +139,10 @@ impl InnerMap {
} }
} }
} }
#[derive(Serialize)]
pub struct UpstreamSnapshot {
pub backends: Vec<InnerMap>,
pub requests: usize,
}
// pub type UpstreamsSnapshot = HashMap<String, HashMap<String, UpstreamSnapshot>>;

View File

@@ -1,4 +1,4 @@
use crate::utils::structs::{InnerMap, UpstreamsDashMap, UpstreamsIdMap}; use crate::utils::structs::{InnerMap, UpstreamSnapshot, UpstreamsDashMap, UpstreamsIdMap};
use crate::utils::tls; use crate::utils::tls;
use crate::utils::tls::CertificateConfig; use crate::utils::tls::CertificateConfig;
use dashmap::DashMap; use dashmap::DashMap;
@@ -6,6 +6,7 @@ use log::{error, info};
use notify::{event::ModifyKind, Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use notify::{event::ModifyKind, Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use port_check::is_port_reachable; use port_check::is_port_reachable;
use privdrop::PrivDrop; use privdrop::PrivDrop;
use serde_json::{json, Value};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::any::type_name; use std::any::type_name;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
@@ -13,8 +14,9 @@ use std::fmt::Write;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::os::unix::fs::MetadataExt; use std::os::unix::fs::MetadataExt;
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender}; use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::{fs, process, thread, time}; use std::{fs, process, thread, time};
@@ -29,8 +31,13 @@ pub fn print_upstreams(upstreams: &UpstreamsDashMap) {
println!(" Path: {}", path); println!(" Path: {}", path);
for f in path_entry.value().0.clone() { for f in path_entry.value().0.clone() {
println!( println!(
" IP: {}, Port: {}, SSL: {}, H2: {}, To HTTPS: {}", " IP: {}, Port: {}, SSL: {}, H2: {}, To HTTPS: {}, Rate Limit: {}",
f.address, f.port, f.is_ssl, f.is_http2, f.to_https f.address,
f.port,
f.is_ssl,
f.is_http2,
f.to_https,
f.rate_limit.unwrap_or(0)
); );
} }
} }
@@ -127,7 +134,7 @@ pub fn compare_dashmaps(map1: &UpstreamsDashMap, map2: &UpstreamsDashMap) -> boo
true true
} }
pub fn merge_headers(target: &DashMap<String, Vec<(String, String)>>, source: &DashMap<String, Vec<(String, String)>>) { pub fn merge_headers(target: &DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>>, source: &DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>>) {
for entry in source.iter() { for entry in source.iter() {
let global_key = entry.key().clone(); let global_key = entry.key().clone();
let global_values = entry.value().clone(); let global_values = entry.value().clone();
@@ -154,7 +161,7 @@ pub fn clone_idmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsIdMap) {
let hex_hash = base16ct::lower::encode_string(&hash); let hex_hash = base16ct::lower::encode_string(&hash);
let hh = hex_hash[0..50].to_string(); let hh = hex_hash[0..50].to_string();
let to_add = InnerMap { let to_add = InnerMap {
address: hh.clone(), address: "127.0.0.1".parse().unwrap(),
port: 0, port: 0,
is_ssl: false, is_ssl: false,
is_http2: false, is_http2: false,
@@ -162,8 +169,8 @@ pub fn clone_idmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsIdMap) {
rate_limit: None, rate_limit: None,
healthcheck: None, healthcheck: None,
}; };
cloned.insert(id, to_add); cloned.insert(Arc::from(id.as_str()), Arc::from(to_add));
cloned.insert(hh, x.to_owned()); cloned.insert(Arc::from(hh.as_str()), Arc::from(x.to_owned()));
} }
new_inner_map.insert(path.clone(), new_vec); new_inner_map.insert(path.clone(), new_vec);
} }
@@ -263,3 +270,75 @@ pub fn check_priv(addr: &str) {
false => {} false => {}
} }
} }
#[allow(dead_code)]
pub fn upstreams_to_json(upstreams: &UpstreamsDashMap) -> serde_json::Result<String> {
let mut outer = HashMap::new();
for outer_entry in upstreams.iter() {
let mut inner_map = HashMap::new();
for inner_entry in outer_entry.value().iter() {
let (backends, counter) = inner_entry.value();
inner_map.insert(
inner_entry.key().to_string(),
UpstreamSnapshot {
backends: backends.iter().map(|a| (**a).clone()).collect(),
requests: counter.load(Ordering::Relaxed),
},
);
}
outer.insert(outer_entry.key().to_string(), inner_map);
}
// serde_json::to_string_pretty(&outer)
serde_json::to_string(&outer)
}
pub fn upstreams_liveness_json(configured: &UpstreamsDashMap, current: &UpstreamsDashMap) -> Value {
let mut result = serde_json::Map::new();
for host_entry in configured.iter() {
let hostname = host_entry.key().to_string();
let configured_paths = host_entry.value();
let mut paths_json = serde_json::Map::new();
for path_entry in configured_paths.iter() {
let path = path_entry.key().clone();
let (configured_backends, _) = path_entry.value();
let backends_json: Vec<Value> = configured_backends
.iter()
.map(|backend| {
let alive = if let Some(host_map) = current.get(&*hostname) {
if let Some(path_entry) = host_map.get(&*path) {
let list = &path_entry.value().0; // Vec<Arc<InnerMap>>
list.iter().any(|b| b.address == backend.address && b.port == backend.port)
} else {
false
}
} else {
false
};
json!({
"address": backend.address,
"port": backend.port,
"alive": alive
})
})
.collect();
paths_json.insert(
path.to_string(),
json!({
"backends": backends_json
}),
);
}
result.insert(hostname, Value::Object(paths_json));
}
Value::Object(result)
}

View File

@@ -21,6 +21,7 @@ impl BackgroundService for LB {
let tx_api = tx.clone(); let tx_api = tx.clone();
let config = load_configuration(self.config.upstreams_conf.clone().as_str(), "filepath") let config = load_configuration(self.config.upstreams_conf.clone().as_str(), "filepath")
.await .await
.0
.expect("Failed to load configuration"); .expect("Failed to load configuration");
match config.typecfg.as_str() { match config.typecfg.as_str() {
@@ -58,6 +59,8 @@ impl BackgroundService for LB {
tls_key_file: self.config.config_tls_key_file.clone(), tls_key_file: self.config.config_tls_key_file.clone(),
file_server_address: self.config.file_server_address.clone(), file_server_address: self.config.file_server_address.clone(),
file_server_folder: self.config.file_server_folder.clone(), file_server_folder: self.config.file_server_folder.clone(),
current_upstreams: self.ump_upst.clone(),
full_upstreams: self.ump_full.clone(),
}; };
// let tx_api = tx.clone(); // let tx_api = tx.clone();
let _ = tokio::spawn(async move { api_load.start(tx_api).await }); let _ = tokio::spawn(async move { api_load.start(tx_api).await });
@@ -85,22 +88,38 @@ impl BackgroundService for LB {
new.authentication = ss.extraparams.authentication.clone(); new.authentication = ss.extraparams.authentication.clone();
new.rate_limit = ss.extraparams.rate_limit; new.rate_limit = ss.extraparams.rate_limit;
self.extraparams.store(Arc::new(new)); self.extraparams.store(Arc::new(new));
self.headers.clear(); self.client_headers.clear();
self.server_headers.clear();
for entry in ss.upstreams.iter() { for entry in ss.upstreams.iter() {
let global_key = entry.key().clone(); let global_key = entry.key().clone();
let global_values = DashMap::new(); let client_global_values = DashMap::new();
let mut target_entry = ss.headers.entry(global_key).or_insert_with(DashMap::new); let server_global_values = DashMap::new();
target_entry.extend(global_values);
self.headers.insert(target_entry.key().to_owned(), target_entry.value().to_owned()); let mut client_target_entry = ss.client_headers.entry(global_key.clone()).or_insert_with(DashMap::new);
client_target_entry.extend(client_global_values);
let mut server_target_entry = ss.server_headers.entry(global_key).or_insert_with(DashMap::new);
server_target_entry.extend(server_global_values);
self.server_headers.insert(server_target_entry.key().to_owned(), server_target_entry.value().to_owned());
} }
for path in ss.headers.iter() { for path in ss.client_headers.iter() {
let path_key = path.key().clone(); let path_key = path.key().clone();
let path_headers = path.value().clone(); let path_headers = path.value().clone();
self.headers.insert(path_key.clone(), path_headers); self.client_headers.insert(path_key.clone(), path_headers);
if let Some(global_headers) = ss.headers.get("GLOBAL_HEADERS") { if let Some(global_headers) = ss.client_headers.get("GLOBAL_CLIENT_HEADERS") {
if let Some(existing_headers) = self.headers.get_mut(&path_key) { if let Some(existing_headers) = self.client_headers.get_mut(&path_key) {
merge_headers(&existing_headers, &global_headers);
}
}
}
for path in ss.server_headers.iter() {
let path_key = path.key().clone();
let path_headers = path.value().clone();
self.server_headers.insert(path_key.clone(), path_headers);
if let Some(global_headers) = ss.server_headers.get("GLOBAL_SERVER_HEADERS") {
if let Some(existing_headers) = self.server_headers.get_mut(&path_key) {
merge_headers(&existing_headers, &global_headers); merge_headers(&existing_headers, &global_headers);
} }
} }

View File

@@ -2,77 +2,103 @@ use crate::utils::structs::InnerMap;
use crate::web::proxyhttp::LB; use crate::web::proxyhttp::LB;
use async_trait::async_trait; use async_trait::async_trait;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc;
#[derive(Debug, Clone)]
pub struct GetHostsReturHeaders {
pub client_headers: Option<Vec<(Arc<str>, Arc<str>)>>,
pub server_headers: Option<Vec<(Arc<str>, Arc<str>)>>,
}
#[async_trait] #[async_trait]
pub trait GetHost { pub trait GetHost {
fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<InnerMap>; fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<Arc<InnerMap>>;
fn get_header(&self, peer: &str, path: &str) -> Option<Vec<(String, String)>>;
fn get_header(&self, peer: &str, path: &str) -> Option<GetHostsReturHeaders>;
// fn get_upstreams(&self) -> Arc<UpstreamsDashMap>;
} }
#[async_trait] #[async_trait]
impl GetHost for LB { impl GetHost for LB {
fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<InnerMap> { // fn get_upstreams(&self) -> Arc<UpstreamsDashMap> {
// self.ump_full.clone()
// }
fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<Arc<InnerMap>> {
if let Some(b) = backend_id { if let Some(b) = backend_id {
if let Some(bb) = self.ump_byid.get(b) { if let Some(bb) = self.ump_byid.get(b) {
// println!("BIB :===> {:?}", Some(bb.value()));
return Some(bb.value().clone()); return Some(bb.value().clone());
} }
} }
let host_entry = self.ump_upst.get(peer)?; let host_entry = self.ump_upst.get(peer)?;
let mut current_path = path.to_string(); let mut end = path.len();
let mut best_match: Option<InnerMap> = None;
loop { loop {
if let Some(entry) = host_entry.get(&current_path) { let slice = &path[..end];
if let Some(entry) = host_entry.get(slice) {
let (servers, index) = entry.value(); let (servers, index) = entry.value();
if !servers.is_empty() { if !servers.is_empty() {
let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len(); let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len();
best_match = Some(servers[idx].clone()); return Some(servers[idx].clone());
break;
} }
} }
if let Some(pos) = current_path.rfind('/') { if let Some(pos) = slice.rfind('/') {
current_path.truncate(pos); end = pos;
} else { } else {
break; break;
} }
} }
if best_match.is_none() { if let Some(entry) = host_entry.get("/") {
if let Some(entry) = host_entry.get("/") { let (servers, index) = entry.value();
let (servers, index) = entry.value(); if !servers.is_empty() {
if !servers.is_empty() { let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len();
let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len(); return Some(servers[idx].clone());
best_match = Some(servers[idx].clone());
}
} }
} }
// println!("Best Match :===> {:?}", best_match); None
best_match
} }
fn get_header(&self, peer: &str, path: &str) -> Option<Vec<(String, String)>> {
let host_entry = self.headers.get(peer)?;
let mut current_path = path.to_string();
let mut best_match: Option<Vec<(String, String)>> = None;
fn get_header(&self, peer: &str, path: &str) -> Option<GetHostsReturHeaders> {
let client_entry = self.client_headers.get(peer)?;
let server_entry = self.server_headers.get(peer)?;
let mut current_path = path;
let mut clnt_match = None;
loop { loop {
if let Some(entry) = host_entry.get(&current_path) { if let Some(entry) = client_entry.get(current_path) {
if !entry.value().is_empty() { if !entry.value().is_empty() {
best_match = Some(entry.value().clone()); clnt_match = Some(entry.value().clone());
break; break;
} }
} }
if let Some(pos) = current_path.rfind('/') { if let Some(pos) = current_path.rfind('/') {
current_path.truncate(pos); current_path = if pos == 0 { "/" } else { &current_path[..pos] };
} else { } else {
break; break;
} }
} }
if best_match.is_none() { current_path = path;
if let Some(entry) = host_entry.get("/") { let mut serv_match = None;
loop {
if let Some(entry) = server_entry.get(current_path) {
if !entry.value().is_empty() { if !entry.value().is_empty() {
best_match = Some(entry.value().clone()); serv_match = Some(entry.value().clone());
break;
}
}
if let Some(pos) = current_path.rfind('/') {
current_path = if pos == 0 { "/" } else { &current_path[..pos] };
} else {
break;
}
if serv_match.is_none() {
if let Some(entry) = server_entry.get("/") {
if !entry.value().is_empty() {
serv_match = Some(entry.value().clone());
break;
}
} }
} }
} }
best_match Some(GetHostsReturHeaders {
client_headers: clnt_match,
server_headers: serv_match,
})
} }
} }

View File

@@ -14,6 +14,7 @@ use pingora_core::listeners::ALPN;
use pingora_core::prelude::HttpPeer; use pingora_core::prelude::HttpPeer;
use pingora_limits::rate::Rate; use pingora_limits::rate::Rate;
use pingora_proxy::{ProxyHttp, Session}; use pingora_proxy::{ProxyHttp, Session};
// use std::net::{IpAddr, Ipv4Addr};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::time::Instant; use tokio::time::Instant;
@@ -25,19 +26,22 @@ pub struct LB {
pub ump_upst: Arc<UpstreamsDashMap>, pub ump_upst: Arc<UpstreamsDashMap>,
pub ump_full: Arc<UpstreamsDashMap>, pub ump_full: Arc<UpstreamsDashMap>,
pub ump_byid: Arc<UpstreamsIdMap>, pub ump_byid: Arc<UpstreamsIdMap>,
pub headers: Arc<Headers>, pub client_headers: Arc<Headers>,
pub server_headers: Arc<Headers>,
pub config: Arc<AppConfig>, pub config: Arc<AppConfig>,
pub extraparams: Arc<ArcSwap<Extraparams>>, pub extraparams: Arc<ArcSwap<Extraparams>>,
} }
pub struct Context { pub struct Context {
backend_id: String, backend_id: Arc<str>,
// backend_id: Arc<(IpAddr, u16, bool)>,
to_https: bool, to_https: bool,
redirect_to: String, redirect_to: Arc<str>,
start_time: Instant, start_time: Instant,
hostname: Option<String>, hostname: Option<Arc<str>>,
upstream_peer: Option<InnerMap>, upstream_peer: Option<Arc<InnerMap>>,
extraparams: arc_swap::Guard<Arc<Extraparams>>, extraparams: arc_swap::Guard<Arc<Extraparams>>,
client_headers: Arc<Vec<(Arc<str>, Arc<str>)>>,
} }
#[async_trait] #[async_trait]
@@ -45,20 +49,21 @@ impl ProxyHttp for LB {
type CTX = Context; type CTX = Context;
fn new_ctx(&self) -> Self::CTX { fn new_ctx(&self) -> Self::CTX {
Context { Context {
backend_id: String::new(), backend_id: Arc::from(""),
to_https: false, to_https: false,
redirect_to: String::new(), redirect_to: Arc::from(""),
start_time: Instant::now(), start_time: Instant::now(),
hostname: None, hostname: None,
upstream_peer: None, upstream_peer: None,
extraparams: self.extraparams.load(), extraparams: self.extraparams.load(),
client_headers: Arc::new(Vec::new()),
} }
} }
async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> { async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
let ep = _ctx.extraparams.clone(); let ep = _ctx.extraparams.as_ref();
if let Some(auth) = ep.authentication.get("authorization") { if let Some(auth) = ep.authentication.get("authorization") {
let authenticated = authenticate(&auth.value(), &session); let authenticated = authenticate(auth.value(), &session);
if !authenticated { if !authenticated {
let _ = session.respond_error(401).await; let _ = session.respond_error(401).await;
warn!("Forbidden: {:?}, {}", session.client_addr(), session.req_header().uri.path()); warn!("Forbidden: {:?}, {}", session.client_addr(), session.req_header().uri.path());
@@ -66,7 +71,7 @@ impl ProxyHttp for LB {
} }
}; };
let hostname = return_header_host(&session); let hostname = return_header_host_from_upstream(session, &self.ump_upst);
_ctx.hostname = hostname; _ctx.hostname = hostname;
let mut backend_id = None; let mut backend_id = None;
@@ -89,7 +94,7 @@ impl ProxyHttp for LB {
None => return Ok(false), None => return Ok(false),
Some(host) => { Some(host) => {
// let optioninnermap = self.get_host(host.as_str(), host.as_str(), backend_id); // let optioninnermap = self.get_host(host.as_str(), host.as_str(), backend_id);
let optioninnermap = self.get_host(host.as_str(), session.req_header().uri.path(), backend_id); let optioninnermap = self.get_host(host, session.req_header().uri.path(), backend_id);
match optioninnermap { match optioninnermap {
None => return Ok(false), None => return Ok(false),
Some(ref innermap) => { Some(ref innermap) => {
@@ -116,54 +121,46 @@ impl ProxyHttp for LB {
Ok(false) Ok(false)
} }
async fn upstream_peer(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<Box<HttpPeer>> { async fn upstream_peer(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<Box<HttpPeer>> {
// let host_name = return_header_host(&session);
match ctx.hostname.as_ref() { match ctx.hostname.as_ref() {
Some(hostname) => { Some(hostname) => match ctx.upstream_peer.as_ref() {
match ctx.upstream_peer.as_ref() { Some(innermap) => {
// Some((address, port, ssl, is_h2, to_https)) => { let mut peer = Box::new(HttpPeer::new((innermap.address, innermap.port), innermap.is_ssl, String::new()));
Some(innermap) => { if innermap.is_http2 {
let mut peer = Box::new(HttpPeer::new((innermap.address.clone(), innermap.port.clone()), innermap.is_ssl, String::new())); peer.options.alpn = ALPN::H2;
// if session.is_http2() { }
if innermap.is_http2 { if innermap.is_ssl {
peer.options.alpn = ALPN::H2; peer.sni = hostname.to_string();
} peer.options.verify_cert = false;
if innermap.is_ssl { peer.options.verify_hostname = false;
peer.sni = hostname.clone(); }
peer.options.verify_cert = false; if ctx.to_https || innermap.to_https {
peer.options.verify_hostname = false; if let Some(stream) = session.stream() {
} if stream.get_ssl().is_none() {
if ctx.to_https || innermap.to_https { if let Some(host) = ctx.hostname.as_ref() {
if let Some(stream) = session.stream() { let uri = session.req_header().uri.path_and_query().map_or("/", |pq| pq.as_str());
if stream.get_ssl().is_none() { let port = self.config.proxy_port_tls.unwrap_or(403);
if let Some(addr) = session.server_addr() { ctx.to_https = true;
if let Some((host, _)) = addr.to_string().split_once(':') { ctx.redirect_to = Arc::from(format!("https://{}:{}{}", host, port, uri));
let uri = session.req_header().uri.path_and_query().map_or("/", |pq| pq.as_str());
let port = self.config.proxy_port_tls.unwrap_or(403);
ctx.to_https = true;
ctx.redirect_to = format!("https://{}:{}{}", host, port, uri);
}
}
} }
} }
} }
ctx.backend_id = format!("{}:{}:{}", innermap.address.clone(), innermap.port.clone(), innermap.is_ssl);
Ok(peer)
}
None => {
if let Err(e) = session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await {
error!("Failed to send error response: {:?}", e);
}
Err(Box::new(Error {
etype: HTTPStatus(502),
esource: Upstream,
retry: RetryType::Decided(false),
cause: None,
context: Option::from(ImmutStr::Static("Upstream not found")),
}))
} }
ctx.backend_id = Arc::from(format!("{}:{}:{}", innermap.address, innermap.port, innermap.is_ssl));
Ok(peer)
} }
} None => {
if let Err(e) = session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await {
error!("Failed to send error response: {:?}", e);
}
Err(Box::new(Error {
etype: HTTPStatus(502),
esource: Upstream,
retry: RetryType::Decided(false),
cause: None,
context: Option::from(ImmutStr::Static("Upstream not found")),
}))
}
},
None => { None => {
// session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await.expect("Failed to send error"); // session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await.expect("Failed to send error");
if let Err(e) = session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await { if let Err(e) = session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await {
@@ -180,62 +177,44 @@ impl ProxyHttp for LB {
} }
} }
async fn upstream_request_filter(&self, _session: &mut Session, upstream_request: &mut RequestHeader, ctx: &mut Self::CTX) -> Result<()> { async fn upstream_request_filter(&self, session: &mut Session, upstream_request: &mut RequestHeader, ctx: &mut Self::CTX) -> Result<()> {
if let Some(hostname) = ctx.hostname.as_ref() { if let Some(hostname) = ctx.hostname.as_ref() {
upstream_request.insert_header("Host", hostname)?; upstream_request.insert_header("Host", hostname.as_ref())?;
} }
if let Some(peer) = ctx.upstream_peer.as_ref() { if let Some(peer) = ctx.upstream_peer.as_ref() {
upstream_request.insert_header("X-Forwarded-For", peer.address.as_str())?; upstream_request.insert_header("X-Forwarded-For", peer.address.to_string())?;
} }
if let Some(headers) = self.get_header(ctx.hostname.as_ref().unwrap_or(&Arc::from("localhost")), session.req_header().uri.path()) {
if let Some(server_headers) = headers.server_headers {
for (k, v) in server_headers.iter() {
upstream_request.insert_header(k.to_string(), v.as_ref())?;
}
}
if let Some(client_headers) = headers.client_headers {
ctx.client_headers = Arc::new(client_headers);
}
}
Ok(()) Ok(())
} }
// async fn request_body_filter(&self, _session: &mut Session, _body: &mut Option<Bytes>, _end_of_stream: bool, _ctx: &mut Self::CTX) -> Result<()>
// where
// Self::CTX: Send + Sync,
// {
// Ok(())
// }
async fn response_filter(&self, session: &mut Session, _upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX) -> Result<()> { async fn response_filter(&self, session: &mut Session, _upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX) -> Result<()> {
// _upstream_response.insert_header("X-Proxied-From", "Fooooooooooooooo").unwrap();
if ctx.extraparams.sticky_sessions { if ctx.extraparams.sticky_sessions {
let backend_id = ctx.backend_id.clone(); if let Some(bid) = self.ump_byid.get(ctx.backend_id.as_ref()) {
if let Some(bid) = self.ump_byid.get(&backend_id) {
let _ = _upstream_response.insert_header("set-cookie", format!("backend_id={}; Path=/; Max-Age=600; HttpOnly; SameSite=Lax", bid.address)); let _ = _upstream_response.insert_header("set-cookie", format!("backend_id={}; Path=/; Max-Age=600; HttpOnly; SameSite=Lax", bid.address));
} }
} }
if ctx.to_https { if ctx.to_https {
let mut redirect_response = ResponseHeader::build(StatusCode::MOVED_PERMANENTLY, None)?; let mut redirect_response = ResponseHeader::build(StatusCode::MOVED_PERMANENTLY, None)?;
redirect_response.insert_header("Location", ctx.redirect_to.clone())?; redirect_response.insert_header("Location", ctx.redirect_to.as_ref())?;
redirect_response.insert_header("Content-Length", "0")?; redirect_response.insert_header("Content-Length", "0")?;
session.write_response_header(Box::new(redirect_response), false).await?; session.write_response_header(Box::new(redirect_response), false).await?;
} }
match ctx.hostname.as_ref() {
Some(host) => { for (k, v) in ctx.client_headers.iter() {
let path = session.req_header().uri.path(); _upstream_response.insert_header(k.to_string(), v.as_ref())?;
let host_header = host;
let split_header = host_header.split_once(':');
match split_header {
Some(sh) => {
let yoyo = self.get_header(sh.0, path);
for k in yoyo.iter() {
for t in k.iter() {
_upstream_response.insert_header(t.0.clone(), t.1.clone()).unwrap();
}
}
}
None => {
let yoyo = self.get_header(host_header, path);
for k in yoyo.iter() {
for t in k.iter() {
_upstream_response.insert_header(t.0.clone(), t.1.clone()).unwrap();
}
}
}
}
}
None => {}
} }
session.set_keepalive(Some(300)); session.set_keepalive(Some(300));
Ok(()) Ok(())
} }
@@ -244,28 +223,67 @@ impl ProxyHttp for LB {
let response_code = session.response_written().map_or(0, |resp| resp.status.as_u16()); let response_code = session.response_written().map_or(0, |resp| resp.status.as_u16());
debug!("{}, response code: {response_code}", self.request_summary(session, ctx)); debug!("{}, response code: {response_code}", self.request_summary(session, ctx));
let m = &MetricTypes { let m = &MetricTypes {
method: session.req_header().method.to_string(), method: session.req_header().method.clone(),
// method: Arc::from(session.req_header().method.as_str()),
code: session.response_written().map(|resp| resp.status.as_str().to_owned()).unwrap_or("0".to_string()), code: session.response_written().map(|resp| resp.status.as_str().to_owned()).unwrap_or("0".to_string()),
latency: ctx.start_time.elapsed(), latency: ctx.start_time.elapsed(),
version: session.req_header().version, version: session.req_header().version,
upstream: ctx.hostname.clone().unwrap_or(Arc::from("localhost")),
}; };
calc_metrics(m); calc_metrics(m);
} }
} }
fn return_header_host(session: &Session) -> Option<String> { // use moka::sync::Cache;
if session.is_http2() { // Using Moka for a high-concurrency, size-limited cache
match session.req_header().uri.host() { // static HOST_CACHE: Lazy<Cache<String, Arc<str>>> = Lazy::new(|| {
Some(host) => Option::from(host.to_string()), // Cache::builder()
None => None, // .max_capacity(10_000) // Limits memory usage if attacked
} // .build()
// });
// fn return_header_host_cached(session: &Session) -> Option<Arc<str>> {
// let host_str = if session.is_http2() {
// session.req_header().uri.host()?
// } else {
// let h = session.req_header().headers.get("host")?.to_str().ok()?;
// h.split_once(':').map_or(h, |(host, _)| host)
// };
// HOST_CACHE
// .get_with(host_str.to_string(), || {
// Arc::from(host_str)
// })
// .into()
// }
// use dashmap::DashMap;
// A simple cache to reuse Arcs for common hostnames
// static HOST_CACHE: Lazy<DashMap<String, Arc<str>>> = Lazy::new(|| DashMap::with_capacity(200));
//
// fn return_header_host_cached(session: &Session) -> Option<Arc<str>> {
// let host_str = if session.is_http2() {
// session.req_header().uri.host()?
// } else {
// let h = session.req_header().headers.get("host")?.to_str().ok()?;
// h.split_once(':').map_or(h, |(host, _)| host)
// };
//
// // Fast path: check if we already have an Arc for this host
// if let Some(arc) = HOST_CACHE.get(host_str) {
// return Some(arc.clone()); // Only an atomic increment!
// }
//
// // Slow path: create new Arc and cache it
// let new_arc: Arc<str> = Arc::from(host_str);
// HOST_CACHE.insert(host_str.to_string(), new_arc.clone());
// Some(new_arc)
// }
fn return_header_host_from_upstream(session: &Session, ump_upst: &UpstreamsDashMap) -> Option<Arc<str>> {
let host_str = if session.is_http2() {
session.req_header().uri.host()?
} else { } else {
match session.req_header().headers.get("host") { let h = session.req_header().headers.get("host")?.to_str().ok()?;
Some(host) => { h.split_once(':').map_or(h, |(host, _)| host)
let header_host = host.to_str().unwrap().splitn(2, ':').collect::<Vec<&str>>(); };
Option::from(header_host[0].to_string()) ump_upst.get(host_str).map(|entry| entry.key().clone())
}
None => None,
}
}
} }

View File

@@ -27,7 +27,8 @@ pub fn run() {
let uf_config = Arc::new(DashMap::new()); let uf_config = Arc::new(DashMap::new());
let ff_config = Arc::new(DashMap::new()); let ff_config = Arc::new(DashMap::new());
let im_config = Arc::new(DashMap::new()); let im_config = Arc::new(DashMap::new());
let hh_config = Arc::new(DashMap::new()); let ch_config = Arc::new(DashMap::new());
let sh_config = Arc::new(DashMap::new());
let ec_config = Arc::new(ArcSwap::from_pointee(Extraparams { let ec_config = Arc::new(ArcSwap::from_pointee(Extraparams {
sticky_sessions: false, sticky_sessions: false,
@@ -43,7 +44,8 @@ pub fn run() {
ump_full: ff_config, ump_full: ff_config,
ump_byid: im_config, ump_byid: im_config,
config: cfg.clone(), config: cfg.clone(),
headers: hh_config, client_headers: ch_config,
server_headers: sh_config,
extraparams: ec_config, extraparams: ec_config,
}; };

View File

@@ -1,5 +1,6 @@
use crate::utils::discovery::APIUpstreamProvider; use crate::utils::discovery::APIUpstreamProvider;
use crate::utils::structs::Configuration; use crate::utils::structs::{Config, Configuration, UpstreamsDashMap};
use crate::utils::tools::{upstreams_liveness_json, upstreams_to_json};
use axum::body::Body; use axum::body::Body;
use axum::extract::{Query, State}; use axum::extract::{Query, State};
use axum::http::{Response, StatusCode}; use axum::http::{Response, StatusCode};
@@ -15,6 +16,7 @@ use prometheus::{gather, Encoder, TextEncoder};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tower_http::services::ServeDir; use tower_http::services::ServeDir;
@@ -36,16 +38,19 @@ struct AppState {
master_key: String, master_key: String,
config_sender: Sender<Configuration>, config_sender: Sender<Configuration>,
config_api_enabled: bool, config_api_enabled: bool,
current_upstreams: Arc<UpstreamsDashMap>,
full_upstreams: Arc<UpstreamsDashMap>,
} }
#[allow(unused_mut)] #[allow(unused_mut)]
pub async fn run_server(config: &APIUpstreamProvider, mut to_return: Sender<Configuration>) { pub async fn run_server(config: &APIUpstreamProvider, mut to_return: Sender<Configuration>, upstreams_curr: Arc<UpstreamsDashMap>, upstreams_full: Arc<UpstreamsDashMap>) {
let app_state = AppState { let app_state = AppState {
master_key: config.masterkey.clone(), master_key: config.masterkey.clone(),
config_sender: to_return.clone(), config_sender: to_return.clone(),
config_api_enabled: config.config_api_enabled.clone(), config_api_enabled: config.config_api_enabled.clone(),
current_upstreams: upstreams_curr,
full_upstreams: upstreams_full,
}; };
let app = Router::new() let app = Router::new()
// .route("/{*wildcard}", get(senderror)) // .route("/{*wildcard}", get(senderror))
// .route("/{*wildcard}", post(senderror)) // .route("/{*wildcard}", post(senderror))
@@ -56,6 +61,7 @@ pub async fn run_server(config: &APIUpstreamProvider, mut to_return: Sender<Conf
.route("/jwt", post(jwt_gen)) .route("/jwt", post(jwt_gen))
.route("/conf", post(conf)) .route("/conf", post(conf))
.route("/metrics", get(metrics)) .route("/metrics", get(metrics))
.route("/status", get(status))
.with_state(app_state); .with_state(app_state);
if let Some(value) = &config.tls_address { if let Some(value) = &config.tls_address {
@@ -82,27 +88,41 @@ pub async fn run_server(config: &APIUpstreamProvider, mut to_return: Sender<Conf
axum::serve(listener, app).await.unwrap(); axum::serve(listener, app).await.unwrap();
} }
async fn conf(State(mut st): State<AppState>, Query(params): Query<HashMap<String, String>>, content: String) -> impl IntoResponse { async fn conf(State(st): State<AppState>, Query(params): Query<HashMap<String, String>>, content: String) -> impl IntoResponse {
if !st.config_api_enabled { if !st.config_api_enabled {
return Response::builder() return Response::builder().status(StatusCode::FORBIDDEN).body(Body::from("Config API is disabled !\n")).unwrap();
.status(StatusCode::FORBIDDEN)
.body(Body::from("Config remote API is disabled !\n"))
.unwrap();
} }
if let Some(s) = params.get("key") { if let Some(s) = params.get("key") {
if s.to_owned() == st.master_key { if s.to_owned() == st.master_key {
if let Some(serverlist) = crate::utils::parceyaml::load_configuration(content.as_str(), "content").await { let strcontent = content.as_str();
st.config_sender.send(serverlist).await.unwrap(); let parsed = serde_yaml::from_str::<Config>(strcontent);
return Response::builder().status(StatusCode::OK).body(Body::from("Config, conf file, updated !\n")).unwrap(); match parsed {
} else { Ok(_) => {
return Response::builder().status(StatusCode::BAD_GATEWAY).body(Body::from("Failed to parse config!\n")).unwrap(); if let Some(s) = params.get("key") {
}; if s.to_owned() == st.master_key {
let _ = tokio::spawn(async move { apply_config(content.as_str(), st).await });
return Response::builder().status(StatusCode::OK).body(Body::from("Accepted! Applying in background\n")).unwrap();
}
}
return Response::builder().status(StatusCode::FORBIDDEN).body(Body::from("Access Denied !\n")).unwrap();
}
Err(err) => {
error!("Failed to parse upstreams file: {}", err);
return Response::builder().status(StatusCode::BAD_GATEWAY).body(Body::from(format!("Failed: {}\n", err))).unwrap();
}
}
} }
} }
Response::builder().status(StatusCode::FORBIDDEN).body(Body::from("Access Denied !\n")).unwrap() Response::builder().status(StatusCode::FORBIDDEN).body(Body::from("Access Denied !\n")).unwrap()
} }
async fn apply_config(content: &str, mut st: AppState) {
let sl = crate::utils::parceyaml::load_configuration(content, "content").await;
if let Some(serverlist) = sl.0 {
let _ = st.config_sender.send(serverlist).await;
}
}
async fn jwt_gen(State(state): State<AppState>, Json(payload): Json<InputKey>) -> (StatusCode, Json<OutToken>) { async fn jwt_gen(State(state): State<AppState>, Json(payload): Json<InputKey>) -> (StatusCode, Json<OutToken>) {
if payload.master_key == state.master_key { if payload.master_key == state.master_key {
let now = SystemTime::now() + Duration::from_secs(payload.valid * 60); let now = SystemTime::now() + Duration::from_secs(payload.valid * 60);
@@ -132,7 +152,6 @@ async fn jwt_gen(State(state): State<AppState>, Json(payload): Json<InputKey>) -
async fn metrics() -> impl IntoResponse { async fn metrics() -> impl IntoResponse {
let metric_families = gather(); let metric_families = gather();
let encoder = TextEncoder::new(); let encoder = TextEncoder::new();
let mut buffer = Vec::new(); let mut buffer = Vec::new();
if let Err(e) = encoder.encode(&metric_families, &mut buffer) { if let Err(e) = encoder.encode(&metric_families, &mut buffer) {
// encoding error fallback // encoding error fallback
@@ -141,7 +160,6 @@ async fn metrics() -> impl IntoResponse {
.body(Body::from(format!("Failed to encode metrics: {}", e))) .body(Body::from(format!("Failed to encode metrics: {}", e)))
.unwrap(); .unwrap();
} }
Response::builder() Response::builder()
.status(StatusCode::OK) .status(StatusCode::OK)
.header("Content-Type", encoder.format_type()) .header("Content-Type", encoder.format_type())
@@ -149,7 +167,35 @@ async fn metrics() -> impl IntoResponse {
.unwrap() .unwrap()
} }
// #[allow(dead_code)] async fn status(State(st): State<AppState>, Query(params): Query<HashMap<String, String>>) -> impl IntoResponse {
// async fn senderror() -> impl IntoResponse { if let Some(_) = params.get("live") {
// Response::builder().status(StatusCode::BAD_GATEWAY).body(Body::from("No live upstream found!\n")).unwrap() let r = upstreams_liveness_json(&st.full_upstreams, &st.current_upstreams);
// } return Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(format!("{}", r)))
.unwrap();
}
if let Some(_) = params.get("all") {
let resp = upstreams_to_json(&st.current_upstreams);
match resp {
Ok(j) => {
return Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(j))
.unwrap()
}
Err(e) => {
return Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("Failed to get status: {}", e)))
.unwrap();
}
}
}
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("Parameter mismatch")))
.unwrap()
}