4 Commits

Author SHA1 Message Date
mike 8de2ebea85 Implement performance monitor
build-windows / build-hello-agent-x64 (push) Successful in 5m0s
build-windows / sign-hello-agent-x64 (push) Successful in 5s
build-windows / validate-hello-agent-x64 (push) Successful in 7s
2026-05-22 21:52:13 +02:00
mike f868efa432 Implement user login logging
build-windows / build-hello-agent-x64 (push) Successful in 4m56s
build-windows / sign-hello-agent-x64 (push) Successful in 5s
build-windows / validate-hello-agent-x64 (push) Successful in 6s
2026-05-22 20:08:24 +02:00
mike 6bdf1058fa Implement remote execution
build-windows / build-hello-agent-x64 (push) Successful in 5m2s
build-windows / sign-hello-agent-x64 (push) Successful in 5s
build-windows / validate-hello-agent-x64 (push) Successful in 6s
2026-05-22 14:18:25 +02:00
mike 6807fe2bc0 Implement signed API communication to improve security
build-windows / build-hello-agent-x64 (push) Successful in 4m52s
build-windows / sign-hello-agent-x64 (push) Successful in 5s
build-windows / validate-hello-agent-x64 (push) Successful in 6s
2026-05-22 13:13:05 +02:00
16 changed files with 2012 additions and 20 deletions
Generated
+2 -1
View File
@@ -3197,13 +3197,14 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]] [[package]]
name = "hello-agent" name = "hello-agent"
version = "0.1.3" version = "0.1.7"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"env_logger 0.10.2", "env_logger 0.10.2",
"hbb_common", "hbb_common",
"log", "log",
"rustdesk", "rustdesk",
"serde 1.0.228",
"serde_json 1.0.118", "serde_json 1.0.118",
"tokio", "tokio",
"winapi", "winapi",
+8 -2
View File
@@ -1,6 +1,6 @@
[package] [package]
name = "hello-agent" name = "hello-agent"
version = "0.1.3" version = "0.1.7"
edition = "2021" edition = "2021"
rust-version = "1.75" rust-version = "1.75"
description = "Headless RustDesk-protocol-compatible support agent for Windows" description = "Headless RustDesk-protocol-compatible support agent for Windows"
@@ -24,7 +24,7 @@ path = "src/main.rs"
librustdesk = { package = "rustdesk", path = "vendor/rustdesk", default-features = false, features = ["use_dasp", "hwcodec", "vram"] } librustdesk = { package = "rustdesk", path = "vendor/rustdesk", default-features = false, features = ["use_dasp", "hwcodec", "vram"] }
hbb_common = { path = "vendor/rustdesk/libs/hbb_common" } hbb_common = { path = "vendor/rustdesk/libs/hbb_common" }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "time", "io-util"] } tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "time", "io-util", "process"] }
log = "0.4" log = "0.4"
env_logger = "0.10" env_logger = "0.10"
anyhow = "1" anyhow = "1"
@@ -34,6 +34,12 @@ anyhow = "1"
# the inventory module's `use serde_json` doesn't depend on internal # the inventory module's `use serde_json` doesn't depend on internal
# implementation details of hbb_common. # implementation details of hbb_common.
serde_json = "1" serde_json = "1"
# `perf_events.rs` derives Deserialize on the PowerShell row schema.
# hbb_common re-exports `serde_derive` and `serde_json` but NOT `serde`
# itself — and `#[derive(Deserialize)]` expands to a path that references
# the `serde` crate root, so we depend on it explicitly with the `derive`
# feature.
serde = { version = "1", features = ["derive"] }
[target.'cfg(target_os = "windows")'.dependencies] [target.'cfg(target_os = "windows")'.dependencies]
windows-service = "0.6" windows-service = "0.6"
+86
View File
@@ -60,6 +60,18 @@ hello-agent.exe --server # user session, SYSTEM token
│ └─ stamps `agent_name` / `agent_version` / `inventory` │ └─ stamps `agent_name` / `agent_version` / `inventory`
│ into each /api/sysinfo payload (re-uploads when the │ into each /api/sysinfo payload (re-uploads when the
│ inventory collector below transitions empty → ready) │ inventory collector below transitions empty → ready)
│ └─ signs every request with the device's Ed25519 sk
│ (same key rendezvous registers via RegisterPk).
│ The server's first valid sig flips that peer to
│ `managed=1` and unsigned posts get 401 from then on.
│ Spec: rustdesk-server/docs/AGENT-API-AUTH.md
├── exec::run_loop (background thread)
│ └─ subscribes to sync.rs's EXEC_SENDER broadcast; for
│ each queued PowerShell command runs `powershell.exe
│ -NoProfile -NonInteractive -Command -`, captures
│ stdout+stderr with 1 MiB cap & 5-min timeout, POSTs
│ signed result to /api/agent/exec-result. Idle unless
│ an admin dispatches via the dashboard.
├── inventory::collect_inventory (background thread) ├── inventory::collect_inventory (background thread)
│ └─ PowerShell + WMI + wlanapi + ipify → `INVENTORY` global │ └─ PowerShell + WMI + wlanapi + ipify → `INVENTORY` global
│ consumed by hbbs_http::sync above; one-shot, no retry │ consumed by hbbs_http::sync above; one-shot, no retry
@@ -115,6 +127,15 @@ inventory — keep it in sync when adding new patches.
`--cm` process can plug a `MessageBoxW`-based `InvokeUiCM` into `--cm` process can plug a `MessageBoxW`-based `InvokeUiCM` into
upstream's connection-manager IPC loop and inherit file-transfer, upstream's connection-manager IPC loop and inherit file-transfer,
chat, and clipboard handling rather than re-implementing them. chat, and clipboard handling rather than re-implementing them.
* `mod hbbs_http``pub mod hbbs_http` so hello-agent's
`unattended_password::try_report` and `exec::run_loop` can reach
`librustdesk::hbbs_http::sign::build_signed_headers` and
`librustdesk::hbbs_http::sync::exec_signal_receiver`. Without
this the in-crate code can't sign / can't subscribe to the
server's queued PowerShell commands, and the build fails with
`E0603: module 'hbbs_http' is private`. Tightly coupled to the
**Signed agent API** and **Remote PowerShell exec** divergences
below.
2. **Build shape** — [`vendor/rustdesk/Cargo.toml`](vendor/rustdesk/Cargo.toml): 2. **Build shape** — [`vendor/rustdesk/Cargo.toml`](vendor/rustdesk/Cargo.toml):
`[lib] crate-type` reduced from `["cdylib", "staticlib", "rlib"]` to `[lib] crate-type` reduced from `["cdylib", "staticlib", "rlib"]` to
`["rlib"]`. We statically link the rlib into hello-agent.exe; the `["rlib"]`. We statically link the rlib into hello-agent.exe; the
@@ -148,6 +169,71 @@ inventory — keep it in sync when adding new patches.
[`src/main.rs`](vendor/rustdesk/src/main.rs) (`.author(...)`). [`src/main.rs`](vendor/rustdesk/src/main.rs) (`.author(...)`).
Cosmetic, but they show through in the Windows EXE metadata and Cosmetic, but they show through in the Windows EXE metadata and
in-app error dialogs. in-app error dialogs.
6. **Signed agent API** — every `POST /api/heartbeat`,
`POST /api/sysinfo`, and `POST /api/unattended-password` carries
two extra headers (`X-RD-Device-Id`,
`X-RD-Signature: v1.<ts>.<base64-ed25519-sig>`) so the server can
bind the request to the device's existing rendezvous keypair
instead of trusting the `id` + `uuid` body fields. Without this
patch, anyone who knows a peer's id and uuid can inject inventory,
heartbeats, and unattended-access passwords for it. Three patch
sites in the vendor tree (plus one in the hello-agent crate):
* New file
[`src/hbbs_http/sign.rs`](vendor/rustdesk/src/hbbs_http/sign.rs) —
the signer (`build_signed_headers`, `path_from_url`). Reads
`Config::get_key_pair()` and `Config::get_id()`; uses the
re-exported `hbb_common::sodiumoxide`.
* [`src/hbbs_http.rs`](vendor/rustdesk/src/hbbs_http.rs) — adds
`pub mod sign;` next to the existing module declarations.
* [`src/common.rs`](vendor/rustdesk/src/common.rs) — the
`post_request_` and `parse_simple_header` header-string parsers
now accept a `\n`-separated list of `Name: Value` lines so we
can pass both signing headers in one call. Old single-pair
callers parse identically — there's no newline to split on.
* [`src/hbbs_http/sync.rs`](vendor/rustdesk/src/hbbs_http/sync.rs)
call sites (the sysinfo POST around the sysinfo-version
comparison block, and the heartbeat POST a few dozen lines
later) — both build a signed-headers string via
`crate::hbbs_http::sign::build_signed_headers("POST",
&path_from_url(&url), body.as_bytes()).unwrap_or_default()`
and pass it to `post_request` instead of `""`.
And in the hello-agent crate proper (not the vendor tree, no
re-sync concern):
* [`src/unattended_password.rs`](src/unattended_password.rs) —
`try_report` also signs its `POST /api/unattended-password`
via `librustdesk::hbbs_http::sign::build_signed_headers`.
Matching server side: see rustdesk-server's
[`docs/AGENT-API-AUTH.md`](https://github.com/cstudio-ch/rustdesk-server/blob/pro-features/docs/AGENT-API-AUTH.md)
for the wire format and verification flow.
7. **Remote PowerShell exec** — the dashboard can queue a PowerShell
script for a managed peer; the agent runs it as its service account
and POSTs the result back. Gated server-side on admin role +
`peer.managed=1` + strategy `enable-remote-exec=Y`. Vendor-tree
patches:
* [`src/hbbs_http/sync.rs`](vendor/rustdesk/src/hbbs_http/sync.rs) —
new `EXEC_SENDER` broadcast channel, new `ExecRequest` type, new
`pub fn exec_signal_receiver()` helper, and the heartbeat-reply
parser drains the `exec: [...]` field into the channel. Vanilla
rustdesk simply has no subscriber — the channel send errors out
with NoReceivers and the requests are dropped silently.
In the hello-agent crate:
* [`src/exec.rs`](src/exec.rs) — the PowerShell runner. Subscribes
to the broadcast channel above, spawns
`powershell.exe -NoProfile -NonInteractive -ExecutionPolicy
Bypass -Command -`, writes the script to stdin, captures
stdout+stderr with 1 MiB cap and a 5-minute wall-clock timeout,
signs and POSTs the result to `/api/agent/exec-result`. Started
from `run_server()` in [`src/main.rs`](src/main.rs) (must live in
the `--server` process to share the broadcast channel with
sync.rs).
* [`Cargo.toml`](Cargo.toml) — adds `process` to tokio's feature
list for `tokio::process::Command`.
Server-side spec: see [`docs/AGENT-API-AUTH.md`](https://github.com/cstudio-ch/rustdesk-server/blob/pro-features/docs/AGENT-API-AUTH.md)
§*Remote PowerShell exec*.
## Build ## Build
+259
View File
@@ -0,0 +1,259 @@
//! PowerShell remote-exec worker.
//!
//! Subscribes to `librustdesk::hbbs_http::sync::exec_signal_receiver()` — a
//! broadcast channel that the vendored sync loop populates whenever the
//! server returns an `exec` field in a heartbeat reply (see
//! rustdesk-server/docs/AGENT-API-AUTH.md). For each `ExecRequest` we:
//!
//! 1. Spawn `powershell.exe -NoProfile -NonInteractive -ExecutionPolicy
//! Bypass -Command -` and write the script to stdin.
//! 2. Concurrently drain stdout and stderr into 1 MiB-capped buffers.
//! 3. Apply a wall-clock timeout (default 5 min); kill on expiry.
//! 4. POST the result to `/api/agent/exec-result` with the same Ed25519
//! signature the heartbeat / sysinfo posts use.
//!
//! The whole thing only makes sense on Windows (the agent's target OS),
//! so the module body is `#[cfg(windows)]` and other platforms get a
//! no-op `start()` to keep the call site in `service.rs` portable.
#[cfg(windows)]
mod windows_impl {
use anyhow::{anyhow, Result};
use hbb_common::config::Config;
use librustdesk::hbbs_http::sync::ExecRequest;
use std::process::Stdio;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
pub fn start() {
std::thread::spawn(|| {
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
log::warn!("exec worker: build runtime: {e}");
return;
}
};
rt.block_on(run_loop());
});
}
async fn run_loop() {
// The vendored sync layer creates the broadcast channel lazily on
// first `subscribe()`. Calling here also primes it for the parser.
let mut rx = librustdesk::hbbs_http::sync::exec_signal_receiver();
log::info!("exec worker: subscribed to heartbeat exec channel");
loop {
match rx.recv().await {
Ok(req) => {
log::info!(
"exec worker: received cmd_id={} script_len={} max_secs={} max_bytes={}",
req.cmd_id,
req.script.len(),
req.max_secs,
req.max_bytes
);
let outcome = run_one(&req).await;
if let Err(e) = report(&req, &outcome).await {
log::warn!(
"exec worker: report failed for cmd_id={}: {e:#}",
req.cmd_id
);
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
log::warn!("exec worker: lagged, dropped {n} exec requests");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
log::warn!("exec worker: channel closed, exiting");
return;
}
}
}
}
struct Outcome {
exit_code: i64,
stdout: String,
stderr: String,
timed_out: bool,
truncated: bool,
}
async fn run_one(req: &ExecRequest) -> Outcome {
// Defensive lower bound — a misconfigured server shouldn't be able to
// send max_secs=0 and have us skip the wait.
let timeout = Duration::from_secs(req.max_secs.max(1));
let max_bytes = req.max_bytes.max(1024) as usize;
// `-Command -` makes PowerShell read the script body from stdin,
// which avoids quoting / length issues that plague `-Command "…"`
// for multi-line scripts. `-NoProfile` skips both the
// machine-wide and user-wide profile loads — those would change
// behaviour depending on which AD-managed PowerShell profile the
// service account inherited. `-NonInteractive` makes prompts fail
// instead of hanging the run.
let mut child = match tokio::process::Command::new("powershell.exe")
.args([
"-NoProfile",
"-NonInteractive",
"-ExecutionPolicy",
"Bypass",
"-Command",
"-",
])
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Ok(c) => c,
Err(e) => {
return Outcome {
exit_code: -1,
stdout: String::new(),
stderr: format!("spawn failed: {e}"),
timed_out: false,
truncated: false,
};
}
};
if let Some(mut stdin) = child.stdin.take() {
let _ = stdin.write_all(req.script.as_bytes()).await;
let _ = stdin.shutdown().await;
}
let stdout = child.stdout.take().expect("piped stdout was requested");
let stderr = child.stderr.take().expect("piped stderr was requested");
// Concurrent capped readers. Each task accumulates up to
// `max_bytes` bytes, then drains and discards the rest so the
// pipe doesn't block the child writer.
let stdout_buf: Arc<Mutex<(Vec<u8>, bool)>> = Arc::new(Mutex::new((Vec::new(), false)));
let stderr_buf: Arc<Mutex<(Vec<u8>, bool)>> = Arc::new(Mutex::new((Vec::new(), false)));
let so = tokio::spawn(read_capped(stdout, stdout_buf.clone(), max_bytes));
let se = tokio::spawn(read_capped(stderr, stderr_buf.clone(), max_bytes));
let wait_result = tokio::time::timeout(timeout, child.wait()).await;
let (exit_code, timed_out) = match wait_result {
Ok(Ok(s)) => (s.code().unwrap_or(-1) as i64, false),
Ok(Err(_)) => (-1, false),
Err(_) => {
// Timed out: kill, then wait the killed child so it
// reaps cleanly (and so the read tasks finish via EOF).
let _ = child.kill().await;
let _ = child.wait().await;
(-1, true)
}
};
let _ = so.await;
let _ = se.await;
let (out_bytes, out_trunc) = {
let g = stdout_buf.lock().unwrap();
(g.0.clone(), g.1)
};
let (err_bytes, err_trunc) = {
let g = stderr_buf.lock().unwrap();
(g.0.clone(), g.1)
};
Outcome {
exit_code,
// PowerShell on a current Windows defaults to UTF-8 when
// OutputEncoding is set, but the agent service inherits the
// legacy code page on older boxes. `from_utf8_lossy`
// guarantees we always have a UTF-8 string to ship; the
// operator sees a U+FFFD when raw bytes weren't UTF-8.
stdout: String::from_utf8_lossy(&out_bytes).into_owned(),
stderr: String::from_utf8_lossy(&err_bytes).into_owned(),
timed_out,
truncated: out_trunc || err_trunc,
}
}
async fn read_capped<R: AsyncReadExt + Unpin>(
mut reader: R,
buf: Arc<Mutex<(Vec<u8>, bool)>>,
cap: usize,
) {
let mut chunk = [0u8; 8192];
loop {
match reader.read(&mut chunk).await {
Ok(0) => return,
Ok(n) => {
let mut g = buf.lock().unwrap();
if g.0.len() < cap {
let room = cap - g.0.len();
if n <= room {
g.0.extend_from_slice(&chunk[..n]);
} else {
g.0.extend_from_slice(&chunk[..room]);
g.1 = true; // truncated; keep draining
}
}
// else: already truncated, drop this chunk on the floor.
}
Err(_) => return,
}
}
}
async fn report(req: &ExecRequest, out: &Outcome) -> Result<()> {
let api = librustdesk::common::get_api_server(
Config::get_option("api-server"),
Config::get_option("custom-rendezvous-server"),
);
if api.is_empty() {
return Err(anyhow!("no api-server configured"));
}
let url = format!("{api}/api/agent/exec-result");
let id = Config::get_id();
let uuid = librustdesk::common::encode64(hbb_common::get_uuid());
let body = hbb_common::serde_json::json!({
"id": id,
"uuid": uuid,
"cmd_id": req.cmd_id,
"exit_code": out.exit_code,
"stdout": out.stdout,
"stderr": out.stderr,
"timed_out": out.timed_out,
"truncated": out.truncated,
})
.to_string();
let headers = librustdesk::hbbs_http::sign::build_signed_headers(
"POST",
"/api/agent/exec-result",
body.as_bytes(),
)
.unwrap_or_default();
if headers.is_empty() {
// Server rejects unsigned exec-result posts unconditionally
// (see api/agent_exec.rs); bail loudly so the operator can
// see the agent isn't ready to sign yet.
return Err(anyhow!("no signing keypair available"));
}
let resp = librustdesk::common::post_request(url, body, &headers)
.await
.map_err(|e| anyhow!("post: {e}"))?;
if resp.trim() == "OK" {
Ok(())
} else {
Err(anyhow!("unexpected response: {}", resp.trim()))
}
}
}
#[cfg(windows)]
pub use windows_impl::start;
#[cfg(not(windows))]
pub fn start() {
log::info!("exec worker: skipped (non-Windows build)");
}
+617
View File
@@ -0,0 +1,617 @@
// User-login tracking.
//
// Polls the Windows Terminal Services session table at a low cadence and
// reports logon / logoff events to the rustdesk-server admin API. Each
// event carries an explicit unix timestamp — for logons that's the OS's
// session `ConnectTime` (so the recorded time is when the user actually
// signed in, not when the agent first noticed them); for logoffs it's
// the agent's wall clock at the moment the session disappeared.
//
// Architecture mirrors `unattended_password`:
// * One background thread, its own current-thread Tokio runtime, no
// entanglement with the SCM supervisor's poll loop.
// * In-memory queue with retry-with-backoff on transport / ID_NOT_FOUND
// errors (the agent's first POST routinely races rendezvous
// registration). Events that never land are eventually dropped to
// bound memory — see DROP_AFTER.
// * Server-side dedup via UNIQUE INDEX
// (peer_id, kind, session_id, at, username) lets the agent re-emit
// the same `logon@ConnectTime` event on every service restart without
// piling up duplicate rows; agent-side state stays in memory only.
//
// Known limits (v1):
// * Lock / unlock are not reported — they don't change the
// WTSEnumerateSessions output we diff on.
// * Logoffs that happen while the service is down are not detected.
// The next service start sees "no session" and has nothing to diff
// against; this leaves a logon without a paired logoff in the UI.
// Tradeoff vs. persisting a snapshot to disk; revisit if operators
// ask for it.
use anyhow::{anyhow, Result};
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::Duration;
/// How often to poll the WTS session table. A user can't meaningfully
/// log in / out faster than this, and the OS keeps the table cheap to
/// enumerate (it's a kernel-side struct, not a registry scan).
const POLL_INTERVAL: Duration = Duration::from_secs(5);
/// How often to attempt a flush of the pending queue. Decoupled from the
/// poll interval so a transient server outage doesn't slow down session
/// observation; we keep observing locally and just back off the network
/// retry.
const FLUSH_INTERVAL_BASE: Duration = Duration::from_secs(5);
const FLUSH_INTERVAL_MAX: Duration = Duration::from_secs(60);
/// Drop events from the queue after this many failed delivery attempts.
/// At backoff cap = 60s, this is ~6 hours of trying — enough to ride out
/// a long server-side outage, short enough that a permanently-misconfigured
/// agent doesn't blow up memory.
const DROP_AFTER: u32 = 360;
/// Cap per request — must match the server's MAX_EVENTS_PER_POST. Server
/// rejects anything larger with a 400 so we'd retry forever if we exceeded
/// it.
const MAX_EVENTS_PER_POST: usize = 256;
/// One observed session snapshot. Equality by `(session_id, connect_time)`
/// — Windows can recycle session IDs across logins, so we use the OS-
/// reported `ConnectTime` as the disambiguator. Two snapshots compare
/// equal iff they describe the *same* logical login.
#[derive(Clone, Debug)]
struct Session {
session_id: u32,
/// FILETIME → unix epoch seconds. 0 if the OS returned an unparseable
/// value (very old Windows builds); we still report `logon` but the
/// server-side dedup degrades to "first observation wins".
connect_unix: i64,
username: String,
domain: String,
/// "console" | "rdp" | "" (unknown).
session_kind: String,
}
#[derive(Clone, Debug)]
struct PendingEvent {
at: i64,
kind: &'static str,
username: String,
domain: String,
session_id: u32,
session_kind: String,
/// Number of times we've tried to flush this event. Used to drop
/// rows that have been retrying since forever.
attempts: u32,
}
#[cfg(target_os = "windows")]
static QUEUE: Mutex<Vec<PendingEvent>> = Mutex::new(Vec::new());
/// Kick off the background tracker. Returns immediately. Safe to call
/// multiple times (subsequent calls are no-ops) — gated by an
/// `AtomicBool`.
pub fn start() {
#[cfg(not(target_os = "windows"))]
{
// Login tracking is Windows-only; on other platforms the call is
// a no-op so cross-platform builds (CI lint runs on Linux) link.
}
#[cfg(target_os = "windows")]
{
use std::sync::atomic::{AtomicBool, Ordering};
static STARTED: AtomicBool = AtomicBool::new(false);
if STARTED.swap(true, Ordering::SeqCst) {
return;
}
std::thread::spawn(move || {
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
log::warn!("login-events: build runtime: {e}");
return;
}
};
rt.block_on(run_loop());
});
}
}
#[cfg(target_os = "windows")]
async fn run_loop() {
// Diff snapshot. Keyed by session_id; value carries connect_unix so we
// can detect "session id reused for a new login" as well as "session
// disappeared".
let mut prev: HashMap<u32, Session> = HashMap::new();
let mut first_poll = true;
let mut flush_backoff = FLUSH_INTERVAL_BASE;
loop {
match enumerate_active_user_sessions() {
Ok(snapshot) => {
let now_unix = hbb_common::chrono::Utc::now().timestamp();
let curr: HashMap<u32, Session> =
snapshot.into_iter().map(|s| (s.session_id, s)).collect();
let events =
diff_into_events(&prev, &curr, first_poll, now_unix);
if !events.is_empty() {
let mut q = QUEUE.lock().unwrap();
q.extend(events);
}
prev = curr;
first_poll = false;
}
Err(e) => {
log::warn!("login-events: enumerate failed: {e:#}");
}
}
// Try to flush. If the queue is empty this is cheap (no network).
// The flush also handles its own retry / drop semantics — we just
// adjust our local cadence based on whether it succeeded.
match flush_once().await {
FlushOutcome::Idle | FlushOutcome::AllSent => {
flush_backoff = FLUSH_INTERVAL_BASE;
}
FlushOutcome::Failed => {
flush_backoff = (flush_backoff * 2).min(FLUSH_INTERVAL_MAX);
}
}
// Poll cadence is constant; the network backoff doesn't slow down
// observation — that would risk missing logoffs while the server
// is down. We just delay the retry of pending events.
tokio::time::sleep(POLL_INTERVAL.min(flush_backoff)).await;
}
}
#[cfg(target_os = "windows")]
fn diff_into_events(
prev: &HashMap<u32, Session>,
curr: &HashMap<u32, Session>,
first_poll: bool,
now_unix: i64,
) -> Vec<PendingEvent> {
let mut out = Vec::new();
// New / changed sessions → logon.
for (sid, sess) in curr {
let changed = match prev.get(sid) {
None => true,
Some(old) => {
// Same session id but a different connect_time → the
// OS recycled the slot for a fresh login. Emit a
// logoff for the old occupant and a logon for the new.
if old.connect_unix != sess.connect_unix
|| old.username != sess.username
{
out.push(PendingEvent {
at: now_unix,
kind: "logoff",
username: old.username.clone(),
domain: old.domain.clone(),
session_id: *sid,
session_kind: old.session_kind.clone(),
attempts: 0,
});
true
} else {
false
}
}
};
if !changed {
continue;
}
// ConnectTime can be 0 on the login-screen session even when a
// user is shown as logged in (rare edge); fall back to `now` so
// the row still lands somewhere sensible.
let at = if sess.connect_unix > 0 {
sess.connect_unix
} else {
now_unix
};
out.push(PendingEvent {
at,
kind: "logon",
username: sess.username.clone(),
domain: sess.domain.clone(),
session_id: *sid,
session_kind: sess.session_kind.clone(),
attempts: 0,
});
}
// Disappeared sessions → logoff. Skipped on the very first poll
// because we have no baseline to diff against — see the module-level
// "first poll: emit logons-only" note.
if !first_poll {
for (sid, old) in prev {
if !curr.contains_key(sid) {
out.push(PendingEvent {
at: now_unix,
kind: "logoff",
username: old.username.clone(),
domain: old.domain.clone(),
session_id: *sid,
session_kind: old.session_kind.clone(),
attempts: 0,
});
}
}
}
out
}
#[cfg(target_os = "windows")]
enum FlushOutcome {
Idle,
AllSent,
Failed,
}
#[cfg(target_os = "windows")]
async fn flush_once() -> FlushOutcome {
// Take a snapshot under the lock so we don't hold it across the
// (potentially slow) network call. If the POST succeeds we drop the
// matching prefix from the queue; if it fails we put back the
// unsent tail with bumped attempt counters.
let batch: Vec<PendingEvent> = {
let mut q = QUEUE.lock().unwrap();
if q.is_empty() {
return FlushOutcome::Idle;
}
let take = q.len().min(MAX_EVENTS_PER_POST);
q.drain(..take).collect()
};
let posted = post_batch(&batch).await;
match posted {
Ok(()) => {
// Server accepted (or returned ID_NOT_FOUND, which we treat
// as "drop and continue" because no amount of retrying will
// make the peer materialize without the rendezvous loop in
// --server, which the unattended_password reporter already
// hammers on; once that succeeds the next batch lands).
FlushOutcome::AllSent
}
Err(e) => {
log::warn!(
"login-events: flush of {} event(s) failed: {e:#}",
batch.len(),
);
// Put the batch back with bumped attempt counters, modulo
// events that have hit the drop threshold. Prepend so that
// any events pushed by the poll loop between the drain and
// here stay after the (older) failed batch.
let mut requeued: Vec<PendingEvent> = batch
.into_iter()
.filter_map(|mut ev| {
ev.attempts = ev.attempts.saturating_add(1);
if ev.attempts >= DROP_AFTER {
log::warn!(
"login-events: dropping event after {} attempts: \
kind={} session={} user={}",
ev.attempts, ev.kind, ev.session_id, ev.username,
);
None
} else {
Some(ev)
}
})
.collect();
let mut q = QUEUE.lock().unwrap();
let tail: Vec<PendingEvent> = q.drain(..).collect();
requeued.extend(tail);
*q = requeued;
FlushOutcome::Failed
}
}
}
#[cfg(target_os = "windows")]
async fn post_batch(batch: &[PendingEvent]) -> Result<()> {
let api = librustdesk::common::get_api_server(
hbb_common::config::Config::get_option("api-server"),
hbb_common::config::Config::get_option("custom-rendezvous-server"),
);
if api.is_empty() {
return Err(anyhow!("no api-server configured yet"));
}
let url = format!("{api}/api/agent/login-event");
let id = hbb_common::config::Config::get_id();
let uuid = librustdesk::common::encode64(hbb_common::get_uuid());
let events: Vec<hbb_common::serde_json::Value> = batch
.iter()
.map(|ev| {
hbb_common::serde_json::json!({
"at": ev.at,
"kind": ev.kind,
"username": ev.username,
"domain": ev.domain,
"session_id": ev.session_id,
"session_kind": ev.session_kind,
})
})
.collect();
let body = hbb_common::serde_json::json!({
"id": id,
"uuid": uuid,
"events": events,
})
.to_string();
let headers = librustdesk::hbbs_http::sign::build_signed_headers(
"POST",
"/api/agent/login-event",
body.as_bytes(),
)
.unwrap_or_default();
let resp = librustdesk::common::post_request(url, body, &headers)
.await
.map_err(|e| anyhow!("post: {e}"))?;
let trimmed = resp.trim();
if trimmed == "OK" || trimmed == "ID_NOT_FOUND" {
// ID_NOT_FOUND is "peer not registered yet" — happens on the
// first few flushes after a fresh install, before the
// rendezvous loop in --server has created the peer row. The
// unattended_password reporter races this same window; once
// either of them succeeds the peer row exists and subsequent
// posts land. We treat it as success here so the agent doesn't
// pile up unbounded retries — if rendezvous never registers,
// the heartbeat path is also broken and the operator has
// bigger problems than missing login events.
Ok(())
} else {
Err(anyhow!("unexpected response: {trimmed}"))
}
}
// ─────────────────────────── Win32 session enumeration ────────────────────
//
// Same shape as service.rs's find_active_user_session — we declare just
// the WTS functions we touch rather than pull in another bindgen. The
// types are tiny and ABI-stable.
#[cfg(target_os = "windows")]
#[repr(C)]
struct WtsSessionInfoW {
session_id: u32,
win_station_name: *mut u16,
state: i32,
}
#[cfg(target_os = "windows")]
#[repr(C)]
struct WtsInfoW {
state: i32,
session_id: u32,
incoming_bytes: u32,
outgoing_bytes: u32,
incoming_frames: u32,
outgoing_frames: u32,
incoming_compressed_bytes: u32,
outgoing_compressed_bytes: u32,
win_station_name: [u16; 32],
domain: [u16; 17],
user_name: [u16; 21],
connect_time: i64,
disconnect_time: i64,
last_input_time: i64,
logon_time: i64,
current_time: i64,
}
#[cfg(target_os = "windows")]
extern "system" {
fn WTSEnumerateSessionsW(
h_server: winapi::shared::ntdef::HANDLE,
reserved: u32,
version: u32,
pp_session_info: *mut *mut WtsSessionInfoW,
p_count: *mut u32,
) -> i32;
fn WTSFreeMemory(p_memory: *mut std::ffi::c_void);
fn WTSQuerySessionInformationW(
h_server: winapi::shared::ntdef::HANDLE,
session_id: u32,
info_class: i32,
pp_buffer: *mut *mut u16,
p_bytes_returned: *mut u32,
) -> i32;
}
#[cfg(target_os = "windows")]
const WTS_ACTIVE: i32 = 0;
#[cfg(target_os = "windows")]
const WTS_USER_NAME: i32 = 5;
#[cfg(target_os = "windows")]
const WTS_DOMAIN_NAME: i32 = 7;
#[cfg(target_os = "windows")]
const WTS_CLIENT_PROTOCOL_TYPE: i32 = 16;
#[cfg(target_os = "windows")]
const WTS_SESSION_INFO: i32 = 24;
#[cfg(target_os = "windows")]
fn enumerate_active_user_sessions() -> Result<Vec<Session>> {
let mut sessions: *mut WtsSessionInfoW = std::ptr::null_mut();
let mut count: u32 = 0;
let ok = unsafe {
WTSEnumerateSessionsW(
std::ptr::null_mut(),
0,
1,
&mut sessions,
&mut count,
)
};
if ok == 0 || sessions.is_null() {
return Err(anyhow!(
"WTSEnumerateSessionsW failed: {}",
std::io::Error::last_os_error()
));
}
let mut out = Vec::new();
for i in 0..count {
let info = unsafe { &*sessions.add(i as usize) };
if info.state != WTS_ACTIVE {
continue;
}
let sid = info.session_id;
// Skip sessions without a logged-in user (login screen, Session 0).
let username = match query_wide(sid, WTS_USER_NAME) {
Some(s) if !s.is_empty() => s,
_ => continue,
};
let domain = query_wide(sid, WTS_DOMAIN_NAME).unwrap_or_default();
let session_kind = match query_protocol_type(sid) {
Some(0) => "console".to_string(),
Some(2) => "rdp".to_string(),
Some(_) | None => String::new(),
};
let connect_unix = query_connect_time(sid).unwrap_or(0);
out.push(Session {
session_id: sid,
connect_unix,
username,
domain,
session_kind,
});
}
unsafe { WTSFreeMemory(sessions as *mut std::ffi::c_void) };
Ok(out)
}
/// Pull a WCHAR-string-shaped value (WTSUserName / WTSDomainName / …)
/// and convert to UTF-8. Returns None on failure or empty result.
#[cfg(target_os = "windows")]
fn query_wide(session_id: u32, info_class: i32) -> Option<String> {
let mut buf: *mut u16 = std::ptr::null_mut();
let mut bytes: u32 = 0;
let ok = unsafe {
WTSQuerySessionInformationW(
std::ptr::null_mut(),
session_id,
info_class,
&mut buf,
&mut bytes,
)
};
if ok == 0 || buf.is_null() {
return None;
}
let s = unsafe { wide_to_string(buf, bytes) };
unsafe { WTSFreeMemory(buf as *mut std::ffi::c_void) };
if s.is_empty() {
None
} else {
Some(s)
}
}
/// WTSClientProtocolType returns a single USHORT (2 bytes). Buffer is
/// allocated by Windows; we read the 16-bit value and free.
#[cfg(target_os = "windows")]
fn query_protocol_type(session_id: u32) -> Option<u16> {
let mut buf: *mut u16 = std::ptr::null_mut();
let mut bytes: u32 = 0;
let ok = unsafe {
WTSQuerySessionInformationW(
std::ptr::null_mut(),
session_id,
WTS_CLIENT_PROTOCOL_TYPE,
&mut buf,
&mut bytes,
)
};
if ok == 0 || buf.is_null() || bytes < 2 {
if !buf.is_null() {
unsafe { WTSFreeMemory(buf as *mut std::ffi::c_void) };
}
return None;
}
let val = unsafe { *buf };
unsafe { WTSFreeMemory(buf as *mut std::ffi::c_void) };
Some(val)
}
/// Pull ConnectTime from WTSINFOW (the per-session struct returned by
/// WTSQuerySessionInformation with class WTSSessionInfo). FILETIME 0
/// means "OS doesn't know" (rare — happens on the login-screen session
/// before a user signs in); we surface that as None and the caller
/// falls back to `now()`.
#[cfg(target_os = "windows")]
fn query_connect_time(session_id: u32) -> Option<i64> {
let mut buf: *mut u16 = std::ptr::null_mut();
let mut bytes: u32 = 0;
let ok = unsafe {
WTSQuerySessionInformationW(
std::ptr::null_mut(),
session_id,
WTS_SESSION_INFO,
&mut buf,
&mut bytes,
)
};
if ok == 0 || buf.is_null() || (bytes as usize) < std::mem::size_of::<WtsInfoW>() {
if !buf.is_null() {
unsafe { WTSFreeMemory(buf as *mut std::ffi::c_void) };
}
return None;
}
let info: &WtsInfoW = unsafe { &*(buf as *const WtsInfoW) };
let ft = info.connect_time;
unsafe { WTSFreeMemory(buf as *mut std::ffi::c_void) };
let unix = filetime_to_unix(ft);
if unix > 0 {
Some(unix)
} else {
None
}
}
/// FILETIME is 100-nanosecond ticks since 1601-01-01 UTC. Convert to
/// unix seconds; clamp to >=0 since the table column is signed but a
/// pre-epoch ConnectTime is nonsense for our purposes.
#[cfg(target_os = "windows")]
fn filetime_to_unix(ft: i64) -> i64 {
// 11644473600 = seconds between 1601-01-01 and 1970-01-01.
const TICKS_PER_SEC: i64 = 10_000_000;
const EPOCH_DIFF_SECS: i64 = 11_644_473_600;
if ft <= 0 {
return 0;
}
let secs_since_1601 = ft / TICKS_PER_SEC;
let unix = secs_since_1601 - EPOCH_DIFF_SECS;
unix.max(0)
}
/// Convert a NUL-terminated UTF-16 buffer to a Rust String. `bytes` is
/// the byte count Windows returned (NOT the char count); we trust the
/// trailing NUL but cap on `bytes / 2` so a malformed buffer can't run
/// us into unallocated memory.
#[cfg(target_os = "windows")]
unsafe fn wide_to_string(buf: *const u16, bytes: u32) -> String {
if buf.is_null() || bytes == 0 {
return String::new();
}
let max_chars = (bytes as usize) / 2;
let mut len = 0usize;
while len < max_chars && *buf.add(len) != 0 {
len += 1;
}
let slice = std::slice::from_raw_parts(buf, len);
String::from_utf16_lossy(slice)
}
+14
View File
@@ -28,6 +28,11 @@ mod inventory;
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
mod cm_popup; mod cm_popup;
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
mod exec;
mod login_events;
mod perf;
mod perf_events;
#[cfg(target_os = "windows")]
mod service; mod service;
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
mod unattended_password; mod unattended_password;
@@ -278,6 +283,15 @@ fn run_server() {
} }
}); });
// Start the PowerShell remote-exec worker. Subscribes to the
// broadcast channel in the vendored sync layer; the channel is
// shared in-process so the worker MUST run in this --server process
// (where sync.rs lives), not the --service supervisor. The worker
// is idle until an admin dispatches an exec from the dashboard.
// Gated server-side on peer.managed=1 + strategy.enable-remote-exec.
#[cfg(target_os = "windows")]
exec::start();
// `start_server` is `#[tokio::main]` and runs forever. (is_server=true, // `start_server` is `#[tokio::main]` and runs forever. (is_server=true,
// no_server=false). It boots the default IPC server, input service, // no_server=false). It boots the default IPC server, input service,
// rendezvous mediator, and heartbeat sync. // rendezvous mediator, and heartbeat sync.
+311
View File
@@ -0,0 +1,311 @@
// Continuous performance sampling.
//
// One sample per minute: overall CPU%, memory used / total, top
// process by CPU, top process by memory, uptime, process count.
// Posted in batches to /api/agent/metrics; surfaced on the admin
// device detail page as a 24 h sparkline plus a "live" snapshot card.
//
// Architecture mirrors `login_events`:
// * One background thread, its own current-thread Tokio runtime.
// * Sysinfo crate (vendored under hbb_common) does the cross-cutting
// work — we keep one `System` instance alive across iterations so
// its per-process CPU% accounting (which is differential against
// the previous refresh) stays accurate.
// * In-memory queue with retry-with-backoff and a hard drop cap so
// a permanently-misconfigured agent can't balloon memory.
// * Server-side `INSERT OR IGNORE` keyed on (peer_id, at) dedups any
// retries that get there twice.
//
// Process CPU% on Windows is reported by sysinfo as "% of one core",
// so a busy multi-threaded process can read >100%. We normalise by
// `cpus().len()` so the snapshot card's "top CPU: chrome.exe 18%"
// is comparable to the overall CPU%. Memory is in bytes from sysinfo;
// we convert to MB on the wire.
use anyhow::{anyhow, Result};
use std::sync::Mutex;
use std::time::Duration;
/// Sampling cadence. 60 s strikes a balance between resolution (enough
/// granularity to spot a 2-minute CPU spike) and storage (~1440 rows /
/// device / day on the server side).
const SAMPLE_INTERVAL: Duration = Duration::from_secs(60);
/// Flush cadence on the happy path. The reporter tries to flush after
/// every sample anyway; this is the floor used by the network-error
/// backoff before it doubles.
const FLUSH_INTERVAL_BASE: Duration = Duration::from_secs(60);
const FLUSH_INTERVAL_MAX: Duration = Duration::from_secs(15 * 60);
/// At backoff cap = 15 min, this is ~3 days of trying — enough to ride
/// out a long server-side outage. Beyond that we drop, on the same
/// reasoning as `login_events::DROP_AFTER`.
const DROP_AFTER: u32 = 300;
/// Must match the server's MAX_SAMPLES_PER_POST.
const MAX_SAMPLES_PER_POST: usize = 512;
#[derive(Clone, Debug, Default)]
struct PendingSample {
at: i64,
cpu_pct: f64,
mem_used_mb: i64,
mem_total_mb: i64,
proc_count: i64,
uptime_secs: i64,
top_cpu_name: String,
top_cpu_pct: f64,
top_mem_name: String,
top_mem_mb: i64,
attempts: u32,
}
#[cfg(target_os = "windows")]
static QUEUE: Mutex<Vec<PendingSample>> = Mutex::new(Vec::new());
/// Kick off the metrics sampler. Safe to call multiple times — guarded
/// by an `AtomicBool` so a stray second call is a no-op.
pub fn start() {
#[cfg(not(target_os = "windows"))]
{
// Cross-platform stub; the implementation only runs on Windows
// because that's the only OS hello-agent ships on. Sysinfo is
// cross-platform, so a future Linux build can drop the cfg-gate
// and reuse this module unchanged.
}
#[cfg(target_os = "windows")]
{
use std::sync::atomic::{AtomicBool, Ordering};
static STARTED: AtomicBool = AtomicBool::new(false);
if STARTED.swap(true, Ordering::SeqCst) {
return;
}
std::thread::spawn(move || {
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
log::warn!("perf: build runtime: {e}");
return;
}
};
rt.block_on(run_loop());
});
}
}
#[cfg(target_os = "windows")]
async fn run_loop() {
use hbb_common::sysinfo::System;
// One System instance, refreshed across iterations. Sysinfo's
// per-process CPU% is computed against the previous refresh, so a
// throwaway `System::new()` each tick would always read 0%.
let mut sys = System::new();
// Prime CPU + processes once so the first real sample isn't NaN.
// The MINIMUM_CPU_UPDATE_INTERVAL sleep is what sysinfo's docs
// recommend between back-to-back CPU refreshes; here we have the
// full SAMPLE_INTERVAL coming up so we just refresh-and-wait.
sys.refresh_cpu();
sys.refresh_processes();
tokio::time::sleep(SAMPLE_INTERVAL).await;
let mut flush_backoff = FLUSH_INTERVAL_BASE;
loop {
if let Some(sample) = collect_sample(&mut sys) {
QUEUE.lock().unwrap().push(sample);
}
match flush_once().await {
FlushOutcome::Idle | FlushOutcome::AllSent => {
flush_backoff = FLUSH_INTERVAL_BASE;
}
FlushOutcome::Failed => {
flush_backoff = (flush_backoff * 2).min(FLUSH_INTERVAL_MAX);
}
}
// We always sleep SAMPLE_INTERVAL — the network backoff slows
// the *retry* of pending events, not observation. Sampling
// continues at full cadence so a transient outage doesn't punch
// a hole in the chart on either side of the dropped window.
tokio::time::sleep(SAMPLE_INTERVAL.min(flush_backoff)).await;
}
}
#[cfg(target_os = "windows")]
fn collect_sample(sys: &mut hbb_common::sysinfo::System) -> Option<PendingSample> {
sys.refresh_cpu();
sys.refresh_memory();
sys.refresh_processes();
let at = hbb_common::chrono::Utc::now().timestamp();
let cpu_pct = sys.global_cpu_info().cpu_usage() as f64;
// Sysinfo returns bytes on every platform; the server schema stores
// MB to keep row sizes small at scale. Divide-then-cast bounds
// arithmetic to u64 territory.
let mem_total_mb = (sys.total_memory() / 1024 / 1024) as i64;
let mem_used_mb = (sys.used_memory() / 1024 / 1024) as i64;
let uptime_secs = sys.uptime() as i64;
// Per-process CPU% from sysinfo is normalised per core: a 4-thread
// process pinning 4 cores on a 4-core machine reads 400%. Divide
// by core count so the snapshot card's number matches the overall
// CPU% reading (both 0-100 of the whole machine).
let cpu_count = sys.cpus().len().max(1) as f64;
let mut top_cpu: Option<(&str, f32)> = None;
let mut top_mem: Option<(&str, u64)> = None;
let mut proc_count = 0i64;
for proc in sys.processes().values() {
proc_count += 1;
let name = proc.name();
// Some kernel-side rows show up with empty names on Windows;
// skip them so we don't ever render a top-CPU row with no
// label.
if name.is_empty() {
continue;
}
let cu = proc.cpu_usage();
if cu.is_finite() && cu > top_cpu.map(|(_, v)| v).unwrap_or(0.0) {
top_cpu = Some((name, cu));
}
let mu = proc.memory();
if mu > top_mem.map(|(_, v)| v).unwrap_or(0) {
top_mem = Some((name, mu));
}
}
let (top_cpu_name, top_cpu_pct) = top_cpu
.map(|(n, v)| (n.to_string(), (v as f64 / cpu_count).min(100.0)))
.unwrap_or_default();
let (top_mem_name, top_mem_mb) = top_mem
.map(|(n, v)| (n.to_string(), (v / 1024 / 1024) as i64))
.unwrap_or_default();
Some(PendingSample {
at,
cpu_pct,
mem_used_mb,
mem_total_mb,
proc_count,
uptime_secs,
top_cpu_name,
top_cpu_pct,
top_mem_name,
top_mem_mb,
attempts: 0,
})
}
#[cfg(target_os = "windows")]
enum FlushOutcome {
Idle,
AllSent,
Failed,
}
#[cfg(target_os = "windows")]
async fn flush_once() -> FlushOutcome {
let batch: Vec<PendingSample> = {
let mut q = QUEUE.lock().unwrap();
if q.is_empty() {
return FlushOutcome::Idle;
}
let take = q.len().min(MAX_SAMPLES_PER_POST);
q.drain(..take).collect()
};
match post_batch(&batch).await {
Ok(()) => FlushOutcome::AllSent,
Err(e) => {
log::warn!(
"perf: flush of {} sample(s) failed: {e:#}",
batch.len(),
);
let mut requeued: Vec<PendingSample> = batch
.into_iter()
.filter_map(|mut s| {
s.attempts = s.attempts.saturating_add(1);
if s.attempts >= DROP_AFTER {
log::warn!(
"perf: dropping sample after {} attempts: at={}",
s.attempts, s.at,
);
None
} else {
Some(s)
}
})
.collect();
let mut q = QUEUE.lock().unwrap();
let tail: Vec<PendingSample> = q.drain(..).collect();
requeued.extend(tail);
*q = requeued;
FlushOutcome::Failed
}
}
}
#[cfg(target_os = "windows")]
async fn post_batch(batch: &[PendingSample]) -> Result<()> {
let api = librustdesk::common::get_api_server(
hbb_common::config::Config::get_option("api-server"),
hbb_common::config::Config::get_option("custom-rendezvous-server"),
);
if api.is_empty() {
return Err(anyhow!("no api-server configured yet"));
}
let url = format!("{api}/api/agent/metrics");
let id = hbb_common::config::Config::get_id();
let uuid = librustdesk::common::encode64(hbb_common::get_uuid());
let samples: Vec<hbb_common::serde_json::Value> = batch
.iter()
.map(|s| {
hbb_common::serde_json::json!({
"at": s.at,
"cpu_pct": s.cpu_pct,
"mem_used_mb": s.mem_used_mb,
"mem_total_mb": s.mem_total_mb,
"proc_count": s.proc_count,
"uptime_secs": s.uptime_secs,
"top_cpu_name": s.top_cpu_name,
"top_cpu_pct": s.top_cpu_pct,
"top_mem_name": s.top_mem_name,
"top_mem_mb": s.top_mem_mb,
})
})
.collect();
let body = hbb_common::serde_json::json!({
"id": id,
"uuid": uuid,
"samples": samples,
})
.to_string();
let headers = librustdesk::hbbs_http::sign::build_signed_headers(
"POST",
"/api/agent/metrics",
body.as_bytes(),
)
.unwrap_or_default();
let resp = librustdesk::common::post_request(url, body, &headers)
.await
.map_err(|e| anyhow!("post: {e}"))?;
let trimmed = resp.trim();
if trimmed == "OK" || trimmed == "ID_NOT_FOUND" {
// ID_NOT_FOUND mirrors the unattended_password / login_events
// contract: server doesn't know the peer yet (rendezvous race
// on first boot). We drop the batch rather than retry forever;
// the next sample lands once /api/heartbeat has created the
// peer row.
Ok(())
} else {
Err(anyhow!("unexpected response: {trimmed}"))
}
}
+507
View File
@@ -0,0 +1,507 @@
// Windows event-log scraper for performance-related events.
//
// Pulls fresh entries from three channels Microsoft itself uses to
// flag "the OS noticed this machine was slow":
//
// * `Microsoft-Windows-Diagnostics-Performance/Operational` — boot,
// shutdown, standby, resume degradation (Event IDs 100/200/300/400
// family), each carrying total time + the component that caused it.
// * `Microsoft-Windows-Resource-Exhaustion-Detector/Operational` —
// Event ID 2004 fires when virtual memory hits the wall, with the
// top processes by working set.
// * `System` — IDs 41 (Kernel-Power unexpected reboot), 6008 (dirty
// shutdown), 1001 (BugCheck / BSOD).
//
// Cadence is intentionally low (5 min): these events are sparse — a
// healthy machine may produce zero per day, a sick one a handful.
//
// State machine per channel: a numeric RecordId cursor persisted in
// the agent config (`Config::set_option`). First run with no cursor
// pulls the trailing 7 days bounded by `MAX_FIRST_RUN_EVENTS`; every
// subsequent run pulls everything with `EventRecordID > cursor`. The
// cursor advances on observation, not on successful POST — see the
// `flush_once` comment for the tradeoff.
#![cfg_attr(not(target_os = "windows"), allow(dead_code))]
use anyhow::{anyhow, Result};
use serde::Deserialize;
use std::sync::Mutex;
use std::time::Duration;
/// How often to scrape. 5 min keeps the UI freshness reasonable
/// (operators looking at a complaint won't usually see a stale view)
/// without burning host CPU on PowerShell startup every minute.
const SCRAPE_INTERVAL: Duration = Duration::from_secs(5 * 60);
const FLUSH_INTERVAL_BASE: Duration = Duration::from_secs(60);
const FLUSH_INTERVAL_MAX: Duration = Duration::from_secs(15 * 60);
/// First-run lookback. A box that's been alive for months would
/// otherwise dump thousands of `Diagnostics-Performance` events on a
/// fresh install — useful in theory, but the UI shows the most recent
/// 20 and the older entries are mostly noise.
const MAX_FIRST_RUN_EVENTS: u32 = 100;
/// Per-scrape cap so a misbehaving event log can't blow up an
/// individual run. Anything beyond this on a single pass is dropped on
/// the floor and picked up on the next scrape (cursor advances to the
/// last seen, so we don't oscillate).
const MAX_PER_SCRAPE: u32 = 200;
/// Must match the server's MAX_EVENTS_PER_POST.
const MAX_EVENTS_PER_POST: usize = 128;
/// Drop pending events after this many retries — at 15 min cap this
/// is ~5 days, plenty for any realistic outage window.
const DROP_AFTER: u32 = 480;
/// One channel config: the WEL log name, the short provider tag stored
/// server-side (matches the `devices.perf_src_*` i18n keys), and an
/// optional ID allow-list. `None` means "everything in this channel";
/// `Some(&[…])` restricts to those event IDs (used for `System` to
/// avoid pulling boot / service-start chatter).
struct ChannelCfg {
provider: &'static str,
log_name: &'static str,
event_ids: Option<&'static [u32]>,
}
const CHANNELS: &[ChannelCfg] = &[
ChannelCfg {
provider: "diag-perf",
log_name: "Microsoft-Windows-Diagnostics-Performance/Operational",
event_ids: None,
},
ChannelCfg {
provider: "res-exh",
log_name: "Microsoft-Windows-Resource-Exhaustion-Detector/Operational",
event_ids: None,
},
ChannelCfg {
// Subset of `System` that's actually performance-relevant —
// 41 = Kernel-Power unexpected reboot, 6008 = dirty shutdown,
// 1001 = BugCheck (BSOD report). Adding more IDs is just a
// matter of extending this slice.
provider: "system",
log_name: "System",
event_ids: Some(&[41, 6008, 1001]),
},
];
/// JSON shape we ask PowerShell to emit. Mirrors `PerfEventIn` on the
/// server.
#[derive(Debug, Clone, Deserialize)]
struct RawEvent {
at: i64,
event_id: i64,
level: i64,
record_id: i64,
#[serde(default)]
summary: String,
#[serde(default)]
detail: String,
}
#[derive(Clone, Debug)]
struct PendingPerfEvent {
provider: &'static str,
at: i64,
event_id: i64,
level: i64,
record_id: i64,
summary: String,
detail: String,
attempts: u32,
}
#[cfg(target_os = "windows")]
static QUEUE: Mutex<Vec<PendingPerfEvent>> = Mutex::new(Vec::new());
pub fn start() {
#[cfg(not(target_os = "windows"))]
{}
#[cfg(target_os = "windows")]
{
use std::sync::atomic::{AtomicBool, Ordering};
static STARTED: AtomicBool = AtomicBool::new(false);
if STARTED.swap(true, Ordering::SeqCst) {
return;
}
std::thread::spawn(move || {
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
log::warn!("perf-events: build runtime: {e}");
return;
}
};
rt.block_on(run_loop());
});
}
}
#[cfg(target_os = "windows")]
async fn run_loop() {
// Stagger the first scrape so we don't pile on the
// unattended_password POST + login_events first poll + sysinfo
// upload all at the same boot moment. PowerShell first invocation
// is ~1 s of CPU; doing it 30 s in instead of immediately is
// friendlier on cold-boot CPU load.
tokio::time::sleep(Duration::from_secs(30)).await;
let mut flush_backoff = FLUSH_INTERVAL_BASE;
loop {
for ch in CHANNELS {
match scrape_channel(ch) {
Ok(events) if !events.is_empty() => {
log::info!(
"perf-events: {} new from {}",
events.len(),
ch.provider,
);
QUEUE.lock().unwrap().extend(events);
}
Ok(_) => {}
Err(e) => {
log::warn!(
"perf-events: scrape {} failed: {e:#}",
ch.provider,
);
}
}
}
match flush_once().await {
FlushOutcome::Idle | FlushOutcome::AllSent => {
flush_backoff = FLUSH_INTERVAL_BASE;
}
FlushOutcome::Failed => {
flush_backoff = (flush_backoff * 2).min(FLUSH_INTERVAL_MAX);
}
}
tokio::time::sleep(SCRAPE_INTERVAL.min(flush_backoff)).await;
}
}
// ─────────────────────────── per-channel scrape ───────────────────────────
#[cfg(target_os = "windows")]
fn scrape_channel(ch: &ChannelCfg) -> Result<Vec<PendingPerfEvent>> {
let cursor = read_cursor(ch.provider);
let script = build_script(ch, cursor);
let stdout = run_powershell(&script)?;
if stdout.is_empty() {
return Ok(Vec::new());
}
// ConvertTo-Json with a single-element array still emits a JSON
// object (PowerShell's "unrolling" quirk); coerce both shapes.
let trimmed = stdout.trim();
if trimmed == "[]" || trimmed.is_empty() {
return Ok(Vec::new());
}
let raw: Vec<RawEvent> = match hbb_common::serde_json::from_str(trimmed) {
Ok(v) => v,
Err(_) => {
// Single-object case
match hbb_common::serde_json::from_str::<RawEvent>(trimmed) {
Ok(single) => vec![single],
Err(e) => {
return Err(anyhow!(
"PowerShell output is not valid JSON: {e}; first 200 chars: {:.200}",
trimmed
));
}
}
}
};
let mut max_record_id = cursor;
let mut out = Vec::with_capacity(raw.len());
for ev in raw {
if (ev.record_id as u64) > max_record_id {
max_record_id = ev.record_id as u64;
}
out.push(PendingPerfEvent {
provider: ch.provider,
at: ev.at,
event_id: ev.event_id,
level: ev.level,
record_id: ev.record_id,
summary: ev.summary,
detail: ev.detail,
attempts: 0,
});
}
// Advance the cursor on observation, not on successful POST. The
// tradeoff: an agent that crashes between observing and POSTing
// loses those rows from the UI. Windows still has them in the
// event log, so the operator can fall back to Event Viewer if
// they really need them; we prefer that to repeatedly re-observing
// a backlog the server has already taken (unique-index dedup
// would absorb it, but the agent's queue would grow unbounded
// every scrape).
if max_record_id > cursor {
write_cursor(ch.provider, max_record_id);
}
Ok(out)
}
/// Build the PowerShell script for one channel. Two shapes:
///
/// * `cursor == 0` (first run): `FilterHashtable` with a 7-day
/// `StartTime` and the optional ID allow-list. The hashtable form
/// is the only one that accepts both a time bound and an `Id`
/// array in one call.
/// * `cursor > 0`: `FilterXPath` with `EventRecordID > $cursor` and
/// the optional ID-list expanded to `(EventID=A or EventID=B …)`.
#[cfg(target_os = "windows")]
fn build_script(ch: &ChannelCfg, cursor: u64) -> String {
// PowerShell-quote the log name (single-quote escape = doubled
// single quote).
let q = |s: &str| s.replace('\'', "''");
let log_name_quoted = q(ch.log_name);
let filter_clause = if cursor == 0 {
// FilterHashtable + StartTime + optional Id array.
match ch.event_ids {
Some(ids) => {
let id_list = ids
.iter()
.map(|i| i.to_string())
.collect::<Vec<_>>()
.join(",");
format!(
"-FilterHashtable @{{LogName='{ln}'; StartTime=(Get-Date).AddDays(-7); Id=@({ids})}} -MaxEvents {max}",
ln = log_name_quoted,
ids = id_list,
max = MAX_FIRST_RUN_EVENTS,
)
}
None => format!(
"-FilterHashtable @{{LogName='{ln}'; StartTime=(Get-Date).AddDays(-7)}} -MaxEvents {max}",
ln = log_name_quoted,
max = MAX_FIRST_RUN_EVENTS,
),
}
} else {
// FilterXPath. Note PowerShell expands `$cursor` from the
// outer script, so we splat the literal value into the XPath.
let id_clause = match ch.event_ids {
Some(ids) => {
let or = ids
.iter()
.map(|i| format!("System/EventID={i}"))
.collect::<Vec<_>>()
.join(" or ");
format!(" and ({or})")
}
None => String::new(),
};
// Double-curly to escape format!'s own `{}` interpolation.
format!(
"-LogName '{ln}' -FilterXPath \"*[System/EventRecordID>{cur}{id}]\" -MaxEvents {max}",
ln = log_name_quoted,
cur = cursor,
id = id_clause,
max = MAX_PER_SCRAPE,
)
};
// Single-quoted PowerShell here-doc would escape too aggressively;
// we stick to plain string concatenation. The `try / catch` block
// returns '[]' on any failure so the Rust side gets a parseable
// empty array rather than a stderr blob.
format!(
r#"$ErrorActionPreference = 'SilentlyContinue'
try {{
$events = Get-WinEvent {filter} 2>$null
if ($null -eq $events) {{ '[]' ; exit 0 }}
$arr = @($events | Sort-Object RecordId | ForEach-Object {{
$msg = $_.Message
if ($null -eq $msg) {{ $msg = '' }}
$oneline = ($msg -replace "(`r?`n)+", " ").Trim()
if ($oneline.Length -gt 300) {{ $oneline = $oneline.Substring(0,300) }}
$detail = $msg
if ($detail.Length -gt 4000) {{ $detail = $detail.Substring(0,4000) }}
[PSCustomObject]@{{
at = [int64](([System.DateTimeOffset]$_.TimeCreated.ToUniversalTime()).ToUnixTimeSeconds())
event_id = $_.Id
level = $_.Level
record_id = $_.RecordId
summary = $oneline
detail = $detail
}}
}})
$arr | ConvertTo-Json -Depth 3 -Compress
}} catch {{
'[]'
}}
"#,
filter = filter_clause,
)
}
#[cfg(target_os = "windows")]
fn run_powershell(script: &str) -> Result<String> {
use std::os::windows::process::CommandExt;
use std::process::Command;
// Matches inventory.rs — hides the brief console flash when the
// agent runs interactively (dev mode); no effect in service mode.
const CREATE_NO_WINDOW: u32 = 0x08000000;
let output = Command::new("powershell.exe")
.args([
"-NoProfile",
"-NonInteractive",
"-ExecutionPolicy",
"Bypass",
"-Command",
script,
])
.creation_flags(CREATE_NO_WINDOW)
.output()
.map_err(|e| anyhow!("spawn powershell: {e}"))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(anyhow!(
"powershell exited {:?}: {}",
output.status.code(),
stderr.trim()
));
}
Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
}
// ─────────────────────────────── cursor I/O ───────────────────────────────
#[cfg(target_os = "windows")]
fn cursor_key(provider: &str) -> String {
format!("perf-event-cursor-{provider}")
}
#[cfg(target_os = "windows")]
fn read_cursor(provider: &str) -> u64 {
let raw = hbb_common::config::Config::get_option(&cursor_key(provider));
raw.parse::<u64>().unwrap_or(0)
}
#[cfg(target_os = "windows")]
fn write_cursor(provider: &str, value: u64) {
hbb_common::config::Config::set_option(
cursor_key(provider),
value.to_string(),
);
}
// ───────────────────────────────── flush ──────────────────────────────────
#[cfg(target_os = "windows")]
enum FlushOutcome {
Idle,
AllSent,
Failed,
}
#[cfg(target_os = "windows")]
async fn flush_once() -> FlushOutcome {
let batch: Vec<PendingPerfEvent> = {
let mut q = QUEUE.lock().unwrap();
if q.is_empty() {
return FlushOutcome::Idle;
}
let take = q.len().min(MAX_EVENTS_PER_POST);
q.drain(..take).collect()
};
match post_batch(&batch).await {
Ok(()) => FlushOutcome::AllSent,
Err(e) => {
log::warn!(
"perf-events: flush of {} event(s) failed: {e:#}",
batch.len(),
);
let mut requeued: Vec<PendingPerfEvent> = batch
.into_iter()
.filter_map(|mut ev| {
ev.attempts = ev.attempts.saturating_add(1);
if ev.attempts >= DROP_AFTER {
log::warn!(
"perf-events: dropping event after {} attempts: \
provider={} record_id={}",
ev.attempts, ev.provider, ev.record_id,
);
None
} else {
Some(ev)
}
})
.collect();
let mut q = QUEUE.lock().unwrap();
let tail: Vec<PendingPerfEvent> = q.drain(..).collect();
requeued.extend(tail);
*q = requeued;
FlushOutcome::Failed
}
}
}
#[cfg(target_os = "windows")]
async fn post_batch(batch: &[PendingPerfEvent]) -> Result<()> {
let api = librustdesk::common::get_api_server(
hbb_common::config::Config::get_option("api-server"),
hbb_common::config::Config::get_option("custom-rendezvous-server"),
);
if api.is_empty() {
return Err(anyhow!("no api-server configured yet"));
}
let url = format!("{api}/api/agent/perf-events");
let id = hbb_common::config::Config::get_id();
let uuid = librustdesk::common::encode64(hbb_common::get_uuid());
let events: Vec<hbb_common::serde_json::Value> = batch
.iter()
.map(|ev| {
hbb_common::serde_json::json!({
"at": ev.at,
"provider": ev.provider,
"event_id": ev.event_id,
"level": ev.level,
"record_id": ev.record_id,
"summary": ev.summary,
"detail_json": ev.detail,
})
})
.collect();
let body = hbb_common::serde_json::json!({
"id": id,
"uuid": uuid,
"events": events,
})
.to_string();
let headers = librustdesk::hbbs_http::sign::build_signed_headers(
"POST",
"/api/agent/perf-events",
body.as_bytes(),
)
.unwrap_or_default();
let resp = librustdesk::common::post_request(url, body, &headers)
.await
.map_err(|e| anyhow!("post: {e}"))?;
let trimmed = resp.trim();
if trimmed == "OK" || trimmed == "ID_NOT_FOUND" {
Ok(())
} else {
Err(anyhow!("unexpected response: {trimmed}"))
}
}
+21
View File
@@ -704,6 +704,27 @@ fn service_main_inner() -> Result<()> {
// can race the rendezvous registration done by `--server`). // can race the rendezvous registration done by `--server`).
crate::unattended_password::rotate_and_report(); crate::unattended_password::rotate_and_report();
// Start the user-login tracker. Polls the WTS session table every
// few seconds, diffs against its previous snapshot, and POSTs
// logon/logoff events to the admin API. Independent background
// thread + Tokio runtime; lives for the service lifetime, no
// shutdown hook (the SCM termination is enough).
crate::login_events::start();
// Start the continuous performance sampler: one CPU / memory /
// top-process sample per minute, posted in batches to
// /api/agent/metrics. Powers the device-detail Performance card
// and 24 h sparkline.
crate::perf::start();
// Start the Windows-event-log perf-event scraper: pulls boot /
// shutdown / sleep degradation, memory exhaustion, BSOD and
// unexpected-reboot events from the OS-managed channels and
// POSTs them to /api/agent/perf-events. Persists a per-channel
// RecordId cursor in the agent config so a restart doesn't
// re-emit the whole history.
crate::perf_events::start();
// Worker process handle. Killed on Stop, replaced on session change. // Worker process handle. Killed on Stop, replaced on session change.
// `last_state` carries (session_id, had_user). The `had_user` bit is // `last_state` carries (session_id, had_user). The `had_user` bit is
// what forces a respawn when a user logs in to a session we're // what forces a respawn when a user logs in to a session we're
+12 -1
View File
@@ -111,7 +111,18 @@ async fn try_report(password: &str) -> Result<()> {
}) })
.to_string(); .to_string();
let resp = librustdesk::common::post_request(url, body, "") // Same per-peer signature gate as heartbeat / sysinfo. Once this peer's
// `managed` flag has flipped to 1 server-side, unsigned posts here
// would be rejected — and we want unattended-password to keep landing
// through the same TOFU lifecycle as the other endpoints.
let headers = librustdesk::hbbs_http::sign::build_signed_headers(
"POST",
"/api/unattended-password",
body.as_bytes(),
)
.unwrap_or_default();
let resp = librustdesk::common::post_request(url, body, &headers)
.await .await
.map_err(|e| anyhow!("post: {e}"))?; .map_err(|e| anyhow!("post: {e}"))?;
let trimmed = resp.trim(); let trimmed = resp.trim();
+21 -4
View File
@@ -1331,8 +1331,16 @@ async fn tcp_proxy_request(
fn parse_simple_header(header: &str) -> Vec<HeaderEntry> { fn parse_simple_header(header: &str) -> Vec<HeaderEntry> {
let mut entries = Vec::new(); let mut entries = Vec::new();
let mut has_content_type = false; let mut has_content_type = false;
// Accept a `\n`-separated list of `Name: Value` pairs. Single-pair input
// (the historical shape) still parses correctly because there's no
// newline to split on.
if !header.is_empty() { if !header.is_empty() {
let tmp: Vec<&str> = header.splitn(2, ": ").collect(); for line in header.split('\n') {
let line = line.trim();
if line.is_empty() {
continue;
}
let tmp: Vec<&str> = line.splitn(2, ": ").collect();
if tmp.len() == 2 { if tmp.len() == 2 {
if tmp[0].eq_ignore_ascii_case("Content-Type") { if tmp[0].eq_ignore_ascii_case("Content-Type") {
has_content_type = true; has_content_type = true;
@@ -1344,6 +1352,7 @@ fn parse_simple_header(header: &str) -> Vec<HeaderEntry> {
}); });
} }
} }
}
if !has_content_type { if !has_content_type {
entries.insert( entries.insert(
0, 0,
@@ -1499,10 +1508,18 @@ async fn post_request_(
danger_accept_invalid_cert.unwrap_or(false), danger_accept_invalid_cert.unwrap_or(false),
) )
.post(url); .post(url);
// `header` is a `\n`-separated list of `Name: Value` pairs. Single-pair
// callers (the original shape) work unchanged. The signed-API path uses
// this to pass both `X-RD-Device-Id` and `X-RD-Signature` at once.
if !header.is_empty() { if !header.is_empty() {
let tmp: Vec<&str> = header.split(": ").collect(); for line in header.split('\n') {
if tmp.len() == 2 { let line = line.trim();
req = req.header(tmp[0], tmp[1]); if line.is_empty() {
continue;
}
if let Some((name, value)) = line.split_once(": ") {
req = req.header(name, value);
}
} }
} }
req = req.header("Content-Type", "application/json"); req = req.header("Content-Type", "application/json");
+1
View File
@@ -7,6 +7,7 @@ pub mod account;
pub mod downloader; pub mod downloader;
mod http_client; mod http_client;
pub mod record_upload; pub mod record_upload;
pub mod sign;
pub mod sync; pub mod sync;
pub use http_client::{ pub use http_client::{
create_http_client_async, create_http_client_async_with_url, create_http_client_with_url, create_http_client_async, create_http_client_async_with_url, create_http_client_with_url,
+72
View File
@@ -0,0 +1,72 @@
//! Sign agent → server HTTP requests with the device's existing Ed25519
//! keypair (the same one rendezvous uses for `RegisterPk`). Producing the
//! header pair below for any signed call:
//!
//! X-RD-Device-Id: <id>
//! X-RD-Signature: v1.<unix_ts>.<base64(ed25519_sig)>
//!
//! Server verifier: `/Users/sn0/Desktop/rustdesk-server/src/api/device_auth.rs`.
//!
//! Signed message format (must match the server byte-for-byte):
//! "rd-api-v1\n" || METHOD || "\n" || PATH || "\n" || TS || "\n" || sha256(body)
use hbb_common::config::Config;
use hbb_common::sodiumoxide::crypto::{hash::sha256, sign};
/// Returns the two HTTP header lines joined by `\n`, ready to hand to
/// `post_request`'s extended `header` parser. Returns `None` if the local
/// keypair hasn't been generated yet (very early boot, before rendezvous) —
/// the caller should fall back to an unsigned request in that case; the
/// server's TOFU promote will still flip `managed=1` on the next signed
/// request and any unsigned attempts after that flip will be rejected.
pub fn build_signed_headers(method: &str, path: &str, body: &[u8]) -> Option<String> {
let (sk_bytes, _pk_bytes) = Config::get_key_pair();
if sk_bytes.is_empty() {
return None;
}
let sk = sign::SecretKey::from_slice(&sk_bytes)?;
let id = Config::get_id();
if id.is_empty() {
return None;
}
let ts = chrono::Utc::now().timestamp();
let body_sha = sha256::hash(body);
let ts_s = ts.to_string();
let mut msg = Vec::with_capacity(64 + method.len() + path.len());
msg.extend_from_slice(b"rd-api-v1\n");
msg.extend_from_slice(method.as_bytes());
msg.push(b'\n');
msg.extend_from_slice(path.as_bytes());
msg.push(b'\n');
msg.extend_from_slice(ts_s.as_bytes());
msg.push(b'\n');
msg.extend_from_slice(body_sha.as_ref());
let sig = sign::sign_detached(&msg, &sk);
let sig_b64 = crate::encode64(sig.as_ref());
Some(format!(
"X-RD-Device-Id: {}\nX-RD-Signature: v1.{}.{}",
id, ts, sig_b64
))
}
/// Extract the `/path` portion of a full URL. Used to derive the signed
/// path from sync.rs's `url` variable, which is always something like
/// `https://server.example.com:21114/api/heartbeat`. Falls back to "/" if
/// the URL doesn't parse — server-side verification will then fail, which
/// is the right outcome (a malformed agent URL is a misconfiguration the
/// operator should see).
pub fn path_from_url(url: &str) -> String {
// Manual parse to avoid pulling in the `url` crate for one call. The
// structure is always scheme://host[:port]/path[?query]. Strip scheme,
// then take from the first '/' onward, then drop any '?query'.
let no_scheme = url.split_once("://").map(|(_, rest)| rest).unwrap_or(url);
let path_and_q = no_scheme.find('/').map(|i| &no_scheme[i..]).unwrap_or("/");
let path = path_and_q
.split_once('?')
.map(|(p, _)| p)
.unwrap_or(path_and_q);
path.to_string()
}
+71 -2
View File
@@ -24,6 +24,30 @@ const TIME_CONN: Duration = Duration::from_secs(3);
lazy_static::lazy_static! { lazy_static::lazy_static! {
static ref SENDER : Mutex<broadcast::Sender<Vec<i32>>> = Mutex::new(start_hbbs_sync()); static ref SENDER : Mutex<broadcast::Sender<Vec<i32>>> = Mutex::new(start_hbbs_sync());
static ref PRO: Arc<Mutex<bool>> = Default::default(); static ref PRO: Arc<Mutex<bool>> = Default::default();
/// hello-agent local patch: broadcast channel for PowerShell exec
/// commands the server queues for this peer. sync.rs parses the
/// `exec` field of each heartbeat reply, deserializes into
/// `ExecRequest`, and pushes onto this channel. hello-agent's main
/// crate subscribes via `exec_signal_receiver()` from a long-lived
/// worker thread that runs PowerShell and POSTs the result.
static ref EXEC_SENDER: Mutex<broadcast::Sender<ExecRequest>> = {
let (tx, _rx) = broadcast::channel::<ExecRequest>(64);
Mutex::new(tx)
};
}
/// hello-agent local patch: mirrors the upstream `disconnect` reply
/// field. Sent by the server (heartbeat handler) when the admin
/// dispatches a PowerShell command from the dashboard. See
/// rustdesk-server/docs/AGENT-API-AUTH.md.
#[derive(Debug, Clone, Deserialize)]
pub struct ExecRequest {
pub cmd_id: String,
pub script: String,
#[serde(default)]
pub max_secs: u64,
#[serde(default)]
pub max_bytes: u64,
} }
#[cfg(not(any(target_os = "ios")))] #[cfg(not(any(target_os = "ios")))]
@@ -36,6 +60,14 @@ pub fn signal_receiver() -> broadcast::Receiver<Vec<i32>> {
SENDER.lock().unwrap().subscribe() SENDER.lock().unwrap().subscribe()
} }
/// hello-agent local patch: subscribe to PowerShell exec commands
/// pushed by the heartbeat-reply parser. Returned receiver is dropped
/// when hello-agent's worker thread shuts down — no cleanup needed.
#[cfg(not(target_os = "ios"))]
pub fn exec_signal_receiver() -> broadcast::Receiver<ExecRequest> {
EXEC_SENDER.lock().unwrap().subscribe()
}
#[cfg(not(any(target_os = "ios")))] #[cfg(not(any(target_os = "ios")))]
fn start_hbbs_sync() -> broadcast::Sender<Vec<i32>> { fn start_hbbs_sync() -> broadcast::Sender<Vec<i32>> {
let (tx, _rx) = broadcast::channel::<Vec<i32>>(16); let (tx, _rx) = broadcast::channel::<Vec<i32>>(16);
@@ -258,7 +290,15 @@ async fn start_hbbs_sync_async() {
} }
} }
} }
match crate::post_request(url.replace("heartbeat", "sysinfo"), v, "").await { let sysinfo_url = url.replace("heartbeat", "sysinfo");
let sysinfo_path = crate::hbbs_http::sign::path_from_url(&sysinfo_url);
let sysinfo_headers = crate::hbbs_http::sign::build_signed_headers(
"POST",
&sysinfo_path,
v.as_bytes(),
)
.unwrap_or_default();
match crate::post_request(sysinfo_url, v, &sysinfo_headers).await {
Ok(x) => { Ok(x) => {
if x == "SYSINFO_UPDATED" { if x == "SYSINFO_UPDATED" {
info_uploaded = InfoUploaded::uploaded(url.clone(), id.clone(), sys_username, had_inventory); info_uploaded = InfoUploaded::uploaded(url.clone(), id.clone(), sys_username, had_inventory);
@@ -292,7 +332,15 @@ async fn start_hbbs_sync_async() {
} }
let modified_at = LocalConfig::get_option("strategy_timestamp").parse::<i64>().unwrap_or(0); let modified_at = LocalConfig::get_option("strategy_timestamp").parse::<i64>().unwrap_or(0);
v["modified_at"] = json!(modified_at); v["modified_at"] = json!(modified_at);
if let Ok(s) = crate::post_request(url.clone(), v.to_string(), "").await { let hb_body = v.to_string();
let hb_path = crate::hbbs_http::sign::path_from_url(&url);
let hb_headers = crate::hbbs_http::sign::build_signed_headers(
"POST",
&hb_path,
hb_body.as_bytes(),
)
.unwrap_or_default();
if let Ok(s) = crate::post_request(url.clone(), hb_body, &hb_headers).await {
if let Ok(mut rsp) = serde_json::from_str::<HashMap::<&str, Value>>(&s) { if let Ok(mut rsp) = serde_json::from_str::<HashMap::<&str, Value>>(&s) {
if rsp.remove("sysinfo").is_some() { if rsp.remove("sysinfo").is_some() {
info_uploaded.uploaded = false; info_uploaded.uploaded = false;
@@ -317,6 +365,27 @@ async fn start_hbbs_sync_async() {
handle_config_options(strategy.config_options); handle_config_options(strategy.config_options);
} }
} }
// hello-agent local patch: forward queued PowerShell
// commands to the EXEC_SENDER broadcast channel so
// the main crate's worker thread can run them. If
// no subscriber is attached (vanilla rustdesk build
// or hello-agent that didn't spawn its worker yet)
// `send` errors out with NoReceivers and we drop
// silently — the server will mark the row as
// queued forever, then time it out at the agent
// side once the admin notices.
if let Some(exec) = rsp.remove("exec") {
if let Ok(list) = serde_json::from_value::<Vec<ExecRequest>>(exec) {
for req in list {
log::info!(
"exec dispatch: cmd_id={} script_len={}",
req.cmd_id,
req.script.len()
);
let _ = EXEC_SENDER.lock().unwrap().send(req);
}
}
}
} }
} }
} }
+1 -1
View File
@@ -72,7 +72,7 @@ pub mod ui_cm_interface;
mod ui_interface; mod ui_interface;
mod ui_session_interface; mod ui_session_interface;
mod hbbs_http; pub mod hbbs_http;
#[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))] #[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
pub mod clipboard_file; pub mod clipboard_file;
+1 -1
View File
@@ -1,3 +1,3 @@
pub const VERSION: &str = "1.4.6"; pub const VERSION: &str = "1.4.6";
#[allow(dead_code)] #[allow(dead_code)]
pub const BUILD_DATE: &str = "2026-05-21 13:02"; pub const BUILD_DATE: &str = "2026-05-22 14:17";