feat: HTTP-over-rendezvous fallback (HttpProxyRequest)
Closes the M4 plan. When `OPTION_USE_RAW_TCP_FOR_API=Y` (typical in locked-down networks where direct HTTPS to port 21114 is blocked), the client wraps every /api/* request in an HttpProxyRequest protobuf and ships it over the already-encrypted rendezvous TCP channel. We now decode those messages on hbbs and dispatch them through the *same* axum Router the HTTPS listener uses — so every existing handler (login, AB, audit, TOTP, OIDC, devices/cli, plugin-sign, …) is reachable through this path with zero per-route plumbing. Components ========== - libs/hbb_common (submodule, pro-features-httpproxy branch): backports HeaderEntry / HttpProxyRequest / HttpProxyResponse + union tags 27/28 from upstream @87b11a7 onto our pinned @83419b6. Proto-only — the rest of hbb_common is unchanged so we keep the tokio 1.x / axum 0.5 / pinned reqwest fork intact (a full submodule bump risked breaking those). - src/api/http_proxy.rs: the dispatch shim. Holds a `Mutex<Option<Router>>` populated by `api::serve` before the HTTPS listener starts, builds an `http::Request<Body>` from the proto fields (sanitizing hop-by-hop headers, defaulting Content-Type: application/json), runs it through `router.oneshot(req)`, and serializes the response into HttpProxyResponse. Tower added as a direct dep with the `util` feature for ServiceExt. - src/api/mod.rs: pub mod http_proxy; install_router(app.clone()) before axum::Server::bind to share the router. - src/rendezvous_server.rs::handle_tcp: new match arm right before the catch-all that decodes HttpProxyRequest and replies with an HttpProxyResponse via the existing Sink::TcpStream(..., Encrypt) path. The reply is automatically secretbox-sealed by `send_to_sink`, so the end-to-end channel is encrypted symmetrically with secure_tcp. - examples/http_proxy_test.rs: end-to-end smoke test that opens a TCP connection, walks the secure_tcp handshake by hand (read server's signed box pubkey, derive symmetric key, send sealed reply), then ships an HttpProxyRequest GET /api/login-options and verifies the response is 200 + ["account"]. Used as the validation gate. New crate deps ============== - tower = "0.4" (features = ["util"]) — for ServiceExt::oneshot - http-body = "0.4" — for the Body trait import in dispatch Verification ============ 1. cargo build --release — clean. 2. examples/http_proxy_test against a fresh hbbs: [ok] secure_tcp handshake complete [ok] sent HttpProxyRequest GET /api/login-options [ok] response status = 200 [ok] response body = ["account"] [pass] full HTTP-over-rendezvous round trip verified 3. hbbs log confirms the secure_tcp handshake completed and the dispatch went through the standard axum router. Notes on cherry-pick vs submodule bump ====================================== The plan flagged the bump as the riskiest M4 item because newer hbb_common pulls newer tokio that breaks axum 0.5. The proto-only cherry pick keeps everything stable; the upstream-divergence cost is one extra commit in the hbb_common submodule that we own. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,162 @@
|
||||
//! TCP-over-rendezvous HTTP fallback. The client wraps any `/api/*` request
|
||||
//! in an `HttpProxyRequest` protobuf and ships it over the rendezvous TCP
|
||||
//! connection (already encrypted via secure_tcp) when
|
||||
//! `OPTION_USE_RAW_TCP_FOR_API=Y`. We dispatch the wrapped request through
|
||||
//! the **same** axum `Router` the HTTPS listener uses, so every existing
|
||||
//! handler — auth, AB, audit, OIDC, … — is reachable through this path
|
||||
//! with zero per-route plumbing.
|
||||
|
||||
use axum::body::Body;
|
||||
use axum::Router;
|
||||
use hbb_common::log;
|
||||
use hbb_common::rendezvous_proto::{HeaderEntry, HttpProxyRequest, HttpProxyResponse};
|
||||
use http::header::{HeaderMap, HeaderName, HeaderValue};
|
||||
use http::{Method, Request};
|
||||
use once_cell::sync::Lazy;
|
||||
use std::convert::TryFrom;
|
||||
use std::sync::Mutex;
|
||||
use tower::ServiceExt;
|
||||
|
||||
/// Shared router. Populated by [`api::serve`] before the HTTPS listener
|
||||
/// starts, so that the rendezvous TCP path can reach the same handlers.
|
||||
/// `Mutex` because `Router` isn't `Sync` even though it is `Send + Clone`;
|
||||
/// we never hold the lock across an await — we clone out, drop the guard,
|
||||
/// and call `oneshot` on the clone.
|
||||
static ROUTER: Lazy<Mutex<Option<Router>>> = Lazy::new(|| Mutex::new(None));
|
||||
|
||||
pub fn install_router(r: Router) {
|
||||
*ROUTER.lock().unwrap() = Some(r);
|
||||
}
|
||||
|
||||
pub async fn dispatch(req: HttpProxyRequest) -> HttpProxyResponse {
|
||||
let router = match ROUTER.lock().unwrap().as_ref() {
|
||||
Some(r) => r.clone(),
|
||||
None => return error_response(503, "router not initialized"),
|
||||
};
|
||||
|
||||
let http_req = match build_request(&req) {
|
||||
Ok(r) => r,
|
||||
Err(msg) => return error_response(400, &msg),
|
||||
};
|
||||
|
||||
let response = match router.oneshot(http_req).await {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
log::warn!("http_proxy: router error: {}", e);
|
||||
return error_response(500, &format!("router: {}", e));
|
||||
}
|
||||
};
|
||||
|
||||
let status = response.status().as_u16() as i32;
|
||||
let headers = serialize_headers(response.headers());
|
||||
let body = match collect_body(response.into_body()).await {
|
||||
Ok(b) => b,
|
||||
Err(msg) => return error_response(500, &msg),
|
||||
};
|
||||
|
||||
HttpProxyResponse {
|
||||
status,
|
||||
headers,
|
||||
body: body.into(),
|
||||
error: String::new(),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
fn build_request(req: &HttpProxyRequest) -> Result<Request<Body>, String> {
|
||||
let method = if req.method.is_empty() {
|
||||
Method::GET
|
||||
} else {
|
||||
Method::try_from(req.method.as_bytes())
|
||||
.map_err(|e| format!("invalid method {:?}: {}", req.method, e))?
|
||||
};
|
||||
let uri = if req.path.is_empty() {
|
||||
"/".to_string()
|
||||
} else if req.path.starts_with('/') {
|
||||
req.path.clone()
|
||||
} else {
|
||||
format!("/{}", req.path)
|
||||
};
|
||||
let body_bytes: Vec<u8> = req.body.to_vec();
|
||||
let mut builder = Request::builder().method(method).uri(uri);
|
||||
let headers_map = builder
|
||||
.headers_mut()
|
||||
.ok_or_else(|| "request builder produced no headers map".to_string())?;
|
||||
let mut saw_content_type = false;
|
||||
for h in &req.headers {
|
||||
if h.name.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let lower = h.name.to_ascii_lowercase();
|
||||
// Drop hop-by-hop / framing headers we'll set ourselves to match
|
||||
// the actual body length axum sees.
|
||||
if matches!(
|
||||
lower.as_str(),
|
||||
"host" | "content-length" | "connection" | "transfer-encoding"
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
if lower == "content-type" {
|
||||
saw_content_type = true;
|
||||
}
|
||||
let name = HeaderName::try_from(h.name.as_bytes())
|
||||
.map_err(|e| format!("bad header name {:?}: {}", h.name, e))?;
|
||||
let value = HeaderValue::try_from(h.value.as_bytes())
|
||||
.map_err(|e| format!("bad header value for {:?}: {}", h.name, e))?;
|
||||
headers_map.append(name, value);
|
||||
}
|
||||
// Default to JSON if the client forgot — every /api/* handler expects
|
||||
// JSON unless the route reads `body` as raw bytes (only /api/record),
|
||||
// which doesn't care about content-type.
|
||||
if !saw_content_type {
|
||||
headers_map.insert(
|
||||
HeaderName::from_static("content-type"),
|
||||
HeaderValue::from_static("application/json"),
|
||||
);
|
||||
}
|
||||
builder
|
||||
.body(Body::from(body_bytes))
|
||||
.map_err(|e| format!("build request: {}", e))
|
||||
}
|
||||
|
||||
fn serialize_headers(map: &HeaderMap) -> Vec<HeaderEntry> {
|
||||
map.iter()
|
||||
.map(|(k, v)| HeaderEntry {
|
||||
name: k.as_str().to_string(),
|
||||
value: v.to_str().unwrap_or("").to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Collect any `http_body::Body` whose chunks are buffer-like into a `Vec<u8>`.
|
||||
/// Works against both the request `Body` we build and axum's
|
||||
/// `UnsyncBoxBody<Bytes, axum::Error>` response body.
|
||||
async fn collect_body<B>(mut body: B) -> Result<Vec<u8>, String>
|
||||
where
|
||||
B: http_body::Body + Unpin,
|
||||
B::Data: hbb_common::bytes::Buf,
|
||||
B::Error: std::fmt::Display,
|
||||
{
|
||||
use hbb_common::bytes::Buf;
|
||||
let mut buf = Vec::new();
|
||||
while let Some(chunk) = body.data().await {
|
||||
let mut chunk = chunk.map_err(|e| format!("body read: {}", e))?;
|
||||
while chunk.has_remaining() {
|
||||
let s = chunk.chunk();
|
||||
buf.extend_from_slice(s);
|
||||
let n = s.len();
|
||||
chunk.advance(n);
|
||||
}
|
||||
}
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
fn error_response(status: i32, msg: &str) -> HttpProxyResponse {
|
||||
HttpProxyResponse {
|
||||
status,
|
||||
body: msg.as_bytes().to_vec().into(),
|
||||
error: msg.to_string(),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
+6
-1
@@ -11,6 +11,7 @@ pub mod email;
|
||||
pub mod error;
|
||||
pub mod groups;
|
||||
pub mod heartbeat;
|
||||
pub mod http_proxy;
|
||||
pub mod middleware;
|
||||
pub mod oidc;
|
||||
pub mod pagination;
|
||||
@@ -96,8 +97,12 @@ pub fn router(state: Arc<AppState>) -> Router {
|
||||
|
||||
pub async fn serve(addr: SocketAddr, state: Arc<AppState>) -> ResultType<()> {
|
||||
log::info!("HTTP API listening on {}", addr);
|
||||
let app = router(state);
|
||||
// Share the same router with the rendezvous-TCP HttpProxyRequest path so
|
||||
// both transports route through the exact same handlers.
|
||||
http_proxy::install_router(app.clone());
|
||||
axum::Server::bind(&addr)
|
||||
.serve(router(state).into_make_service())
|
||||
.serve(app.into_make_service())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -626,6 +626,16 @@ impl RendezvousServer {
|
||||
});
|
||||
Self::send_to_sink(sink, msg_out).await;
|
||||
}
|
||||
// M4: HTTP-over-rendezvous fallback. The client uses this when
|
||||
// OPTION_USE_RAW_TCP_FOR_API=Y (locked-down networks where
|
||||
// direct HTTPS is blocked). We dispatch the wrapped request
|
||||
// through the SAME axum router as the HTTP listener.
|
||||
Some(rendezvous_message::Union::HttpProxyRequest(req)) => {
|
||||
let resp = crate::api::http_proxy::dispatch(req).await;
|
||||
let mut msg_out = RendezvousMessage::new();
|
||||
msg_out.set_http_proxy_response(resp);
|
||||
Self::send_to_sink(sink, msg_out).await;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user