8 Commits

Author SHA1 Message Date
mike 95e51842ae release: bump to 0.1.8 (fix invalid edition)
build-windows / build-hello-agent-x64 (push) Successful in 6m19s
build-windows / sign-hello-agent-x64 (push) Successful in 5s
build-windows / validate-hello-agent-x64 (push) Successful in 9s
Bump version for the authoritative-strategy + session-notification changes.
edition was set to an invalid "2026"; restore to 2021 (rust-version 1.75
predates edition 2024).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 10:59:09 +02:00
mike faf3b1303b strategy: make pushed config authoritative
handle_config_options only ever inserted/overwrote the keys a strategy
sent, so removing a key (or unassigning a strategy) on the server left the
old value lingering on the device.

Persist the set of keys applied on the previous push (strategy_managed_keys
in LocalConfig) and, on each apply, reset any key the server managed before
but no longer sends back to its default. Keys the server never managed (the
user's own local settings) are left untouched, so this remains a managed
overlay rather than a wipe.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 08:54:12 +00:00
mike 31d5a881b6 cm: only notify the user once all sessions have closed
remove_connection fired the "session ended" banner for every connection
that closed. With multiple concurrent sessions (a second supporter, or a
file-transfer session running alongside remote control) the user saw one
banner per close even though support was still ongoing.

Track the remaining approved-session count under the same lock and show the
banner only when the last session has ended. Denied/never-approved
connections still produce no banner.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 08:54:12 +00:00
mike 14411987e7 sysinfo: advertise Ed25519 public key for opsbase TOFU enrollment
build-windows / build-hello-agent-x64 (push) Successful in 5m19s
build-windows / sign-hello-agent-x64 (push) Successful in 8s
build-windows / validate-hello-agent-x64 (push) Successful in 11s
Include the agent's base64 Ed25519 public key in the `pk` field of the
sysinfo upload. opsbase (acting as the agent's api-server) has no rendezvous
server to learn the key from, so it pins this key trust-on-first-use on first
contact and verifies every later signed request against it.

This is the same keypair sign.rs already signs requests with. Vanilla
rustdesk servers ignore the unknown field, so the change is backward
compatible.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 19:23:58 +00:00
mike 8de2ebea85 Implement performance monitor
build-windows / build-hello-agent-x64 (push) Successful in 5m0s
build-windows / sign-hello-agent-x64 (push) Successful in 5s
build-windows / validate-hello-agent-x64 (push) Successful in 7s
2026-05-22 21:52:13 +02:00
mike f868efa432 Implement user login logging
build-windows / build-hello-agent-x64 (push) Successful in 4m56s
build-windows / sign-hello-agent-x64 (push) Successful in 5s
build-windows / validate-hello-agent-x64 (push) Successful in 6s
2026-05-22 20:08:24 +02:00
mike 6bdf1058fa Implement remote execution
build-windows / build-hello-agent-x64 (push) Successful in 5m2s
build-windows / sign-hello-agent-x64 (push) Successful in 5s
build-windows / validate-hello-agent-x64 (push) Successful in 6s
2026-05-22 14:18:25 +02:00
mike 6807fe2bc0 Implement signed API communication to improve security
build-windows / build-hello-agent-x64 (push) Successful in 4m52s
build-windows / sign-hello-agent-x64 (push) Successful in 5s
build-windows / validate-hello-agent-x64 (push) Successful in 6s
2026-05-22 13:13:05 +02:00
17 changed files with 2067 additions and 22 deletions
Generated
+2 -1
View File
@@ -3197,13 +3197,14 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]] [[package]]
name = "hello-agent" name = "hello-agent"
version = "0.1.3" version = "0.1.8"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"env_logger 0.10.2", "env_logger 0.10.2",
"hbb_common", "hbb_common",
"log", "log",
"rustdesk", "rustdesk",
"serde 1.0.228",
"serde_json 1.0.118", "serde_json 1.0.118",
"tokio", "tokio",
"winapi", "winapi",
+8 -2
View File
@@ -1,6 +1,6 @@
[package] [package]
name = "hello-agent" name = "hello-agent"
version = "0.1.3" version = "0.1.8"
edition = "2021" edition = "2021"
rust-version = "1.75" rust-version = "1.75"
description = "Headless RustDesk-protocol-compatible support agent for Windows" description = "Headless RustDesk-protocol-compatible support agent for Windows"
@@ -24,7 +24,7 @@ path = "src/main.rs"
librustdesk = { package = "rustdesk", path = "vendor/rustdesk", default-features = false, features = ["use_dasp", "hwcodec", "vram"] } librustdesk = { package = "rustdesk", path = "vendor/rustdesk", default-features = false, features = ["use_dasp", "hwcodec", "vram"] }
hbb_common = { path = "vendor/rustdesk/libs/hbb_common" } hbb_common = { path = "vendor/rustdesk/libs/hbb_common" }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "time", "io-util"] } tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "time", "io-util", "process"] }
log = "0.4" log = "0.4"
env_logger = "0.10" env_logger = "0.10"
anyhow = "1" anyhow = "1"
@@ -34,6 +34,12 @@ anyhow = "1"
# the inventory module's `use serde_json` doesn't depend on internal # the inventory module's `use serde_json` doesn't depend on internal
# implementation details of hbb_common. # implementation details of hbb_common.
serde_json = "1" serde_json = "1"
# `perf_events.rs` derives Deserialize on the PowerShell row schema.
# hbb_common re-exports `serde_derive` and `serde_json` but NOT `serde`
# itself — and `#[derive(Deserialize)]` expands to a path that references
# the `serde` crate root, so we depend on it explicitly with the `derive`
# feature.
serde = { version = "1", features = ["derive"] }
[target.'cfg(target_os = "windows")'.dependencies] [target.'cfg(target_os = "windows")'.dependencies]
windows-service = "0.6" windows-service = "0.6"
+86
View File
@@ -60,6 +60,18 @@ hello-agent.exe --server # user session, SYSTEM token
│ └─ stamps `agent_name` / `agent_version` / `inventory` │ └─ stamps `agent_name` / `agent_version` / `inventory`
│ into each /api/sysinfo payload (re-uploads when the │ into each /api/sysinfo payload (re-uploads when the
│ inventory collector below transitions empty → ready) │ inventory collector below transitions empty → ready)
│ └─ signs every request with the device's Ed25519 sk
│ (same key rendezvous registers via RegisterPk).
│ The server's first valid sig flips that peer to
│ `managed=1` and unsigned posts get 401 from then on.
│ Spec: rustdesk-server/docs/AGENT-API-AUTH.md
├── exec::run_loop (background thread)
│ └─ subscribes to sync.rs's EXEC_SENDER broadcast; for
│ each queued PowerShell command runs `powershell.exe
│ -NoProfile -NonInteractive -Command -`, captures
│ stdout+stderr with 1 MiB cap & 5-min timeout, POSTs
│ signed result to /api/agent/exec-result. Idle unless
│ an admin dispatches via the dashboard.
├── inventory::collect_inventory (background thread) ├── inventory::collect_inventory (background thread)
│ └─ PowerShell + WMI + wlanapi + ipify → `INVENTORY` global │ └─ PowerShell + WMI + wlanapi + ipify → `INVENTORY` global
│ consumed by hbbs_http::sync above; one-shot, no retry │ consumed by hbbs_http::sync above; one-shot, no retry
@@ -115,6 +127,15 @@ inventory — keep it in sync when adding new patches.
`--cm` process can plug a `MessageBoxW`-based `InvokeUiCM` into `--cm` process can plug a `MessageBoxW`-based `InvokeUiCM` into
upstream's connection-manager IPC loop and inherit file-transfer, upstream's connection-manager IPC loop and inherit file-transfer,
chat, and clipboard handling rather than re-implementing them. chat, and clipboard handling rather than re-implementing them.
* `mod hbbs_http``pub mod hbbs_http` so hello-agent's
`unattended_password::try_report` and `exec::run_loop` can reach
`librustdesk::hbbs_http::sign::build_signed_headers` and
`librustdesk::hbbs_http::sync::exec_signal_receiver`. Without
this the in-crate code can't sign / can't subscribe to the
server's queued PowerShell commands, and the build fails with
`E0603: module 'hbbs_http' is private`. Tightly coupled to the
**Signed agent API** and **Remote PowerShell exec** divergences
below.
2. **Build shape** — [`vendor/rustdesk/Cargo.toml`](vendor/rustdesk/Cargo.toml): 2. **Build shape** — [`vendor/rustdesk/Cargo.toml`](vendor/rustdesk/Cargo.toml):
`[lib] crate-type` reduced from `["cdylib", "staticlib", "rlib"]` to `[lib] crate-type` reduced from `["cdylib", "staticlib", "rlib"]` to
`["rlib"]`. We statically link the rlib into hello-agent.exe; the `["rlib"]`. We statically link the rlib into hello-agent.exe; the
@@ -148,6 +169,71 @@ inventory — keep it in sync when adding new patches.
[`src/main.rs`](vendor/rustdesk/src/main.rs) (`.author(...)`). [`src/main.rs`](vendor/rustdesk/src/main.rs) (`.author(...)`).
Cosmetic, but they show through in the Windows EXE metadata and Cosmetic, but they show through in the Windows EXE metadata and
in-app error dialogs. in-app error dialogs.
6. **Signed agent API** — every `POST /api/heartbeat`,
`POST /api/sysinfo`, and `POST /api/unattended-password` carries
two extra headers (`X-RD-Device-Id`,
`X-RD-Signature: v1.<ts>.<base64-ed25519-sig>`) so the server can
bind the request to the device's existing rendezvous keypair
instead of trusting the `id` + `uuid` body fields. Without this
patch, anyone who knows a peer's id and uuid can inject inventory,
heartbeats, and unattended-access passwords for it. Three patch
sites in the vendor tree (plus one in the hello-agent crate):
* New file
[`src/hbbs_http/sign.rs`](vendor/rustdesk/src/hbbs_http/sign.rs) —
the signer (`build_signed_headers`, `path_from_url`). Reads
`Config::get_key_pair()` and `Config::get_id()`; uses the
re-exported `hbb_common::sodiumoxide`.
* [`src/hbbs_http.rs`](vendor/rustdesk/src/hbbs_http.rs) — adds
`pub mod sign;` next to the existing module declarations.
* [`src/common.rs`](vendor/rustdesk/src/common.rs) — the
`post_request_` and `parse_simple_header` header-string parsers
now accept a `\n`-separated list of `Name: Value` lines so we
can pass both signing headers in one call. Old single-pair
callers parse identically — there's no newline to split on.
* [`src/hbbs_http/sync.rs`](vendor/rustdesk/src/hbbs_http/sync.rs)
call sites (the sysinfo POST around the sysinfo-version
comparison block, and the heartbeat POST a few dozen lines
later) — both build a signed-headers string via
`crate::hbbs_http::sign::build_signed_headers("POST",
&path_from_url(&url), body.as_bytes()).unwrap_or_default()`
and pass it to `post_request` instead of `""`.
And in the hello-agent crate proper (not the vendor tree, no
re-sync concern):
* [`src/unattended_password.rs`](src/unattended_password.rs) —
`try_report` also signs its `POST /api/unattended-password`
via `librustdesk::hbbs_http::sign::build_signed_headers`.
Matching server side: see rustdesk-server's
[`docs/AGENT-API-AUTH.md`](https://github.com/cstudio-ch/rustdesk-server/blob/pro-features/docs/AGENT-API-AUTH.md)
for the wire format and verification flow.
7. **Remote PowerShell exec** — the dashboard can queue a PowerShell
script for a managed peer; the agent runs it as its service account
and POSTs the result back. Gated server-side on admin role +
`peer.managed=1` + strategy `enable-remote-exec=Y`. Vendor-tree
patches:
* [`src/hbbs_http/sync.rs`](vendor/rustdesk/src/hbbs_http/sync.rs) —
new `EXEC_SENDER` broadcast channel, new `ExecRequest` type, new
`pub fn exec_signal_receiver()` helper, and the heartbeat-reply
parser drains the `exec: [...]` field into the channel. Vanilla
rustdesk simply has no subscriber — the channel send errors out
with NoReceivers and the requests are dropped silently.
In the hello-agent crate:
* [`src/exec.rs`](src/exec.rs) — the PowerShell runner. Subscribes
to the broadcast channel above, spawns
`powershell.exe -NoProfile -NonInteractive -ExecutionPolicy
Bypass -Command -`, writes the script to stdin, captures
stdout+stderr with 1 MiB cap and a 5-minute wall-clock timeout,
signs and POSTs the result to `/api/agent/exec-result`. Started
from `run_server()` in [`src/main.rs`](src/main.rs) (must live in
the `--server` process to share the broadcast channel with
sync.rs).
* [`Cargo.toml`](Cargo.toml) — adds `process` to tokio's feature
list for `tokio::process::Command`.
Server-side spec: see [`docs/AGENT-API-AUTH.md`](https://github.com/cstudio-ch/rustdesk-server/blob/pro-features/docs/AGENT-API-AUTH.md)
§*Remote PowerShell exec*.
## Build ## Build
+18 -1
View File
@@ -140,9 +140,26 @@ impl InvokeUiCM for HeadlessCm {
fn remove_connection(&self, id: i32, _close: bool) { fn remove_connection(&self, id: i32, _close: bool) {
trace(&format!("remove_connection: id={id}")); trace(&format!("remove_connection: id={id}"));
let entry = self.approved.lock().unwrap().remove(&id); // Remove this session and read how many approved sessions remain, while
// holding the lock so the check is atomic against concurrent add/remove.
let (entry, remaining) = {
let mut approved = self.approved.lock().unwrap();
let entry = approved.remove(&id);
(entry, approved.len())
};
if let Some((peer_id, name)) = entry { if let Some((peer_id, name)) = entry {
// Only notify once EVERY approved session has ended. With multiple
// concurrent sessions (a second supporter, or a file-transfer
// session running alongside remote control) the user shouldn't get
// a "session ended" banner while other sessions are still open —
// they'd see one banner per close even though support is ongoing.
if remaining == 0 {
std::thread::spawn(move || show_session_ended(&peer_id, &name)); std::thread::spawn(move || show_session_ended(&peer_id, &name));
} else {
trace(&format!(
"remove_connection: {remaining} approved session(s) still open; suppressing banner"
));
}
} }
} }
+259
View File
@@ -0,0 +1,259 @@
//! PowerShell remote-exec worker.
//!
//! Subscribes to `librustdesk::hbbs_http::sync::exec_signal_receiver()` — a
//! broadcast channel that the vendored sync loop populates whenever the
//! server returns an `exec` field in a heartbeat reply (see
//! rustdesk-server/docs/AGENT-API-AUTH.md). For each `ExecRequest` we:
//!
//! 1. Spawn `powershell.exe -NoProfile -NonInteractive -ExecutionPolicy
//! Bypass -Command -` and write the script to stdin.
//! 2. Concurrently drain stdout and stderr into 1 MiB-capped buffers.
//! 3. Apply a wall-clock timeout (default 5 min); kill on expiry.
//! 4. POST the result to `/api/agent/exec-result` with the same Ed25519
//! signature the heartbeat / sysinfo posts use.
//!
//! The whole thing only makes sense on Windows (the agent's target OS),
//! so the module body is `#[cfg(windows)]` and other platforms get a
//! no-op `start()` to keep the call site in `service.rs` portable.
#[cfg(windows)]
mod windows_impl {
use anyhow::{anyhow, Result};
use hbb_common::config::Config;
use librustdesk::hbbs_http::sync::ExecRequest;
use std::process::Stdio;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
pub fn start() {
std::thread::spawn(|| {
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
log::warn!("exec worker: build runtime: {e}");
return;
}
};
rt.block_on(run_loop());
});
}
async fn run_loop() {
// The vendored sync layer creates the broadcast channel lazily on
// first `subscribe()`. Calling here also primes it for the parser.
let mut rx = librustdesk::hbbs_http::sync::exec_signal_receiver();
log::info!("exec worker: subscribed to heartbeat exec channel");
loop {
match rx.recv().await {
Ok(req) => {
log::info!(
"exec worker: received cmd_id={} script_len={} max_secs={} max_bytes={}",
req.cmd_id,
req.script.len(),
req.max_secs,
req.max_bytes
);
let outcome = run_one(&req).await;
if let Err(e) = report(&req, &outcome).await {
log::warn!(
"exec worker: report failed for cmd_id={}: {e:#}",
req.cmd_id
);
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
log::warn!("exec worker: lagged, dropped {n} exec requests");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
log::warn!("exec worker: channel closed, exiting");
return;
}
}
}
}
struct Outcome {
exit_code: i64,
stdout: String,
stderr: String,
timed_out: bool,
truncated: bool,
}
async fn run_one(req: &ExecRequest) -> Outcome {
// Defensive lower bound — a misconfigured server shouldn't be able to
// send max_secs=0 and have us skip the wait.
let timeout = Duration::from_secs(req.max_secs.max(1));
let max_bytes = req.max_bytes.max(1024) as usize;
// `-Command -` makes PowerShell read the script body from stdin,
// which avoids quoting / length issues that plague `-Command "…"`
// for multi-line scripts. `-NoProfile` skips both the
// machine-wide and user-wide profile loads — those would change
// behaviour depending on which AD-managed PowerShell profile the
// service account inherited. `-NonInteractive` makes prompts fail
// instead of hanging the run.
let mut child = match tokio::process::Command::new("powershell.exe")
.args([
"-NoProfile",
"-NonInteractive",
"-ExecutionPolicy",
"Bypass",
"-Command",
"-",
])
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Ok(c) => c,
Err(e) => {
return Outcome {
exit_code: -1,
stdout: String::new(),
stderr: format!("spawn failed: {e}"),
timed_out: false,
truncated: false,
};
}
};
if let Some(mut stdin) = child.stdin.take() {
let _ = stdin.write_all(req.script.as_bytes()).await;
let _ = stdin.shutdown().await;
}
let stdout = child.stdout.take().expect("piped stdout was requested");
let stderr = child.stderr.take().expect("piped stderr was requested");
// Concurrent capped readers. Each task accumulates up to
// `max_bytes` bytes, then drains and discards the rest so the
// pipe doesn't block the child writer.
let stdout_buf: Arc<Mutex<(Vec<u8>, bool)>> = Arc::new(Mutex::new((Vec::new(), false)));
let stderr_buf: Arc<Mutex<(Vec<u8>, bool)>> = Arc::new(Mutex::new((Vec::new(), false)));
let so = tokio::spawn(read_capped(stdout, stdout_buf.clone(), max_bytes));
let se = tokio::spawn(read_capped(stderr, stderr_buf.clone(), max_bytes));
let wait_result = tokio::time::timeout(timeout, child.wait()).await;
let (exit_code, timed_out) = match wait_result {
Ok(Ok(s)) => (s.code().unwrap_or(-1) as i64, false),
Ok(Err(_)) => (-1, false),
Err(_) => {
// Timed out: kill, then wait the killed child so it
// reaps cleanly (and so the read tasks finish via EOF).
let _ = child.kill().await;
let _ = child.wait().await;
(-1, true)
}
};
let _ = so.await;
let _ = se.await;
let (out_bytes, out_trunc) = {
let g = stdout_buf.lock().unwrap();
(g.0.clone(), g.1)
};
let (err_bytes, err_trunc) = {
let g = stderr_buf.lock().unwrap();
(g.0.clone(), g.1)
};
Outcome {
exit_code,
// PowerShell on a current Windows defaults to UTF-8 when
// OutputEncoding is set, but the agent service inherits the
// legacy code page on older boxes. `from_utf8_lossy`
// guarantees we always have a UTF-8 string to ship; the
// operator sees a U+FFFD when raw bytes weren't UTF-8.
stdout: String::from_utf8_lossy(&out_bytes).into_owned(),
stderr: String::from_utf8_lossy(&err_bytes).into_owned(),
timed_out,
truncated: out_trunc || err_trunc,
}
}
async fn read_capped<R: AsyncReadExt + Unpin>(
mut reader: R,
buf: Arc<Mutex<(Vec<u8>, bool)>>,
cap: usize,
) {
let mut chunk = [0u8; 8192];
loop {
match reader.read(&mut chunk).await {
Ok(0) => return,
Ok(n) => {
let mut g = buf.lock().unwrap();
if g.0.len() < cap {
let room = cap - g.0.len();
if n <= room {
g.0.extend_from_slice(&chunk[..n]);
} else {
g.0.extend_from_slice(&chunk[..room]);
g.1 = true; // truncated; keep draining
}
}
// else: already truncated, drop this chunk on the floor.
}
Err(_) => return,
}
}
}
async fn report(req: &ExecRequest, out: &Outcome) -> Result<()> {
let api = librustdesk::common::get_api_server(
Config::get_option("api-server"),
Config::get_option("custom-rendezvous-server"),
);
if api.is_empty() {
return Err(anyhow!("no api-server configured"));
}
let url = format!("{api}/api/agent/exec-result");
let id = Config::get_id();
let uuid = librustdesk::common::encode64(hbb_common::get_uuid());
let body = hbb_common::serde_json::json!({
"id": id,
"uuid": uuid,
"cmd_id": req.cmd_id,
"exit_code": out.exit_code,
"stdout": out.stdout,
"stderr": out.stderr,
"timed_out": out.timed_out,
"truncated": out.truncated,
})
.to_string();
let headers = librustdesk::hbbs_http::sign::build_signed_headers(
"POST",
"/api/agent/exec-result",
body.as_bytes(),
)
.unwrap_or_default();
if headers.is_empty() {
// Server rejects unsigned exec-result posts unconditionally
// (see api/agent_exec.rs); bail loudly so the operator can
// see the agent isn't ready to sign yet.
return Err(anyhow!("no signing keypair available"));
}
let resp = librustdesk::common::post_request(url, body, &headers)
.await
.map_err(|e| anyhow!("post: {e}"))?;
if resp.trim() == "OK" {
Ok(())
} else {
Err(anyhow!("unexpected response: {}", resp.trim()))
}
}
}
#[cfg(windows)]
pub use windows_impl::start;
#[cfg(not(windows))]
pub fn start() {
log::info!("exec worker: skipped (non-Windows build)");
}
+617
View File
@@ -0,0 +1,617 @@
// User-login tracking.
//
// Polls the Windows Terminal Services session table at a low cadence and
// reports logon / logoff events to the rustdesk-server admin API. Each
// event carries an explicit unix timestamp — for logons that's the OS's
// session `ConnectTime` (so the recorded time is when the user actually
// signed in, not when the agent first noticed them); for logoffs it's
// the agent's wall clock at the moment the session disappeared.
//
// Architecture mirrors `unattended_password`:
// * One background thread, its own current-thread Tokio runtime, no
// entanglement with the SCM supervisor's poll loop.
// * In-memory queue with retry-with-backoff on transport / ID_NOT_FOUND
// errors (the agent's first POST routinely races rendezvous
// registration). Events that never land are eventually dropped to
// bound memory — see DROP_AFTER.
// * Server-side dedup via UNIQUE INDEX
// (peer_id, kind, session_id, at, username) lets the agent re-emit
// the same `logon@ConnectTime` event on every service restart without
// piling up duplicate rows; agent-side state stays in memory only.
//
// Known limits (v1):
// * Lock / unlock are not reported — they don't change the
// WTSEnumerateSessions output we diff on.
// * Logoffs that happen while the service is down are not detected.
// The next service start sees "no session" and has nothing to diff
// against; this leaves a logon without a paired logoff in the UI.
// Tradeoff vs. persisting a snapshot to disk; revisit if operators
// ask for it.
use anyhow::{anyhow, Result};
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::Duration;
/// How often to poll the WTS session table. A user can't meaningfully
/// log in / out faster than this, and the OS keeps the table cheap to
/// enumerate (it's a kernel-side struct, not a registry scan).
const POLL_INTERVAL: Duration = Duration::from_secs(5);
/// How often to attempt a flush of the pending queue. Decoupled from the
/// poll interval so a transient server outage doesn't slow down session
/// observation; we keep observing locally and just back off the network
/// retry.
const FLUSH_INTERVAL_BASE: Duration = Duration::from_secs(5);
const FLUSH_INTERVAL_MAX: Duration = Duration::from_secs(60);
/// Drop events from the queue after this many failed delivery attempts.
/// At backoff cap = 60s, this is ~6 hours of trying — enough to ride out
/// a long server-side outage, short enough that a permanently-misconfigured
/// agent doesn't blow up memory.
const DROP_AFTER: u32 = 360;
/// Cap per request — must match the server's MAX_EVENTS_PER_POST. Server
/// rejects anything larger with a 400 so we'd retry forever if we exceeded
/// it.
const MAX_EVENTS_PER_POST: usize = 256;
/// One observed session snapshot. Equality by `(session_id, connect_time)`
/// — Windows can recycle session IDs across logins, so we use the OS-
/// reported `ConnectTime` as the disambiguator. Two snapshots compare
/// equal iff they describe the *same* logical login.
#[derive(Clone, Debug)]
struct Session {
session_id: u32,
/// FILETIME → unix epoch seconds. 0 if the OS returned an unparseable
/// value (very old Windows builds); we still report `logon` but the
/// server-side dedup degrades to "first observation wins".
connect_unix: i64,
username: String,
domain: String,
/// "console" | "rdp" | "" (unknown).
session_kind: String,
}
#[derive(Clone, Debug)]
struct PendingEvent {
at: i64,
kind: &'static str,
username: String,
domain: String,
session_id: u32,
session_kind: String,
/// Number of times we've tried to flush this event. Used to drop
/// rows that have been retrying since forever.
attempts: u32,
}
#[cfg(target_os = "windows")]
static QUEUE: Mutex<Vec<PendingEvent>> = Mutex::new(Vec::new());
/// Kick off the background tracker. Returns immediately. Safe to call
/// multiple times (subsequent calls are no-ops) — gated by an
/// `AtomicBool`.
pub fn start() {
#[cfg(not(target_os = "windows"))]
{
// Login tracking is Windows-only; on other platforms the call is
// a no-op so cross-platform builds (CI lint runs on Linux) link.
}
#[cfg(target_os = "windows")]
{
use std::sync::atomic::{AtomicBool, Ordering};
static STARTED: AtomicBool = AtomicBool::new(false);
if STARTED.swap(true, Ordering::SeqCst) {
return;
}
std::thread::spawn(move || {
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
log::warn!("login-events: build runtime: {e}");
return;
}
};
rt.block_on(run_loop());
});
}
}
#[cfg(target_os = "windows")]
async fn run_loop() {
// Diff snapshot. Keyed by session_id; value carries connect_unix so we
// can detect "session id reused for a new login" as well as "session
// disappeared".
let mut prev: HashMap<u32, Session> = HashMap::new();
let mut first_poll = true;
let mut flush_backoff = FLUSH_INTERVAL_BASE;
loop {
match enumerate_active_user_sessions() {
Ok(snapshot) => {
let now_unix = hbb_common::chrono::Utc::now().timestamp();
let curr: HashMap<u32, Session> =
snapshot.into_iter().map(|s| (s.session_id, s)).collect();
let events =
diff_into_events(&prev, &curr, first_poll, now_unix);
if !events.is_empty() {
let mut q = QUEUE.lock().unwrap();
q.extend(events);
}
prev = curr;
first_poll = false;
}
Err(e) => {
log::warn!("login-events: enumerate failed: {e:#}");
}
}
// Try to flush. If the queue is empty this is cheap (no network).
// The flush also handles its own retry / drop semantics — we just
// adjust our local cadence based on whether it succeeded.
match flush_once().await {
FlushOutcome::Idle | FlushOutcome::AllSent => {
flush_backoff = FLUSH_INTERVAL_BASE;
}
FlushOutcome::Failed => {
flush_backoff = (flush_backoff * 2).min(FLUSH_INTERVAL_MAX);
}
}
// Poll cadence is constant; the network backoff doesn't slow down
// observation — that would risk missing logoffs while the server
// is down. We just delay the retry of pending events.
tokio::time::sleep(POLL_INTERVAL.min(flush_backoff)).await;
}
}
#[cfg(target_os = "windows")]
fn diff_into_events(
prev: &HashMap<u32, Session>,
curr: &HashMap<u32, Session>,
first_poll: bool,
now_unix: i64,
) -> Vec<PendingEvent> {
let mut out = Vec::new();
// New / changed sessions → logon.
for (sid, sess) in curr {
let changed = match prev.get(sid) {
None => true,
Some(old) => {
// Same session id but a different connect_time → the
// OS recycled the slot for a fresh login. Emit a
// logoff for the old occupant and a logon for the new.
if old.connect_unix != sess.connect_unix
|| old.username != sess.username
{
out.push(PendingEvent {
at: now_unix,
kind: "logoff",
username: old.username.clone(),
domain: old.domain.clone(),
session_id: *sid,
session_kind: old.session_kind.clone(),
attempts: 0,
});
true
} else {
false
}
}
};
if !changed {
continue;
}
// ConnectTime can be 0 on the login-screen session even when a
// user is shown as logged in (rare edge); fall back to `now` so
// the row still lands somewhere sensible.
let at = if sess.connect_unix > 0 {
sess.connect_unix
} else {
now_unix
};
out.push(PendingEvent {
at,
kind: "logon",
username: sess.username.clone(),
domain: sess.domain.clone(),
session_id: *sid,
session_kind: sess.session_kind.clone(),
attempts: 0,
});
}
// Disappeared sessions → logoff. Skipped on the very first poll
// because we have no baseline to diff against — see the module-level
// "first poll: emit logons-only" note.
if !first_poll {
for (sid, old) in prev {
if !curr.contains_key(sid) {
out.push(PendingEvent {
at: now_unix,
kind: "logoff",
username: old.username.clone(),
domain: old.domain.clone(),
session_id: *sid,
session_kind: old.session_kind.clone(),
attempts: 0,
});
}
}
}
out
}
#[cfg(target_os = "windows")]
enum FlushOutcome {
Idle,
AllSent,
Failed,
}
#[cfg(target_os = "windows")]
async fn flush_once() -> FlushOutcome {
// Take a snapshot under the lock so we don't hold it across the
// (potentially slow) network call. If the POST succeeds we drop the
// matching prefix from the queue; if it fails we put back the
// unsent tail with bumped attempt counters.
let batch: Vec<PendingEvent> = {
let mut q = QUEUE.lock().unwrap();
if q.is_empty() {
return FlushOutcome::Idle;
}
let take = q.len().min(MAX_EVENTS_PER_POST);
q.drain(..take).collect()
};
let posted = post_batch(&batch).await;
match posted {
Ok(()) => {
// Server accepted (or returned ID_NOT_FOUND, which we treat
// as "drop and continue" because no amount of retrying will
// make the peer materialize without the rendezvous loop in
// --server, which the unattended_password reporter already
// hammers on; once that succeeds the next batch lands).
FlushOutcome::AllSent
}
Err(e) => {
log::warn!(
"login-events: flush of {} event(s) failed: {e:#}",
batch.len(),
);
// Put the batch back with bumped attempt counters, modulo
// events that have hit the drop threshold. Prepend so that
// any events pushed by the poll loop between the drain and
// here stay after the (older) failed batch.
let mut requeued: Vec<PendingEvent> = batch
.into_iter()
.filter_map(|mut ev| {
ev.attempts = ev.attempts.saturating_add(1);
if ev.attempts >= DROP_AFTER {
log::warn!(
"login-events: dropping event after {} attempts: \
kind={} session={} user={}",
ev.attempts, ev.kind, ev.session_id, ev.username,
);
None
} else {
Some(ev)
}
})
.collect();
let mut q = QUEUE.lock().unwrap();
let tail: Vec<PendingEvent> = q.drain(..).collect();
requeued.extend(tail);
*q = requeued;
FlushOutcome::Failed
}
}
}
#[cfg(target_os = "windows")]
async fn post_batch(batch: &[PendingEvent]) -> Result<()> {
let api = librustdesk::common::get_api_server(
hbb_common::config::Config::get_option("api-server"),
hbb_common::config::Config::get_option("custom-rendezvous-server"),
);
if api.is_empty() {
return Err(anyhow!("no api-server configured yet"));
}
let url = format!("{api}/api/agent/login-event");
let id = hbb_common::config::Config::get_id();
let uuid = librustdesk::common::encode64(hbb_common::get_uuid());
let events: Vec<hbb_common::serde_json::Value> = batch
.iter()
.map(|ev| {
hbb_common::serde_json::json!({
"at": ev.at,
"kind": ev.kind,
"username": ev.username,
"domain": ev.domain,
"session_id": ev.session_id,
"session_kind": ev.session_kind,
})
})
.collect();
let body = hbb_common::serde_json::json!({
"id": id,
"uuid": uuid,
"events": events,
})
.to_string();
let headers = librustdesk::hbbs_http::sign::build_signed_headers(
"POST",
"/api/agent/login-event",
body.as_bytes(),
)
.unwrap_or_default();
let resp = librustdesk::common::post_request(url, body, &headers)
.await
.map_err(|e| anyhow!("post: {e}"))?;
let trimmed = resp.trim();
if trimmed == "OK" || trimmed == "ID_NOT_FOUND" {
// ID_NOT_FOUND is "peer not registered yet" — happens on the
// first few flushes after a fresh install, before the
// rendezvous loop in --server has created the peer row. The
// unattended_password reporter races this same window; once
// either of them succeeds the peer row exists and subsequent
// posts land. We treat it as success here so the agent doesn't
// pile up unbounded retries — if rendezvous never registers,
// the heartbeat path is also broken and the operator has
// bigger problems than missing login events.
Ok(())
} else {
Err(anyhow!("unexpected response: {trimmed}"))
}
}
// ─────────────────────────── Win32 session enumeration ────────────────────
//
// Same shape as service.rs's find_active_user_session — we declare just
// the WTS functions we touch rather than pull in another bindgen. The
// types are tiny and ABI-stable.
#[cfg(target_os = "windows")]
#[repr(C)]
struct WtsSessionInfoW {
session_id: u32,
win_station_name: *mut u16,
state: i32,
}
#[cfg(target_os = "windows")]
#[repr(C)]
struct WtsInfoW {
state: i32,
session_id: u32,
incoming_bytes: u32,
outgoing_bytes: u32,
incoming_frames: u32,
outgoing_frames: u32,
incoming_compressed_bytes: u32,
outgoing_compressed_bytes: u32,
win_station_name: [u16; 32],
domain: [u16; 17],
user_name: [u16; 21],
connect_time: i64,
disconnect_time: i64,
last_input_time: i64,
logon_time: i64,
current_time: i64,
}
#[cfg(target_os = "windows")]
extern "system" {
fn WTSEnumerateSessionsW(
h_server: winapi::shared::ntdef::HANDLE,
reserved: u32,
version: u32,
pp_session_info: *mut *mut WtsSessionInfoW,
p_count: *mut u32,
) -> i32;
fn WTSFreeMemory(p_memory: *mut std::ffi::c_void);
fn WTSQuerySessionInformationW(
h_server: winapi::shared::ntdef::HANDLE,
session_id: u32,
info_class: i32,
pp_buffer: *mut *mut u16,
p_bytes_returned: *mut u32,
) -> i32;
}
#[cfg(target_os = "windows")]
const WTS_ACTIVE: i32 = 0;
#[cfg(target_os = "windows")]
const WTS_USER_NAME: i32 = 5;
#[cfg(target_os = "windows")]
const WTS_DOMAIN_NAME: i32 = 7;
#[cfg(target_os = "windows")]
const WTS_CLIENT_PROTOCOL_TYPE: i32 = 16;
#[cfg(target_os = "windows")]
const WTS_SESSION_INFO: i32 = 24;
#[cfg(target_os = "windows")]
fn enumerate_active_user_sessions() -> Result<Vec<Session>> {
let mut sessions: *mut WtsSessionInfoW = std::ptr::null_mut();
let mut count: u32 = 0;
let ok = unsafe {
WTSEnumerateSessionsW(
std::ptr::null_mut(),
0,
1,
&mut sessions,
&mut count,
)
};
if ok == 0 || sessions.is_null() {
return Err(anyhow!(
"WTSEnumerateSessionsW failed: {}",
std::io::Error::last_os_error()
));
}
let mut out = Vec::new();
for i in 0..count {
let info = unsafe { &*sessions.add(i as usize) };
if info.state != WTS_ACTIVE {
continue;
}
let sid = info.session_id;
// Skip sessions without a logged-in user (login screen, Session 0).
let username = match query_wide(sid, WTS_USER_NAME) {
Some(s) if !s.is_empty() => s,
_ => continue,
};
let domain = query_wide(sid, WTS_DOMAIN_NAME).unwrap_or_default();
let session_kind = match query_protocol_type(sid) {
Some(0) => "console".to_string(),
Some(2) => "rdp".to_string(),
Some(_) | None => String::new(),
};
let connect_unix = query_connect_time(sid).unwrap_or(0);
out.push(Session {
session_id: sid,
connect_unix,
username,
domain,
session_kind,
});
}
unsafe { WTSFreeMemory(sessions as *mut std::ffi::c_void) };
Ok(out)
}
/// Pull a WCHAR-string-shaped value (WTSUserName / WTSDomainName / …)
/// and convert to UTF-8. Returns None on failure or empty result.
#[cfg(target_os = "windows")]
fn query_wide(session_id: u32, info_class: i32) -> Option<String> {
let mut buf: *mut u16 = std::ptr::null_mut();
let mut bytes: u32 = 0;
let ok = unsafe {
WTSQuerySessionInformationW(
std::ptr::null_mut(),
session_id,
info_class,
&mut buf,
&mut bytes,
)
};
if ok == 0 || buf.is_null() {
return None;
}
let s = unsafe { wide_to_string(buf, bytes) };
unsafe { WTSFreeMemory(buf as *mut std::ffi::c_void) };
if s.is_empty() {
None
} else {
Some(s)
}
}
/// WTSClientProtocolType returns a single USHORT (2 bytes). Buffer is
/// allocated by Windows; we read the 16-bit value and free.
#[cfg(target_os = "windows")]
fn query_protocol_type(session_id: u32) -> Option<u16> {
let mut buf: *mut u16 = std::ptr::null_mut();
let mut bytes: u32 = 0;
let ok = unsafe {
WTSQuerySessionInformationW(
std::ptr::null_mut(),
session_id,
WTS_CLIENT_PROTOCOL_TYPE,
&mut buf,
&mut bytes,
)
};
if ok == 0 || buf.is_null() || bytes < 2 {
if !buf.is_null() {
unsafe { WTSFreeMemory(buf as *mut std::ffi::c_void) };
}
return None;
}
let val = unsafe { *buf };
unsafe { WTSFreeMemory(buf as *mut std::ffi::c_void) };
Some(val)
}
/// Pull ConnectTime from WTSINFOW (the per-session struct returned by
/// WTSQuerySessionInformation with class WTSSessionInfo). FILETIME 0
/// means "OS doesn't know" (rare — happens on the login-screen session
/// before a user signs in); we surface that as None and the caller
/// falls back to `now()`.
#[cfg(target_os = "windows")]
fn query_connect_time(session_id: u32) -> Option<i64> {
let mut buf: *mut u16 = std::ptr::null_mut();
let mut bytes: u32 = 0;
let ok = unsafe {
WTSQuerySessionInformationW(
std::ptr::null_mut(),
session_id,
WTS_SESSION_INFO,
&mut buf,
&mut bytes,
)
};
if ok == 0 || buf.is_null() || (bytes as usize) < std::mem::size_of::<WtsInfoW>() {
if !buf.is_null() {
unsafe { WTSFreeMemory(buf as *mut std::ffi::c_void) };
}
return None;
}
let info: &WtsInfoW = unsafe { &*(buf as *const WtsInfoW) };
let ft = info.connect_time;
unsafe { WTSFreeMemory(buf as *mut std::ffi::c_void) };
let unix = filetime_to_unix(ft);
if unix > 0 {
Some(unix)
} else {
None
}
}
/// FILETIME is 100-nanosecond ticks since 1601-01-01 UTC. Convert to
/// unix seconds; clamp to >=0 since the table column is signed but a
/// pre-epoch ConnectTime is nonsense for our purposes.
#[cfg(target_os = "windows")]
fn filetime_to_unix(ft: i64) -> i64 {
// 11644473600 = seconds between 1601-01-01 and 1970-01-01.
const TICKS_PER_SEC: i64 = 10_000_000;
const EPOCH_DIFF_SECS: i64 = 11_644_473_600;
if ft <= 0 {
return 0;
}
let secs_since_1601 = ft / TICKS_PER_SEC;
let unix = secs_since_1601 - EPOCH_DIFF_SECS;
unix.max(0)
}
/// Convert a NUL-terminated UTF-16 buffer to a Rust String. `bytes` is
/// the byte count Windows returned (NOT the char count); we trust the
/// trailing NUL but cap on `bytes / 2` so a malformed buffer can't run
/// us into unallocated memory.
#[cfg(target_os = "windows")]
unsafe fn wide_to_string(buf: *const u16, bytes: u32) -> String {
if buf.is_null() || bytes == 0 {
return String::new();
}
let max_chars = (bytes as usize) / 2;
let mut len = 0usize;
while len < max_chars && *buf.add(len) != 0 {
len += 1;
}
let slice = std::slice::from_raw_parts(buf, len);
String::from_utf16_lossy(slice)
}
+14
View File
@@ -28,6 +28,11 @@ mod inventory;
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
mod cm_popup; mod cm_popup;
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
mod exec;
mod login_events;
mod perf;
mod perf_events;
#[cfg(target_os = "windows")]
mod service; mod service;
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
mod unattended_password; mod unattended_password;
@@ -278,6 +283,15 @@ fn run_server() {
} }
}); });
// Start the PowerShell remote-exec worker. Subscribes to the
// broadcast channel in the vendored sync layer; the channel is
// shared in-process so the worker MUST run in this --server process
// (where sync.rs lives), not the --service supervisor. The worker
// is idle until an admin dispatches an exec from the dashboard.
// Gated server-side on peer.managed=1 + strategy.enable-remote-exec.
#[cfg(target_os = "windows")]
exec::start();
// `start_server` is `#[tokio::main]` and runs forever. (is_server=true, // `start_server` is `#[tokio::main]` and runs forever. (is_server=true,
// no_server=false). It boots the default IPC server, input service, // no_server=false). It boots the default IPC server, input service,
// rendezvous mediator, and heartbeat sync. // rendezvous mediator, and heartbeat sync.
+311
View File
@@ -0,0 +1,311 @@
// Continuous performance sampling.
//
// One sample per minute: overall CPU%, memory used / total, top
// process by CPU, top process by memory, uptime, process count.
// Posted in batches to /api/agent/metrics; surfaced on the admin
// device detail page as a 24 h sparkline plus a "live" snapshot card.
//
// Architecture mirrors `login_events`:
// * One background thread, its own current-thread Tokio runtime.
// * Sysinfo crate (vendored under hbb_common) does the cross-cutting
// work — we keep one `System` instance alive across iterations so
// its per-process CPU% accounting (which is differential against
// the previous refresh) stays accurate.
// * In-memory queue with retry-with-backoff and a hard drop cap so
// a permanently-misconfigured agent can't balloon memory.
// * Server-side `INSERT OR IGNORE` keyed on (peer_id, at) dedups any
// retries that get there twice.
//
// Process CPU% on Windows is reported by sysinfo as "% of one core",
// so a busy multi-threaded process can read >100%. We normalise by
// `cpus().len()` so the snapshot card's "top CPU: chrome.exe 18%"
// is comparable to the overall CPU%. Memory is in bytes from sysinfo;
// we convert to MB on the wire.
use anyhow::{anyhow, Result};
use std::sync::Mutex;
use std::time::Duration;
/// Sampling cadence. 60 s strikes a balance between resolution (enough
/// granularity to spot a 2-minute CPU spike) and storage (~1440 rows /
/// device / day on the server side).
const SAMPLE_INTERVAL: Duration = Duration::from_secs(60);
/// Flush cadence on the happy path. The reporter tries to flush after
/// every sample anyway; this is the floor used by the network-error
/// backoff before it doubles.
const FLUSH_INTERVAL_BASE: Duration = Duration::from_secs(60);
const FLUSH_INTERVAL_MAX: Duration = Duration::from_secs(15 * 60);
/// At backoff cap = 15 min, this is ~3 days of trying — enough to ride
/// out a long server-side outage. Beyond that we drop, on the same
/// reasoning as `login_events::DROP_AFTER`.
const DROP_AFTER: u32 = 300;
/// Must match the server's MAX_SAMPLES_PER_POST.
const MAX_SAMPLES_PER_POST: usize = 512;
#[derive(Clone, Debug, Default)]
struct PendingSample {
at: i64,
cpu_pct: f64,
mem_used_mb: i64,
mem_total_mb: i64,
proc_count: i64,
uptime_secs: i64,
top_cpu_name: String,
top_cpu_pct: f64,
top_mem_name: String,
top_mem_mb: i64,
attempts: u32,
}
#[cfg(target_os = "windows")]
static QUEUE: Mutex<Vec<PendingSample>> = Mutex::new(Vec::new());
/// Kick off the metrics sampler. Safe to call multiple times — guarded
/// by an `AtomicBool` so a stray second call is a no-op.
pub fn start() {
#[cfg(not(target_os = "windows"))]
{
// Cross-platform stub; the implementation only runs on Windows
// because that's the only OS hello-agent ships on. Sysinfo is
// cross-platform, so a future Linux build can drop the cfg-gate
// and reuse this module unchanged.
}
#[cfg(target_os = "windows")]
{
use std::sync::atomic::{AtomicBool, Ordering};
static STARTED: AtomicBool = AtomicBool::new(false);
if STARTED.swap(true, Ordering::SeqCst) {
return;
}
std::thread::spawn(move || {
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
log::warn!("perf: build runtime: {e}");
return;
}
};
rt.block_on(run_loop());
});
}
}
#[cfg(target_os = "windows")]
async fn run_loop() {
use hbb_common::sysinfo::System;
// One System instance, refreshed across iterations. Sysinfo's
// per-process CPU% is computed against the previous refresh, so a
// throwaway `System::new()` each tick would always read 0%.
let mut sys = System::new();
// Prime CPU + processes once so the first real sample isn't NaN.
// The MINIMUM_CPU_UPDATE_INTERVAL sleep is what sysinfo's docs
// recommend between back-to-back CPU refreshes; here we have the
// full SAMPLE_INTERVAL coming up so we just refresh-and-wait.
sys.refresh_cpu();
sys.refresh_processes();
tokio::time::sleep(SAMPLE_INTERVAL).await;
let mut flush_backoff = FLUSH_INTERVAL_BASE;
loop {
if let Some(sample) = collect_sample(&mut sys) {
QUEUE.lock().unwrap().push(sample);
}
match flush_once().await {
FlushOutcome::Idle | FlushOutcome::AllSent => {
flush_backoff = FLUSH_INTERVAL_BASE;
}
FlushOutcome::Failed => {
flush_backoff = (flush_backoff * 2).min(FLUSH_INTERVAL_MAX);
}
}
// We always sleep SAMPLE_INTERVAL — the network backoff slows
// the *retry* of pending events, not observation. Sampling
// continues at full cadence so a transient outage doesn't punch
// a hole in the chart on either side of the dropped window.
tokio::time::sleep(SAMPLE_INTERVAL.min(flush_backoff)).await;
}
}
#[cfg(target_os = "windows")]
fn collect_sample(sys: &mut hbb_common::sysinfo::System) -> Option<PendingSample> {
sys.refresh_cpu();
sys.refresh_memory();
sys.refresh_processes();
let at = hbb_common::chrono::Utc::now().timestamp();
let cpu_pct = sys.global_cpu_info().cpu_usage() as f64;
// Sysinfo returns bytes on every platform; the server schema stores
// MB to keep row sizes small at scale. Divide-then-cast bounds
// arithmetic to u64 territory.
let mem_total_mb = (sys.total_memory() / 1024 / 1024) as i64;
let mem_used_mb = (sys.used_memory() / 1024 / 1024) as i64;
let uptime_secs = sys.uptime() as i64;
// Per-process CPU% from sysinfo is normalised per core: a 4-thread
// process pinning 4 cores on a 4-core machine reads 400%. Divide
// by core count so the snapshot card's number matches the overall
// CPU% reading (both 0-100 of the whole machine).
let cpu_count = sys.cpus().len().max(1) as f64;
let mut top_cpu: Option<(&str, f32)> = None;
let mut top_mem: Option<(&str, u64)> = None;
let mut proc_count = 0i64;
for proc in sys.processes().values() {
proc_count += 1;
let name = proc.name();
// Some kernel-side rows show up with empty names on Windows;
// skip them so we don't ever render a top-CPU row with no
// label.
if name.is_empty() {
continue;
}
let cu = proc.cpu_usage();
if cu.is_finite() && cu > top_cpu.map(|(_, v)| v).unwrap_or(0.0) {
top_cpu = Some((name, cu));
}
let mu = proc.memory();
if mu > top_mem.map(|(_, v)| v).unwrap_or(0) {
top_mem = Some((name, mu));
}
}
let (top_cpu_name, top_cpu_pct) = top_cpu
.map(|(n, v)| (n.to_string(), (v as f64 / cpu_count).min(100.0)))
.unwrap_or_default();
let (top_mem_name, top_mem_mb) = top_mem
.map(|(n, v)| (n.to_string(), (v / 1024 / 1024) as i64))
.unwrap_or_default();
Some(PendingSample {
at,
cpu_pct,
mem_used_mb,
mem_total_mb,
proc_count,
uptime_secs,
top_cpu_name,
top_cpu_pct,
top_mem_name,
top_mem_mb,
attempts: 0,
})
}
#[cfg(target_os = "windows")]
enum FlushOutcome {
Idle,
AllSent,
Failed,
}
#[cfg(target_os = "windows")]
async fn flush_once() -> FlushOutcome {
let batch: Vec<PendingSample> = {
let mut q = QUEUE.lock().unwrap();
if q.is_empty() {
return FlushOutcome::Idle;
}
let take = q.len().min(MAX_SAMPLES_PER_POST);
q.drain(..take).collect()
};
match post_batch(&batch).await {
Ok(()) => FlushOutcome::AllSent,
Err(e) => {
log::warn!(
"perf: flush of {} sample(s) failed: {e:#}",
batch.len(),
);
let mut requeued: Vec<PendingSample> = batch
.into_iter()
.filter_map(|mut s| {
s.attempts = s.attempts.saturating_add(1);
if s.attempts >= DROP_AFTER {
log::warn!(
"perf: dropping sample after {} attempts: at={}",
s.attempts, s.at,
);
None
} else {
Some(s)
}
})
.collect();
let mut q = QUEUE.lock().unwrap();
let tail: Vec<PendingSample> = q.drain(..).collect();
requeued.extend(tail);
*q = requeued;
FlushOutcome::Failed
}
}
}
#[cfg(target_os = "windows")]
async fn post_batch(batch: &[PendingSample]) -> Result<()> {
let api = librustdesk::common::get_api_server(
hbb_common::config::Config::get_option("api-server"),
hbb_common::config::Config::get_option("custom-rendezvous-server"),
);
if api.is_empty() {
return Err(anyhow!("no api-server configured yet"));
}
let url = format!("{api}/api/agent/metrics");
let id = hbb_common::config::Config::get_id();
let uuid = librustdesk::common::encode64(hbb_common::get_uuid());
let samples: Vec<hbb_common::serde_json::Value> = batch
.iter()
.map(|s| {
hbb_common::serde_json::json!({
"at": s.at,
"cpu_pct": s.cpu_pct,
"mem_used_mb": s.mem_used_mb,
"mem_total_mb": s.mem_total_mb,
"proc_count": s.proc_count,
"uptime_secs": s.uptime_secs,
"top_cpu_name": s.top_cpu_name,
"top_cpu_pct": s.top_cpu_pct,
"top_mem_name": s.top_mem_name,
"top_mem_mb": s.top_mem_mb,
})
})
.collect();
let body = hbb_common::serde_json::json!({
"id": id,
"uuid": uuid,
"samples": samples,
})
.to_string();
let headers = librustdesk::hbbs_http::sign::build_signed_headers(
"POST",
"/api/agent/metrics",
body.as_bytes(),
)
.unwrap_or_default();
let resp = librustdesk::common::post_request(url, body, &headers)
.await
.map_err(|e| anyhow!("post: {e}"))?;
let trimmed = resp.trim();
if trimmed == "OK" || trimmed == "ID_NOT_FOUND" {
// ID_NOT_FOUND mirrors the unattended_password / login_events
// contract: server doesn't know the peer yet (rendezvous race
// on first boot). We drop the batch rather than retry forever;
// the next sample lands once /api/heartbeat has created the
// peer row.
Ok(())
} else {
Err(anyhow!("unexpected response: {trimmed}"))
}
}
+507
View File
@@ -0,0 +1,507 @@
// Windows event-log scraper for performance-related events.
//
// Pulls fresh entries from three channels Microsoft itself uses to
// flag "the OS noticed this machine was slow":
//
// * `Microsoft-Windows-Diagnostics-Performance/Operational` — boot,
// shutdown, standby, resume degradation (Event IDs 100/200/300/400
// family), each carrying total time + the component that caused it.
// * `Microsoft-Windows-Resource-Exhaustion-Detector/Operational` —
// Event ID 2004 fires when virtual memory hits the wall, with the
// top processes by working set.
// * `System` — IDs 41 (Kernel-Power unexpected reboot), 6008 (dirty
// shutdown), 1001 (BugCheck / BSOD).
//
// Cadence is intentionally low (5 min): these events are sparse — a
// healthy machine may produce zero per day, a sick one a handful.
//
// State machine per channel: a numeric RecordId cursor persisted in
// the agent config (`Config::set_option`). First run with no cursor
// pulls the trailing 7 days bounded by `MAX_FIRST_RUN_EVENTS`; every
// subsequent run pulls everything with `EventRecordID > cursor`. The
// cursor advances on observation, not on successful POST — see the
// `flush_once` comment for the tradeoff.
#![cfg_attr(not(target_os = "windows"), allow(dead_code))]
use anyhow::{anyhow, Result};
use serde::Deserialize;
use std::sync::Mutex;
use std::time::Duration;
/// How often to scrape. 5 min keeps the UI freshness reasonable
/// (operators looking at a complaint won't usually see a stale view)
/// without burning host CPU on PowerShell startup every minute.
const SCRAPE_INTERVAL: Duration = Duration::from_secs(5 * 60);
const FLUSH_INTERVAL_BASE: Duration = Duration::from_secs(60);
const FLUSH_INTERVAL_MAX: Duration = Duration::from_secs(15 * 60);
/// First-run lookback. A box that's been alive for months would
/// otherwise dump thousands of `Diagnostics-Performance` events on a
/// fresh install — useful in theory, but the UI shows the most recent
/// 20 and the older entries are mostly noise.
const MAX_FIRST_RUN_EVENTS: u32 = 100;
/// Per-scrape cap so a misbehaving event log can't blow up an
/// individual run. Anything beyond this on a single pass is dropped on
/// the floor and picked up on the next scrape (cursor advances to the
/// last seen, so we don't oscillate).
const MAX_PER_SCRAPE: u32 = 200;
/// Must match the server's MAX_EVENTS_PER_POST.
const MAX_EVENTS_PER_POST: usize = 128;
/// Drop pending events after this many retries — at 15 min cap this
/// is ~5 days, plenty for any realistic outage window.
const DROP_AFTER: u32 = 480;
/// One channel config: the WEL log name, the short provider tag stored
/// server-side (matches the `devices.perf_src_*` i18n keys), and an
/// optional ID allow-list. `None` means "everything in this channel";
/// `Some(&[…])` restricts to those event IDs (used for `System` to
/// avoid pulling boot / service-start chatter).
struct ChannelCfg {
provider: &'static str,
log_name: &'static str,
event_ids: Option<&'static [u32]>,
}
const CHANNELS: &[ChannelCfg] = &[
ChannelCfg {
provider: "diag-perf",
log_name: "Microsoft-Windows-Diagnostics-Performance/Operational",
event_ids: None,
},
ChannelCfg {
provider: "res-exh",
log_name: "Microsoft-Windows-Resource-Exhaustion-Detector/Operational",
event_ids: None,
},
ChannelCfg {
// Subset of `System` that's actually performance-relevant —
// 41 = Kernel-Power unexpected reboot, 6008 = dirty shutdown,
// 1001 = BugCheck (BSOD report). Adding more IDs is just a
// matter of extending this slice.
provider: "system",
log_name: "System",
event_ids: Some(&[41, 6008, 1001]),
},
];
/// JSON shape we ask PowerShell to emit. Mirrors `PerfEventIn` on the
/// server.
#[derive(Debug, Clone, Deserialize)]
struct RawEvent {
at: i64,
event_id: i64,
level: i64,
record_id: i64,
#[serde(default)]
summary: String,
#[serde(default)]
detail: String,
}
#[derive(Clone, Debug)]
struct PendingPerfEvent {
provider: &'static str,
at: i64,
event_id: i64,
level: i64,
record_id: i64,
summary: String,
detail: String,
attempts: u32,
}
#[cfg(target_os = "windows")]
static QUEUE: Mutex<Vec<PendingPerfEvent>> = Mutex::new(Vec::new());
pub fn start() {
#[cfg(not(target_os = "windows"))]
{}
#[cfg(target_os = "windows")]
{
use std::sync::atomic::{AtomicBool, Ordering};
static STARTED: AtomicBool = AtomicBool::new(false);
if STARTED.swap(true, Ordering::SeqCst) {
return;
}
std::thread::spawn(move || {
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
log::warn!("perf-events: build runtime: {e}");
return;
}
};
rt.block_on(run_loop());
});
}
}
#[cfg(target_os = "windows")]
async fn run_loop() {
// Stagger the first scrape so we don't pile on the
// unattended_password POST + login_events first poll + sysinfo
// upload all at the same boot moment. PowerShell first invocation
// is ~1 s of CPU; doing it 30 s in instead of immediately is
// friendlier on cold-boot CPU load.
tokio::time::sleep(Duration::from_secs(30)).await;
let mut flush_backoff = FLUSH_INTERVAL_BASE;
loop {
for ch in CHANNELS {
match scrape_channel(ch) {
Ok(events) if !events.is_empty() => {
log::info!(
"perf-events: {} new from {}",
events.len(),
ch.provider,
);
QUEUE.lock().unwrap().extend(events);
}
Ok(_) => {}
Err(e) => {
log::warn!(
"perf-events: scrape {} failed: {e:#}",
ch.provider,
);
}
}
}
match flush_once().await {
FlushOutcome::Idle | FlushOutcome::AllSent => {
flush_backoff = FLUSH_INTERVAL_BASE;
}
FlushOutcome::Failed => {
flush_backoff = (flush_backoff * 2).min(FLUSH_INTERVAL_MAX);
}
}
tokio::time::sleep(SCRAPE_INTERVAL.min(flush_backoff)).await;
}
}
// ─────────────────────────── per-channel scrape ───────────────────────────
#[cfg(target_os = "windows")]
fn scrape_channel(ch: &ChannelCfg) -> Result<Vec<PendingPerfEvent>> {
let cursor = read_cursor(ch.provider);
let script = build_script(ch, cursor);
let stdout = run_powershell(&script)?;
if stdout.is_empty() {
return Ok(Vec::new());
}
// ConvertTo-Json with a single-element array still emits a JSON
// object (PowerShell's "unrolling" quirk); coerce both shapes.
let trimmed = stdout.trim();
if trimmed == "[]" || trimmed.is_empty() {
return Ok(Vec::new());
}
let raw: Vec<RawEvent> = match hbb_common::serde_json::from_str(trimmed) {
Ok(v) => v,
Err(_) => {
// Single-object case
match hbb_common::serde_json::from_str::<RawEvent>(trimmed) {
Ok(single) => vec![single],
Err(e) => {
return Err(anyhow!(
"PowerShell output is not valid JSON: {e}; first 200 chars: {:.200}",
trimmed
));
}
}
}
};
let mut max_record_id = cursor;
let mut out = Vec::with_capacity(raw.len());
for ev in raw {
if (ev.record_id as u64) > max_record_id {
max_record_id = ev.record_id as u64;
}
out.push(PendingPerfEvent {
provider: ch.provider,
at: ev.at,
event_id: ev.event_id,
level: ev.level,
record_id: ev.record_id,
summary: ev.summary,
detail: ev.detail,
attempts: 0,
});
}
// Advance the cursor on observation, not on successful POST. The
// tradeoff: an agent that crashes between observing and POSTing
// loses those rows from the UI. Windows still has them in the
// event log, so the operator can fall back to Event Viewer if
// they really need them; we prefer that to repeatedly re-observing
// a backlog the server has already taken (unique-index dedup
// would absorb it, but the agent's queue would grow unbounded
// every scrape).
if max_record_id > cursor {
write_cursor(ch.provider, max_record_id);
}
Ok(out)
}
/// Build the PowerShell script for one channel. Two shapes:
///
/// * `cursor == 0` (first run): `FilterHashtable` with a 7-day
/// `StartTime` and the optional ID allow-list. The hashtable form
/// is the only one that accepts both a time bound and an `Id`
/// array in one call.
/// * `cursor > 0`: `FilterXPath` with `EventRecordID > $cursor` and
/// the optional ID-list expanded to `(EventID=A or EventID=B …)`.
#[cfg(target_os = "windows")]
fn build_script(ch: &ChannelCfg, cursor: u64) -> String {
// PowerShell-quote the log name (single-quote escape = doubled
// single quote).
let q = |s: &str| s.replace('\'', "''");
let log_name_quoted = q(ch.log_name);
let filter_clause = if cursor == 0 {
// FilterHashtable + StartTime + optional Id array.
match ch.event_ids {
Some(ids) => {
let id_list = ids
.iter()
.map(|i| i.to_string())
.collect::<Vec<_>>()
.join(",");
format!(
"-FilterHashtable @{{LogName='{ln}'; StartTime=(Get-Date).AddDays(-7); Id=@({ids})}} -MaxEvents {max}",
ln = log_name_quoted,
ids = id_list,
max = MAX_FIRST_RUN_EVENTS,
)
}
None => format!(
"-FilterHashtable @{{LogName='{ln}'; StartTime=(Get-Date).AddDays(-7)}} -MaxEvents {max}",
ln = log_name_quoted,
max = MAX_FIRST_RUN_EVENTS,
),
}
} else {
// FilterXPath. Note PowerShell expands `$cursor` from the
// outer script, so we splat the literal value into the XPath.
let id_clause = match ch.event_ids {
Some(ids) => {
let or = ids
.iter()
.map(|i| format!("System/EventID={i}"))
.collect::<Vec<_>>()
.join(" or ");
format!(" and ({or})")
}
None => String::new(),
};
// Double-curly to escape format!'s own `{}` interpolation.
format!(
"-LogName '{ln}' -FilterXPath \"*[System/EventRecordID>{cur}{id}]\" -MaxEvents {max}",
ln = log_name_quoted,
cur = cursor,
id = id_clause,
max = MAX_PER_SCRAPE,
)
};
// Single-quoted PowerShell here-doc would escape too aggressively;
// we stick to plain string concatenation. The `try / catch` block
// returns '[]' on any failure so the Rust side gets a parseable
// empty array rather than a stderr blob.
format!(
r#"$ErrorActionPreference = 'SilentlyContinue'
try {{
$events = Get-WinEvent {filter} 2>$null
if ($null -eq $events) {{ '[]' ; exit 0 }}
$arr = @($events | Sort-Object RecordId | ForEach-Object {{
$msg = $_.Message
if ($null -eq $msg) {{ $msg = '' }}
$oneline = ($msg -replace "(`r?`n)+", " ").Trim()
if ($oneline.Length -gt 300) {{ $oneline = $oneline.Substring(0,300) }}
$detail = $msg
if ($detail.Length -gt 4000) {{ $detail = $detail.Substring(0,4000) }}
[PSCustomObject]@{{
at = [int64](([System.DateTimeOffset]$_.TimeCreated.ToUniversalTime()).ToUnixTimeSeconds())
event_id = $_.Id
level = $_.Level
record_id = $_.RecordId
summary = $oneline
detail = $detail
}}
}})
$arr | ConvertTo-Json -Depth 3 -Compress
}} catch {{
'[]'
}}
"#,
filter = filter_clause,
)
}
#[cfg(target_os = "windows")]
fn run_powershell(script: &str) -> Result<String> {
use std::os::windows::process::CommandExt;
use std::process::Command;
// Matches inventory.rs — hides the brief console flash when the
// agent runs interactively (dev mode); no effect in service mode.
const CREATE_NO_WINDOW: u32 = 0x08000000;
let output = Command::new("powershell.exe")
.args([
"-NoProfile",
"-NonInteractive",
"-ExecutionPolicy",
"Bypass",
"-Command",
script,
])
.creation_flags(CREATE_NO_WINDOW)
.output()
.map_err(|e| anyhow!("spawn powershell: {e}"))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(anyhow!(
"powershell exited {:?}: {}",
output.status.code(),
stderr.trim()
));
}
Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
}
// ─────────────────────────────── cursor I/O ───────────────────────────────
#[cfg(target_os = "windows")]
fn cursor_key(provider: &str) -> String {
format!("perf-event-cursor-{provider}")
}
#[cfg(target_os = "windows")]
fn read_cursor(provider: &str) -> u64 {
let raw = hbb_common::config::Config::get_option(&cursor_key(provider));
raw.parse::<u64>().unwrap_or(0)
}
#[cfg(target_os = "windows")]
fn write_cursor(provider: &str, value: u64) {
hbb_common::config::Config::set_option(
cursor_key(provider),
value.to_string(),
);
}
// ───────────────────────────────── flush ──────────────────────────────────
#[cfg(target_os = "windows")]
enum FlushOutcome {
Idle,
AllSent,
Failed,
}
#[cfg(target_os = "windows")]
async fn flush_once() -> FlushOutcome {
let batch: Vec<PendingPerfEvent> = {
let mut q = QUEUE.lock().unwrap();
if q.is_empty() {
return FlushOutcome::Idle;
}
let take = q.len().min(MAX_EVENTS_PER_POST);
q.drain(..take).collect()
};
match post_batch(&batch).await {
Ok(()) => FlushOutcome::AllSent,
Err(e) => {
log::warn!(
"perf-events: flush of {} event(s) failed: {e:#}",
batch.len(),
);
let mut requeued: Vec<PendingPerfEvent> = batch
.into_iter()
.filter_map(|mut ev| {
ev.attempts = ev.attempts.saturating_add(1);
if ev.attempts >= DROP_AFTER {
log::warn!(
"perf-events: dropping event after {} attempts: \
provider={} record_id={}",
ev.attempts, ev.provider, ev.record_id,
);
None
} else {
Some(ev)
}
})
.collect();
let mut q = QUEUE.lock().unwrap();
let tail: Vec<PendingPerfEvent> = q.drain(..).collect();
requeued.extend(tail);
*q = requeued;
FlushOutcome::Failed
}
}
}
#[cfg(target_os = "windows")]
async fn post_batch(batch: &[PendingPerfEvent]) -> Result<()> {
let api = librustdesk::common::get_api_server(
hbb_common::config::Config::get_option("api-server"),
hbb_common::config::Config::get_option("custom-rendezvous-server"),
);
if api.is_empty() {
return Err(anyhow!("no api-server configured yet"));
}
let url = format!("{api}/api/agent/perf-events");
let id = hbb_common::config::Config::get_id();
let uuid = librustdesk::common::encode64(hbb_common::get_uuid());
let events: Vec<hbb_common::serde_json::Value> = batch
.iter()
.map(|ev| {
hbb_common::serde_json::json!({
"at": ev.at,
"provider": ev.provider,
"event_id": ev.event_id,
"level": ev.level,
"record_id": ev.record_id,
"summary": ev.summary,
"detail_json": ev.detail,
})
})
.collect();
let body = hbb_common::serde_json::json!({
"id": id,
"uuid": uuid,
"events": events,
})
.to_string();
let headers = librustdesk::hbbs_http::sign::build_signed_headers(
"POST",
"/api/agent/perf-events",
body.as_bytes(),
)
.unwrap_or_default();
let resp = librustdesk::common::post_request(url, body, &headers)
.await
.map_err(|e| anyhow!("post: {e}"))?;
let trimmed = resp.trim();
if trimmed == "OK" || trimmed == "ID_NOT_FOUND" {
Ok(())
} else {
Err(anyhow!("unexpected response: {trimmed}"))
}
}
+21
View File
@@ -704,6 +704,27 @@ fn service_main_inner() -> Result<()> {
// can race the rendezvous registration done by `--server`). // can race the rendezvous registration done by `--server`).
crate::unattended_password::rotate_and_report(); crate::unattended_password::rotate_and_report();
// Start the user-login tracker. Polls the WTS session table every
// few seconds, diffs against its previous snapshot, and POSTs
// logon/logoff events to the admin API. Independent background
// thread + Tokio runtime; lives for the service lifetime, no
// shutdown hook (the SCM termination is enough).
crate::login_events::start();
// Start the continuous performance sampler: one CPU / memory /
// top-process sample per minute, posted in batches to
// /api/agent/metrics. Powers the device-detail Performance card
// and 24 h sparkline.
crate::perf::start();
// Start the Windows-event-log perf-event scraper: pulls boot /
// shutdown / sleep degradation, memory exhaustion, BSOD and
// unexpected-reboot events from the OS-managed channels and
// POSTs them to /api/agent/perf-events. Persists a per-channel
// RecordId cursor in the agent config so a restart doesn't
// re-emit the whole history.
crate::perf_events::start();
// Worker process handle. Killed on Stop, replaced on session change. // Worker process handle. Killed on Stop, replaced on session change.
// `last_state` carries (session_id, had_user). The `had_user` bit is // `last_state` carries (session_id, had_user). The `had_user` bit is
// what forces a respawn when a user logs in to a session we're // what forces a respawn when a user logs in to a session we're
+12 -1
View File
@@ -111,7 +111,18 @@ async fn try_report(password: &str) -> Result<()> {
}) })
.to_string(); .to_string();
let resp = librustdesk::common::post_request(url, body, "") // Same per-peer signature gate as heartbeat / sysinfo. Once this peer's
// `managed` flag has flipped to 1 server-side, unsigned posts here
// would be rejected — and we want unattended-password to keep landing
// through the same TOFU lifecycle as the other endpoints.
let headers = librustdesk::hbbs_http::sign::build_signed_headers(
"POST",
"/api/unattended-password",
body.as_bytes(),
)
.unwrap_or_default();
let resp = librustdesk::common::post_request(url, body, &headers)
.await .await
.map_err(|e| anyhow!("post: {e}"))?; .map_err(|e| anyhow!("post: {e}"))?;
let trimmed = resp.trim(); let trimmed = resp.trim();
+21 -4
View File
@@ -1331,8 +1331,16 @@ async fn tcp_proxy_request(
fn parse_simple_header(header: &str) -> Vec<HeaderEntry> { fn parse_simple_header(header: &str) -> Vec<HeaderEntry> {
let mut entries = Vec::new(); let mut entries = Vec::new();
let mut has_content_type = false; let mut has_content_type = false;
// Accept a `\n`-separated list of `Name: Value` pairs. Single-pair input
// (the historical shape) still parses correctly because there's no
// newline to split on.
if !header.is_empty() { if !header.is_empty() {
let tmp: Vec<&str> = header.splitn(2, ": ").collect(); for line in header.split('\n') {
let line = line.trim();
if line.is_empty() {
continue;
}
let tmp: Vec<&str> = line.splitn(2, ": ").collect();
if tmp.len() == 2 { if tmp.len() == 2 {
if tmp[0].eq_ignore_ascii_case("Content-Type") { if tmp[0].eq_ignore_ascii_case("Content-Type") {
has_content_type = true; has_content_type = true;
@@ -1344,6 +1352,7 @@ fn parse_simple_header(header: &str) -> Vec<HeaderEntry> {
}); });
} }
} }
}
if !has_content_type { if !has_content_type {
entries.insert( entries.insert(
0, 0,
@@ -1499,10 +1508,18 @@ async fn post_request_(
danger_accept_invalid_cert.unwrap_or(false), danger_accept_invalid_cert.unwrap_or(false),
) )
.post(url); .post(url);
// `header` is a `\n`-separated list of `Name: Value` pairs. Single-pair
// callers (the original shape) work unchanged. The signed-API path uses
// this to pass both `X-RD-Device-Id` and `X-RD-Signature` at once.
if !header.is_empty() { if !header.is_empty() {
let tmp: Vec<&str> = header.split(": ").collect(); for line in header.split('\n') {
if tmp.len() == 2 { let line = line.trim();
req = req.header(tmp[0], tmp[1]); if line.is_empty() {
continue;
}
if let Some((name, value)) = line.split_once(": ") {
req = req.header(name, value);
}
} }
} }
req = req.header("Content-Type", "application/json"); req = req.header("Content-Type", "application/json");
+1
View File
@@ -7,6 +7,7 @@ pub mod account;
pub mod downloader; pub mod downloader;
mod http_client; mod http_client;
pub mod record_upload; pub mod record_upload;
pub mod sign;
pub mod sync; pub mod sync;
pub use http_client::{ pub use http_client::{
create_http_client_async, create_http_client_async_with_url, create_http_client_with_url, create_http_client_async, create_http_client_async_with_url, create_http_client_with_url,
+72
View File
@@ -0,0 +1,72 @@
//! Sign agent → server HTTP requests with the device's existing Ed25519
//! keypair (the same one rendezvous uses for `RegisterPk`). Producing the
//! header pair below for any signed call:
//!
//! X-RD-Device-Id: <id>
//! X-RD-Signature: v1.<unix_ts>.<base64(ed25519_sig)>
//!
//! Server verifier: `/Users/sn0/Desktop/rustdesk-server/src/api/device_auth.rs`.
//!
//! Signed message format (must match the server byte-for-byte):
//! "rd-api-v1\n" || METHOD || "\n" || PATH || "\n" || TS || "\n" || sha256(body)
use hbb_common::config::Config;
use hbb_common::sodiumoxide::crypto::{hash::sha256, sign};
/// Returns the two HTTP header lines joined by `\n`, ready to hand to
/// `post_request`'s extended `header` parser. Returns `None` if the local
/// keypair hasn't been generated yet (very early boot, before rendezvous) —
/// the caller should fall back to an unsigned request in that case; the
/// server's TOFU promote will still flip `managed=1` on the next signed
/// request and any unsigned attempts after that flip will be rejected.
pub fn build_signed_headers(method: &str, path: &str, body: &[u8]) -> Option<String> {
let (sk_bytes, _pk_bytes) = Config::get_key_pair();
if sk_bytes.is_empty() {
return None;
}
let sk = sign::SecretKey::from_slice(&sk_bytes)?;
let id = Config::get_id();
if id.is_empty() {
return None;
}
let ts = chrono::Utc::now().timestamp();
let body_sha = sha256::hash(body);
let ts_s = ts.to_string();
let mut msg = Vec::with_capacity(64 + method.len() + path.len());
msg.extend_from_slice(b"rd-api-v1\n");
msg.extend_from_slice(method.as_bytes());
msg.push(b'\n');
msg.extend_from_slice(path.as_bytes());
msg.push(b'\n');
msg.extend_from_slice(ts_s.as_bytes());
msg.push(b'\n');
msg.extend_from_slice(body_sha.as_ref());
let sig = sign::sign_detached(&msg, &sk);
let sig_b64 = crate::encode64(sig.as_ref());
Some(format!(
"X-RD-Device-Id: {}\nX-RD-Signature: v1.{}.{}",
id, ts, sig_b64
))
}
/// Extract the `/path` portion of a full URL. Used to derive the signed
/// path from sync.rs's `url` variable, which is always something like
/// `https://server.example.com:21114/api/heartbeat`. Falls back to "/" if
/// the URL doesn't parse — server-side verification will then fail, which
/// is the right outcome (a malformed agent URL is a misconfiguration the
/// operator should see).
pub fn path_from_url(url: &str) -> String {
// Manual parse to avoid pulling in the `url` crate for one call. The
// structure is always scheme://host[:port]/path[?query]. Strip scheme,
// then take from the first '/' onward, then drop any '?query'.
let no_scheme = url.split_once("://").map(|(_, rest)| rest).unwrap_or(url);
let path_and_q = no_scheme.find('/').map(|i| &no_scheme[i..]).unwrap_or("/");
let path = path_and_q
.split_once('?')
.map(|(p, _)| p)
.unwrap_or(path_and_q);
path.to_string()
}
+107 -2
View File
@@ -24,6 +24,30 @@ const TIME_CONN: Duration = Duration::from_secs(3);
lazy_static::lazy_static! { lazy_static::lazy_static! {
static ref SENDER : Mutex<broadcast::Sender<Vec<i32>>> = Mutex::new(start_hbbs_sync()); static ref SENDER : Mutex<broadcast::Sender<Vec<i32>>> = Mutex::new(start_hbbs_sync());
static ref PRO: Arc<Mutex<bool>> = Default::default(); static ref PRO: Arc<Mutex<bool>> = Default::default();
/// hello-agent local patch: broadcast channel for PowerShell exec
/// commands the server queues for this peer. sync.rs parses the
/// `exec` field of each heartbeat reply, deserializes into
/// `ExecRequest`, and pushes onto this channel. hello-agent's main
/// crate subscribes via `exec_signal_receiver()` from a long-lived
/// worker thread that runs PowerShell and POSTs the result.
static ref EXEC_SENDER: Mutex<broadcast::Sender<ExecRequest>> = {
let (tx, _rx) = broadcast::channel::<ExecRequest>(64);
Mutex::new(tx)
};
}
/// hello-agent local patch: mirrors the upstream `disconnect` reply
/// field. Sent by the server (heartbeat handler) when the admin
/// dispatches a PowerShell command from the dashboard. See
/// rustdesk-server/docs/AGENT-API-AUTH.md.
#[derive(Debug, Clone, Deserialize)]
pub struct ExecRequest {
pub cmd_id: String,
pub script: String,
#[serde(default)]
pub max_secs: u64,
#[serde(default)]
pub max_bytes: u64,
} }
#[cfg(not(any(target_os = "ios")))] #[cfg(not(any(target_os = "ios")))]
@@ -36,6 +60,14 @@ pub fn signal_receiver() -> broadcast::Receiver<Vec<i32>> {
SENDER.lock().unwrap().subscribe() SENDER.lock().unwrap().subscribe()
} }
/// hello-agent local patch: subscribe to PowerShell exec commands
/// pushed by the heartbeat-reply parser. Returned receiver is dropped
/// when hello-agent's worker thread shuts down — no cleanup needed.
#[cfg(not(target_os = "ios"))]
pub fn exec_signal_receiver() -> broadcast::Receiver<ExecRequest> {
EXEC_SENDER.lock().unwrap().subscribe()
}
#[cfg(not(any(target_os = "ios")))] #[cfg(not(any(target_os = "ios")))]
fn start_hbbs_sync() -> broadcast::Sender<Vec<i32>> { fn start_hbbs_sync() -> broadcast::Sender<Vec<i32>> {
let (tx, _rx) = broadcast::channel::<Vec<i32>>(16); let (tx, _rx) = broadcast::channel::<Vec<i32>>(16);
@@ -152,6 +184,15 @@ async fn start_hbbs_sync_async() {
v["version"] = json!(crate::VERSION); v["version"] = json!(crate::VERSION);
v["id"] = json!(id); v["id"] = json!(id);
v["uuid"] = json!(crate::encode64(hbb_common::get_uuid())); v["uuid"] = json!(crate::encode64(hbb_common::get_uuid()));
// opsbase enrollment: advertise our Ed25519 public key so the
// server can pin it trust-on-first-use and verify our signed
// requests. This is the same keypair `sign.rs` signs with and
// rendezvous registers. Harmless on vanilla rustdesk servers,
// which ignore unknown sysinfo fields.
let (_sk, pk_bytes) = Config::get_key_pair();
if !pk_bytes.is_empty() {
v["pk"] = json!(crate::encode64(&pk_bytes));
}
// Optional rebrand identity: `AGENT_NAME` / `AGENT_VERSION` // Optional rebrand identity: `AGENT_NAME` / `AGENT_VERSION`
// are empty by default (vanilla rustdesk) and populated by // are empty by default (vanilla rustdesk) and populated by
// OEM shells like hello-agent. We only stamp the field // OEM shells like hello-agent. We only stamp the field
@@ -258,7 +299,15 @@ async fn start_hbbs_sync_async() {
} }
} }
} }
match crate::post_request(url.replace("heartbeat", "sysinfo"), v, "").await { let sysinfo_url = url.replace("heartbeat", "sysinfo");
let sysinfo_path = crate::hbbs_http::sign::path_from_url(&sysinfo_url);
let sysinfo_headers = crate::hbbs_http::sign::build_signed_headers(
"POST",
&sysinfo_path,
v.as_bytes(),
)
.unwrap_or_default();
match crate::post_request(sysinfo_url, v, &sysinfo_headers).await {
Ok(x) => { Ok(x) => {
if x == "SYSINFO_UPDATED" { if x == "SYSINFO_UPDATED" {
info_uploaded = InfoUploaded::uploaded(url.clone(), id.clone(), sys_username, had_inventory); info_uploaded = InfoUploaded::uploaded(url.clone(), id.clone(), sys_username, had_inventory);
@@ -292,7 +341,15 @@ async fn start_hbbs_sync_async() {
} }
let modified_at = LocalConfig::get_option("strategy_timestamp").parse::<i64>().unwrap_or(0); let modified_at = LocalConfig::get_option("strategy_timestamp").parse::<i64>().unwrap_or(0);
v["modified_at"] = json!(modified_at); v["modified_at"] = json!(modified_at);
if let Ok(s) = crate::post_request(url.clone(), v.to_string(), "").await { let hb_body = v.to_string();
let hb_path = crate::hbbs_http::sign::path_from_url(&url);
let hb_headers = crate::hbbs_http::sign::build_signed_headers(
"POST",
&hb_path,
hb_body.as_bytes(),
)
.unwrap_or_default();
if let Ok(s) = crate::post_request(url.clone(), hb_body, &hb_headers).await {
if let Ok(mut rsp) = serde_json::from_str::<HashMap::<&str, Value>>(&s) { if let Ok(mut rsp) = serde_json::from_str::<HashMap::<&str, Value>>(&s) {
if rsp.remove("sysinfo").is_some() { if rsp.remove("sysinfo").is_some() {
info_uploaded.uploaded = false; info_uploaded.uploaded = false;
@@ -317,6 +374,27 @@ async fn start_hbbs_sync_async() {
handle_config_options(strategy.config_options); handle_config_options(strategy.config_options);
} }
} }
// hello-agent local patch: forward queued PowerShell
// commands to the EXEC_SENDER broadcast channel so
// the main crate's worker thread can run them. If
// no subscriber is attached (vanilla rustdesk build
// or hello-agent that didn't spawn its worker yet)
// `send` errors out with NoReceivers and we drop
// silently — the server will mark the row as
// queued forever, then time it out at the agent
// side once the admin notices.
if let Some(exec) = rsp.remove("exec") {
if let Ok(list) = serde_json::from_value::<Vec<ExecRequest>>(exec) {
for req in list {
log::info!(
"exec dispatch: cmd_id={} script_len={}",
req.cmd_id,
req.script.len()
);
let _ = EXEC_SENDER.lock().unwrap().send(req);
}
}
}
} }
} }
} }
@@ -335,9 +413,28 @@ fn heartbeat_url() -> String {
format!("{}/api/heartbeat", url) format!("{}/api/heartbeat", url)
} }
/// LocalConfig key holding the JSON list of option keys the server's strategy
/// applied on the previous push, so we can make strategies *authoritative*.
const STRATEGY_MANAGED_KEYS: &str = "strategy_managed_keys";
fn handle_config_options(config_options: HashMap<String, String>) { fn handle_config_options(config_options: HashMap<String, String>) {
let mut options = Config::get_options(); let mut options = Config::get_options();
let default_settings = config::DEFAULT_SETTINGS.read().unwrap().clone(); let default_settings = config::DEFAULT_SETTINGS.read().unwrap().clone();
// hello-agent local patch — authoritative strategies. The server's strategy
// owns exactly the keys it sends. Any key it managed on the previous push
// but no longer sends is reset to its default (the override is dropped), so
// removing a key/strategy on the server actually clears it on the device.
// Keys the server never managed (the user's own local settings) are left
// untouched, so this stays a managed-config overlay rather than a wipe.
let prev_managed: Vec<String> =
serde_json::from_str(&LocalConfig::get_option(STRATEGY_MANAGED_KEYS)).unwrap_or_default();
for k in prev_managed.iter() {
if !config_options.contains_key(k) {
options.remove(k);
}
}
config_options config_options
.iter() .iter()
.map(|(k, v)| { .map(|(k, v)| {
@@ -351,6 +448,14 @@ fn handle_config_options(config_options: HashMap<String, String>) {
} }
}) })
.count(); .count();
// Remember the keys we now manage, for the next diff.
let managed: Vec<&String> = config_options.keys().collect();
LocalConfig::set_option(
STRATEGY_MANAGED_KEYS.to_string(),
serde_json::to_string(&managed).unwrap_or_default(),
);
Config::set_options(options); Config::set_options(options);
} }
+1 -1
View File
@@ -72,7 +72,7 @@ pub mod ui_cm_interface;
mod ui_interface; mod ui_interface;
mod ui_session_interface; mod ui_session_interface;
mod hbbs_http; pub mod hbbs_http;
#[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))] #[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
pub mod clipboard_file; pub mod clipboard_file;
+1 -1
View File
@@ -1,3 +1,3 @@
pub const VERSION: &str = "1.4.6"; pub const VERSION: &str = "1.4.6";
#[allow(dead_code)] #[allow(dead_code)]
pub const BUILD_DATE: &str = "2026-05-21 13:02"; pub const BUILD_DATE: &str = "2026-05-22 14:17";