From 269d14aaa257bf56580fa9e3e5031444a3397ada Mon Sep 17 00:00:00 2001 From: Ara Sadoyan Date: Thu, 24 Apr 2025 14:10:13 +0200 Subject: [PATCH] Added gRPC support --- Cargo.lock | 87 ++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 2 + README.md | 8 ++-- etc/upstreams.yaml | 40 ++++++++++-------- src/utils/healthcheck.rs | 70 +++++++++++++++++--------------- src/web/proxyhttp.rs | 9 ++++- start.sh | 4 -- 7 files changed, 162 insertions(+), 58 deletions(-) delete mode 100755 start.sh diff --git a/Cargo.lock b/Cargo.lock index 322f192..5291bbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -177,6 +177,7 @@ dependencies = [ "serde", "serde_yaml", "tokio", + "tonic", ] [[package]] @@ -966,6 +967,19 @@ dependencies = [ "tower-service", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-tls" version = "0.6.0" @@ -1614,6 +1628,26 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -1962,6 +1996,15 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", +] + [[package]] name = "protobuf" version = "2.28.0" @@ -2768,6 +2811,35 @@ dependencies = [ "tokio", ] +[[package]] +name = "tonic" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85839f0b32fd242bb3209262371d07feda6d780d16ee9d2bc88581b89da1549b" +dependencies = [ + "async-trait", + "axum", + "base64", + "bytes", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "socket2", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.5.2" @@ -2776,9 +2848,12 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", + "indexmap 2.7.1", "pin-project-lite", + "slab", "sync_wrapper", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -2804,9 +2879,21 @@ checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "log", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "tracing-core" version = "0.1.33" diff --git a/Cargo.toml b/Cargo.toml index 89796af..327389a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,5 +25,7 @@ base64 = "0.22.1" jsonwebtoken = "9.3.1" #hmac = "0.12.1" #sha2 = "0.10.8" +tonic = "0.13.0" + diff --git a/README.md b/README.md index 514f61a..0f2d189 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ Built on Rust, on top of **Cloudflareโ€™s Pingora engine**, **Gazan** delivers w - โš™๏ธ **Upstream Providers:** Supports `file`-based static upstreams, dynamic service discovery via `Consul`, and upcoming `Kubernetes` integration - ๐Ÿ” **Hot Reloading:** Modify upstreams on the fly via `upstreams.yaml` โ€” no restart needed - ๐Ÿ”ฎ **Automatic WebSocket Support:** No special config required โ€” connection upgrades are handled seamlessly -- ๐Ÿ”ฎ **Upcoming Automatic GRPC Support:** Zero config for GRPC upstreams and downstreams +- ๐Ÿ”ฎ **Automatic GRPC Support:** Zero config for GRPC upstreams and downstreams - ๐Ÿ” **TLS Termination:** Fully supports TLS for incoming and upstream traffic - ๐Ÿ›ก๏ธ **Built-in Auth Support:** - ๐Ÿง  **CORS & Header Injection:** Global and per-route header configuration @@ -210,6 +210,6 @@ curl -u username:password -H 'Host: myip.mydomain.com' http://127.0.0.1:6193/ - Uses Pingora under the hood for efficiency and flexibility. - Designed for edge proxying, internal routing, or hybrid cloud scenarios. - Transparent, fully automatic WebSocket upgrade support. -- Upcoming transparent, fully automatic GRPC proxy. -- Upcoming Kubernetes integration -- HTTP2 ready. \ No newline at end of file +- Transparent, fully automatic gRPC proxy. +- HTTP2 ready. +- Upcoming Kubernetes integration \ No newline at end of file diff --git a/etc/upstreams.yaml b/etc/upstreams.yaml index 2d420d8..2b5a4a3 100644 --- a/etc/upstreams.yaml +++ b/etc/upstreams.yaml @@ -6,18 +6,18 @@ globals: - "Access-Control-Allow-Methods:POST, GET, OPTIONS" - "Access-Control-Max-Age:86400" - "X-Custom-Header:Something Special" - authorization: # Optional, only one of auth methods below can be active at a time - # - "basic" - # - "zangag:Anhnazand1234" - # - "apikey" - # - "5a28cc4c-ce10-4ff1-824e-743c38835f5c" - - "jwt" - - "910517d9-f9a1-48de-8826-dbadacbd84af-cb6f830e-ab16-47ec-9d8f-0090de732774" +# authorization: # Optional, only one of auth methods below can be active at a time +# - "basic" +# - "zangag:Anhnazand1234" +# - "apikey" +# - "5a28cc4c-ce10-4ff1-824e-743c38835f5c" +# - "jwt" +# - "910517d9-f9a1-48de-8826-dbadacbd84af-cb6f830e-ab16-47ec-9d8f-0090de732774" consul: # If the provider is consul. Otherwise ignored servers: - "http://master1:8500" - "http://192.168.22.1:8500" - - "http://master2:8500" + - "http://master1.digitai.local:8500" services: # proxy: The hostname to access proxy server, real : The real service name in Consul - proxy: "proxy-frontend-dev-frontend-srv" real: "frontend-dev-frontend-srv" @@ -27,13 +27,13 @@ consul: # If the provider is consul. Otherwise ignored real: "backoffice-dev-backoffice-srv" token: "8e2db809-845b-45e1-8b47-2c8356a09da0-a4370955-18c2-4d6e-a8f8-ffcc0b47be81" # Consul server access token, If Consul auth is enabled upstreams: # If provider is files. Otherwise ignored - myip.mydomain.com: # Hostname, or header host to access the upstream + myip.netangels.net: # Hostname, or header host to access the upstream paths: # URL path(s) for current upstream, closest match wins "/": ssl: false # If upstream is SSL enabled headers: # Custom headers, set only for this Host and Path - "X-Proxy-From:Gazan" - servers: # List of upstreams HOST:PORT + servers: # List of upstreams HOST:PORT - "127.0.0.1:8000" - "127.0.0.2:8000" - "127.0.0.3:8000" @@ -50,7 +50,7 @@ upstreams: # If provider is files. Otherwise ignored ssl: false servers: - "192.168.1.1:8000" - polo.mydomain.com: + polo.netangels.net: paths: "/": ssl: false @@ -58,7 +58,7 @@ upstreams: # If provider is files. Otherwise ignored - "X-Some-Thing:Yaaaaaaaaaaaaaaa" servers: - "192.168.1.10:8000" - glop.mydomain.com: + glop.netangels.net: paths: "/": ssl: false @@ -67,18 +67,18 @@ upstreams: # If provider is files. Otherwise ignored servers: - "192.168.1.10:8000" - "192.168.1.1:8000" - apt.mydomain.com: + apt.netangels.net: paths: "/": ssl: true servers: - - "apt.mydomain.com:443" - test.mydomain.com: + - "apt.netangels.net:443" + test.netangels.net: paths: "/": ssl: false servers: - - "myip.mydomain.com:80" + - "myip.netangels.net:80" 127.0.0.1: paths: "/": @@ -90,4 +90,10 @@ upstreams: # If provider is files. Otherwise ignored "/": ssl: false servers: - - "10.0.55.171:3000" \ No newline at end of file + - "10.0.55.171:3000" + localpost: + paths: + "/": + ssl: false + servers: + - "127.0.0.1:9000" \ No newline at end of file diff --git a/src/utils/healthcheck.rs b/src/utils/healthcheck.rs index 074766b..f646787 100644 --- a/src/utils/healthcheck.rs +++ b/src/utils/healthcheck.rs @@ -1,10 +1,12 @@ use crate::utils::tools::*; use dashmap::DashMap; use log::{error, warn}; +use reqwest::Client; use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::time::Duration; use tokio::time::interval; +use tonic::transport::Endpoint; pub async fn hc2(upslist: Arc, fullist: Arc, params: (&str, u64)) { let mut period = interval(Duration::from_secs(params.1)); @@ -43,7 +45,6 @@ pub async fn hc2(upslist: Arc, fullist: Arc, totest.insert(host.clone(), inner); } if ! compare_dashmaps(&totest, &upslist){ - // print_upstreams(&totest); clone_dashmap_into(&totest, &upslist); } } @@ -53,39 +54,44 @@ pub async fn hc2(upslist: Arc, fullist: Arc, #[allow(dead_code)] async fn http_request(url: &str, method: &str, payload: &str) -> bool { - let client = reqwest::Client::builder().danger_accept_invalid_certs(true).build().unwrap(); - let to = Duration::from_secs(1); - match method { - "POST" => { - let response = client.post(url).body(payload.to_owned()).timeout(to).send().await; - match response { - Ok(r) => 100 <= r.status().as_u16() && r.status().as_u16() < 500, - Err(_) => false, - } + let client = Client::builder().danger_accept_invalid_certs(true).build().unwrap(); + let timeout = Duration::from_secs(1); + if !["POST", "GET", "HEAD"].contains(&method) { + error!("Method {} not supported. Only GET|POST|HEAD are supported ", method); + return false; + } + async fn send_request(client: &Client, method: &str, url: &str, payload: &str, timeout: Duration) -> Option { + match method { + "POST" => client.post(url).body(payload.to_owned()).timeout(timeout).send().await.ok(), + "GET" => client.get(url).timeout(timeout).send().await.ok(), + "HEAD" => client.head(url).timeout(timeout).send().await.ok(), + _ => None, } - "GET" => { - let response = client.get(url).timeout(to).send().await; - match response { - Ok(r) => { - // println!("Response: {} : {}", r.status(), r.url()); - 100 <= r.status().as_u16() && r.status().as_u16() < 500 - } - Err(_) => { - // println!("Error: {}", url); - false - } - } + } + + match send_request(&client, method, url, payload, timeout).await { + Some(response) => { + let status = response.status().as_u16(); + (99..499).contains(&status) } - "HEAD" => { - let response = client.head(url).timeout(to).send().await; - match response { - Ok(r) => 100 <= r.status().as_u16() && r.status().as_u16() < 500, - Err(_) => false, - } - } - _ => { - error!("Method {} not supported. Only GET|POST|HEAD are supported", method); - false + None => { + let fallback_url = url.replace("https", "http"); + ping_grpc(&fallback_url).await } } } + +pub async fn ping_grpc(addr: &str) -> bool { + let endpoint_result = Endpoint::from_shared(addr.to_owned()); + + if let Ok(endpoint) = endpoint_result { + let endpoint = endpoint.timeout(Duration::from_secs(2)); + + match tokio::time::timeout(Duration::from_secs(3), endpoint.connect()).await { + Ok(Ok(_channel)) => true, + _ => false, + } + } else { + false + } +} diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index cfca04b..eeb2d26 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -15,6 +15,7 @@ use pingora_http::ResponseHeader; use crate::utils::auth::authenticate; use crate::utils::parceyaml::Configuration; +use pingora_core::listeners::ALPN; use pingora_proxy::{ProxyHttp, Session}; use std::ops::Deref; use std::sync::atomic::Ordering; @@ -216,15 +217,21 @@ impl ProxyHttp for LB { type CTX = (); fn new_ctx(&self) -> Self::CTX {} async fn upstream_peer(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result> { + // println!("upstream_peer called for {:?}", session.req_header()); + // println!("=============================================="); let host_name = return_header_host(&session); match host_name { Some(host) => { // session.req_header_mut().headers.insert("X-Host-Name", host.to_string().parse().unwrap()); let ddr = self.get_host(host, host, session.is_upgrade_req()); + match ddr.await { Some((host, port, ssl)) => { - let peer = Box::new(HttpPeer::new((host, port), ssl, String::new())); + let mut peer = Box::new(HttpPeer::new((host, port), ssl, String::new())); + if session.is_http2() { + peer.options.alpn = ALPN::H2; + } Ok(peer) } None => { diff --git a/start.sh b/start.sh deleted file mode 100755 index b677e9d..0000000 --- a/start.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/bash -#export RUST_LOG=INFO -#reflex -d none -r 'src/' -s -- sh -c 'reset && cargo run -- --address 0.0.0.0 --port 6193' -reflex -d none -r 'src/' -s -- sh -c 'reset && cargo run -- -c etc/main.yaml'