6 Commits

Author SHA1 Message Date
mike 95e51842ae release: bump to 0.1.8 (fix invalid edition)
build-windows / build-hello-agent-x64 (push) Successful in 6m19s
build-windows / sign-hello-agent-x64 (push) Successful in 5s
build-windows / validate-hello-agent-x64 (push) Successful in 9s
Bump version for the authoritative-strategy + session-notification changes.
edition was set to an invalid "2026"; restore to 2021 (rust-version 1.75
predates edition 2024).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 10:59:09 +02:00
mike faf3b1303b strategy: make pushed config authoritative
handle_config_options only ever inserted/overwrote the keys a strategy
sent, so removing a key (or unassigning a strategy) on the server left the
old value lingering on the device.

Persist the set of keys applied on the previous push (strategy_managed_keys
in LocalConfig) and, on each apply, reset any key the server managed before
but no longer sends back to its default. Keys the server never managed (the
user's own local settings) are left untouched, so this remains a managed
overlay rather than a wipe.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 08:54:12 +00:00
mike 31d5a881b6 cm: only notify the user once all sessions have closed
remove_connection fired the "session ended" banner for every connection
that closed. With multiple concurrent sessions (a second supporter, or a
file-transfer session running alongside remote control) the user saw one
banner per close even though support was still ongoing.

Track the remaining approved-session count under the same lock and show the
banner only when the last session has ended. Denied/never-approved
connections still produce no banner.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 08:54:12 +00:00
mike 14411987e7 sysinfo: advertise Ed25519 public key for opsbase TOFU enrollment
build-windows / build-hello-agent-x64 (push) Successful in 5m19s
build-windows / sign-hello-agent-x64 (push) Successful in 8s
build-windows / validate-hello-agent-x64 (push) Successful in 11s
Include the agent's base64 Ed25519 public key in the `pk` field of the
sysinfo upload. opsbase (acting as the agent's api-server) has no rendezvous
server to learn the key from, so it pins this key trust-on-first-use on first
contact and verifies every later signed request against it.

This is the same keypair sign.rs already signs requests with. Vanilla
rustdesk servers ignore the unknown field, so the change is backward
compatible.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 19:23:58 +00:00
mike 8de2ebea85 Implement performance monitor
build-windows / build-hello-agent-x64 (push) Successful in 5m0s
build-windows / sign-hello-agent-x64 (push) Successful in 5s
build-windows / validate-hello-agent-x64 (push) Successful in 7s
2026-05-22 21:52:13 +02:00
mike f868efa432 Implement user login logging
build-windows / build-hello-agent-x64 (push) Successful in 4m56s
build-windows / sign-hello-agent-x64 (push) Successful in 5s
build-windows / validate-hello-agent-x64 (push) Successful in 6s
2026-05-22 20:08:24 +02:00
9 changed files with 1523 additions and 4 deletions
Generated
+2 -1
View File
@@ -3197,13 +3197,14 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]] [[package]]
name = "hello-agent" name = "hello-agent"
version = "0.1.5" version = "0.1.8"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"env_logger 0.10.2", "env_logger 0.10.2",
"hbb_common", "hbb_common",
"log", "log",
"rustdesk", "rustdesk",
"serde 1.0.228",
"serde_json 1.0.118", "serde_json 1.0.118",
"tokio", "tokio",
"winapi", "winapi",
+7 -1
View File
@@ -1,6 +1,6 @@
[package] [package]
name = "hello-agent" name = "hello-agent"
version = "0.1.5" version = "0.1.8"
edition = "2021" edition = "2021"
rust-version = "1.75" rust-version = "1.75"
description = "Headless RustDesk-protocol-compatible support agent for Windows" description = "Headless RustDesk-protocol-compatible support agent for Windows"
@@ -34,6 +34,12 @@ anyhow = "1"
# the inventory module's `use serde_json` doesn't depend on internal # the inventory module's `use serde_json` doesn't depend on internal
# implementation details of hbb_common. # implementation details of hbb_common.
serde_json = "1" serde_json = "1"
# `perf_events.rs` derives Deserialize on the PowerShell row schema.
# hbb_common re-exports `serde_derive` and `serde_json` but NOT `serde`
# itself — and `#[derive(Deserialize)]` expands to a path that references
# the `serde` crate root, so we depend on it explicitly with the `derive`
# feature.
serde = { version = "1", features = ["derive"] }
[target.'cfg(target_os = "windows")'.dependencies] [target.'cfg(target_os = "windows")'.dependencies]
windows-service = "0.6" windows-service = "0.6"
+18 -1
View File
@@ -140,9 +140,26 @@ impl InvokeUiCM for HeadlessCm {
fn remove_connection(&self, id: i32, _close: bool) { fn remove_connection(&self, id: i32, _close: bool) {
trace(&format!("remove_connection: id={id}")); trace(&format!("remove_connection: id={id}"));
let entry = self.approved.lock().unwrap().remove(&id); // Remove this session and read how many approved sessions remain, while
// holding the lock so the check is atomic against concurrent add/remove.
let (entry, remaining) = {
let mut approved = self.approved.lock().unwrap();
let entry = approved.remove(&id);
(entry, approved.len())
};
if let Some((peer_id, name)) = entry { if let Some((peer_id, name)) = entry {
// Only notify once EVERY approved session has ended. With multiple
// concurrent sessions (a second supporter, or a file-transfer
// session running alongside remote control) the user shouldn't get
// a "session ended" banner while other sessions are still open —
// they'd see one banner per close even though support is ongoing.
if remaining == 0 {
std::thread::spawn(move || show_session_ended(&peer_id, &name)); std::thread::spawn(move || show_session_ended(&peer_id, &name));
} else {
trace(&format!(
"remove_connection: {remaining} approved session(s) still open; suppressing banner"
));
}
} }
} }
+617
View File
@@ -0,0 +1,617 @@
// User-login tracking.
//
// Polls the Windows Terminal Services session table at a low cadence and
// reports logon / logoff events to the rustdesk-server admin API. Each
// event carries an explicit unix timestamp — for logons that's the OS's
// session `ConnectTime` (so the recorded time is when the user actually
// signed in, not when the agent first noticed them); for logoffs it's
// the agent's wall clock at the moment the session disappeared.
//
// Architecture mirrors `unattended_password`:
// * One background thread, its own current-thread Tokio runtime, no
// entanglement with the SCM supervisor's poll loop.
// * In-memory queue with retry-with-backoff on transport / ID_NOT_FOUND
// errors (the agent's first POST routinely races rendezvous
// registration). Events that never land are eventually dropped to
// bound memory — see DROP_AFTER.
// * Server-side dedup via UNIQUE INDEX
// (peer_id, kind, session_id, at, username) lets the agent re-emit
// the same `logon@ConnectTime` event on every service restart without
// piling up duplicate rows; agent-side state stays in memory only.
//
// Known limits (v1):
// * Lock / unlock are not reported — they don't change the
// WTSEnumerateSessions output we diff on.
// * Logoffs that happen while the service is down are not detected.
// The next service start sees "no session" and has nothing to diff
// against; this leaves a logon without a paired logoff in the UI.
// Tradeoff vs. persisting a snapshot to disk; revisit if operators
// ask for it.
use anyhow::{anyhow, Result};
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::Duration;
/// How often to poll the WTS session table. A user can't meaningfully
/// log in / out faster than this, and the OS keeps the table cheap to
/// enumerate (it's a kernel-side struct, not a registry scan).
const POLL_INTERVAL: Duration = Duration::from_secs(5);
/// How often to attempt a flush of the pending queue. Decoupled from the
/// poll interval so a transient server outage doesn't slow down session
/// observation; we keep observing locally and just back off the network
/// retry.
const FLUSH_INTERVAL_BASE: Duration = Duration::from_secs(5);
const FLUSH_INTERVAL_MAX: Duration = Duration::from_secs(60);
/// Drop events from the queue after this many failed delivery attempts.
/// At backoff cap = 60s, this is ~6 hours of trying — enough to ride out
/// a long server-side outage, short enough that a permanently-misconfigured
/// agent doesn't blow up memory.
const DROP_AFTER: u32 = 360;
/// Cap per request — must match the server's MAX_EVENTS_PER_POST. Server
/// rejects anything larger with a 400 so we'd retry forever if we exceeded
/// it.
const MAX_EVENTS_PER_POST: usize = 256;
/// One observed session snapshot. Equality by `(session_id, connect_time)`
/// — Windows can recycle session IDs across logins, so we use the OS-
/// reported `ConnectTime` as the disambiguator. Two snapshots compare
/// equal iff they describe the *same* logical login.
#[derive(Clone, Debug)]
struct Session {
session_id: u32,
/// FILETIME → unix epoch seconds. 0 if the OS returned an unparseable
/// value (very old Windows builds); we still report `logon` but the
/// server-side dedup degrades to "first observation wins".
connect_unix: i64,
username: String,
domain: String,
/// "console" | "rdp" | "" (unknown).
session_kind: String,
}
#[derive(Clone, Debug)]
struct PendingEvent {
at: i64,
kind: &'static str,
username: String,
domain: String,
session_id: u32,
session_kind: String,
/// Number of times we've tried to flush this event. Used to drop
/// rows that have been retrying since forever.
attempts: u32,
}
#[cfg(target_os = "windows")]
static QUEUE: Mutex<Vec<PendingEvent>> = Mutex::new(Vec::new());
/// Kick off the background tracker. Returns immediately. Safe to call
/// multiple times (subsequent calls are no-ops) — gated by an
/// `AtomicBool`.
pub fn start() {
#[cfg(not(target_os = "windows"))]
{
// Login tracking is Windows-only; on other platforms the call is
// a no-op so cross-platform builds (CI lint runs on Linux) link.
}
#[cfg(target_os = "windows")]
{
use std::sync::atomic::{AtomicBool, Ordering};
static STARTED: AtomicBool = AtomicBool::new(false);
if STARTED.swap(true, Ordering::SeqCst) {
return;
}
std::thread::spawn(move || {
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
log::warn!("login-events: build runtime: {e}");
return;
}
};
rt.block_on(run_loop());
});
}
}
#[cfg(target_os = "windows")]
async fn run_loop() {
// Diff snapshot. Keyed by session_id; value carries connect_unix so we
// can detect "session id reused for a new login" as well as "session
// disappeared".
let mut prev: HashMap<u32, Session> = HashMap::new();
let mut first_poll = true;
let mut flush_backoff = FLUSH_INTERVAL_BASE;
loop {
match enumerate_active_user_sessions() {
Ok(snapshot) => {
let now_unix = hbb_common::chrono::Utc::now().timestamp();
let curr: HashMap<u32, Session> =
snapshot.into_iter().map(|s| (s.session_id, s)).collect();
let events =
diff_into_events(&prev, &curr, first_poll, now_unix);
if !events.is_empty() {
let mut q = QUEUE.lock().unwrap();
q.extend(events);
}
prev = curr;
first_poll = false;
}
Err(e) => {
log::warn!("login-events: enumerate failed: {e:#}");
}
}
// Try to flush. If the queue is empty this is cheap (no network).
// The flush also handles its own retry / drop semantics — we just
// adjust our local cadence based on whether it succeeded.
match flush_once().await {
FlushOutcome::Idle | FlushOutcome::AllSent => {
flush_backoff = FLUSH_INTERVAL_BASE;
}
FlushOutcome::Failed => {
flush_backoff = (flush_backoff * 2).min(FLUSH_INTERVAL_MAX);
}
}
// Poll cadence is constant; the network backoff doesn't slow down
// observation — that would risk missing logoffs while the server
// is down. We just delay the retry of pending events.
tokio::time::sleep(POLL_INTERVAL.min(flush_backoff)).await;
}
}
#[cfg(target_os = "windows")]
fn diff_into_events(
prev: &HashMap<u32, Session>,
curr: &HashMap<u32, Session>,
first_poll: bool,
now_unix: i64,
) -> Vec<PendingEvent> {
let mut out = Vec::new();
// New / changed sessions → logon.
for (sid, sess) in curr {
let changed = match prev.get(sid) {
None => true,
Some(old) => {
// Same session id but a different connect_time → the
// OS recycled the slot for a fresh login. Emit a
// logoff for the old occupant and a logon for the new.
if old.connect_unix != sess.connect_unix
|| old.username != sess.username
{
out.push(PendingEvent {
at: now_unix,
kind: "logoff",
username: old.username.clone(),
domain: old.domain.clone(),
session_id: *sid,
session_kind: old.session_kind.clone(),
attempts: 0,
});
true
} else {
false
}
}
};
if !changed {
continue;
}
// ConnectTime can be 0 on the login-screen session even when a
// user is shown as logged in (rare edge); fall back to `now` so
// the row still lands somewhere sensible.
let at = if sess.connect_unix > 0 {
sess.connect_unix
} else {
now_unix
};
out.push(PendingEvent {
at,
kind: "logon",
username: sess.username.clone(),
domain: sess.domain.clone(),
session_id: *sid,
session_kind: sess.session_kind.clone(),
attempts: 0,
});
}
// Disappeared sessions → logoff. Skipped on the very first poll
// because we have no baseline to diff against — see the module-level
// "first poll: emit logons-only" note.
if !first_poll {
for (sid, old) in prev {
if !curr.contains_key(sid) {
out.push(PendingEvent {
at: now_unix,
kind: "logoff",
username: old.username.clone(),
domain: old.domain.clone(),
session_id: *sid,
session_kind: old.session_kind.clone(),
attempts: 0,
});
}
}
}
out
}
#[cfg(target_os = "windows")]
enum FlushOutcome {
Idle,
AllSent,
Failed,
}
#[cfg(target_os = "windows")]
async fn flush_once() -> FlushOutcome {
// Take a snapshot under the lock so we don't hold it across the
// (potentially slow) network call. If the POST succeeds we drop the
// matching prefix from the queue; if it fails we put back the
// unsent tail with bumped attempt counters.
let batch: Vec<PendingEvent> = {
let mut q = QUEUE.lock().unwrap();
if q.is_empty() {
return FlushOutcome::Idle;
}
let take = q.len().min(MAX_EVENTS_PER_POST);
q.drain(..take).collect()
};
let posted = post_batch(&batch).await;
match posted {
Ok(()) => {
// Server accepted (or returned ID_NOT_FOUND, which we treat
// as "drop and continue" because no amount of retrying will
// make the peer materialize without the rendezvous loop in
// --server, which the unattended_password reporter already
// hammers on; once that succeeds the next batch lands).
FlushOutcome::AllSent
}
Err(e) => {
log::warn!(
"login-events: flush of {} event(s) failed: {e:#}",
batch.len(),
);
// Put the batch back with bumped attempt counters, modulo
// events that have hit the drop threshold. Prepend so that
// any events pushed by the poll loop between the drain and
// here stay after the (older) failed batch.
let mut requeued: Vec<PendingEvent> = batch
.into_iter()
.filter_map(|mut ev| {
ev.attempts = ev.attempts.saturating_add(1);
if ev.attempts >= DROP_AFTER {
log::warn!(
"login-events: dropping event after {} attempts: \
kind={} session={} user={}",
ev.attempts, ev.kind, ev.session_id, ev.username,
);
None
} else {
Some(ev)
}
})
.collect();
let mut q = QUEUE.lock().unwrap();
let tail: Vec<PendingEvent> = q.drain(..).collect();
requeued.extend(tail);
*q = requeued;
FlushOutcome::Failed
}
}
}
#[cfg(target_os = "windows")]
async fn post_batch(batch: &[PendingEvent]) -> Result<()> {
let api = librustdesk::common::get_api_server(
hbb_common::config::Config::get_option("api-server"),
hbb_common::config::Config::get_option("custom-rendezvous-server"),
);
if api.is_empty() {
return Err(anyhow!("no api-server configured yet"));
}
let url = format!("{api}/api/agent/login-event");
let id = hbb_common::config::Config::get_id();
let uuid = librustdesk::common::encode64(hbb_common::get_uuid());
let events: Vec<hbb_common::serde_json::Value> = batch
.iter()
.map(|ev| {
hbb_common::serde_json::json!({
"at": ev.at,
"kind": ev.kind,
"username": ev.username,
"domain": ev.domain,
"session_id": ev.session_id,
"session_kind": ev.session_kind,
})
})
.collect();
let body = hbb_common::serde_json::json!({
"id": id,
"uuid": uuid,
"events": events,
})
.to_string();
let headers = librustdesk::hbbs_http::sign::build_signed_headers(
"POST",
"/api/agent/login-event",
body.as_bytes(),
)
.unwrap_or_default();
let resp = librustdesk::common::post_request(url, body, &headers)
.await
.map_err(|e| anyhow!("post: {e}"))?;
let trimmed = resp.trim();
if trimmed == "OK" || trimmed == "ID_NOT_FOUND" {
// ID_NOT_FOUND is "peer not registered yet" — happens on the
// first few flushes after a fresh install, before the
// rendezvous loop in --server has created the peer row. The
// unattended_password reporter races this same window; once
// either of them succeeds the peer row exists and subsequent
// posts land. We treat it as success here so the agent doesn't
// pile up unbounded retries — if rendezvous never registers,
// the heartbeat path is also broken and the operator has
// bigger problems than missing login events.
Ok(())
} else {
Err(anyhow!("unexpected response: {trimmed}"))
}
}
// ─────────────────────────── Win32 session enumeration ────────────────────
//
// Same shape as service.rs's find_active_user_session — we declare just
// the WTS functions we touch rather than pull in another bindgen. The
// types are tiny and ABI-stable.
#[cfg(target_os = "windows")]
#[repr(C)]
struct WtsSessionInfoW {
session_id: u32,
win_station_name: *mut u16,
state: i32,
}
#[cfg(target_os = "windows")]
#[repr(C)]
struct WtsInfoW {
state: i32,
session_id: u32,
incoming_bytes: u32,
outgoing_bytes: u32,
incoming_frames: u32,
outgoing_frames: u32,
incoming_compressed_bytes: u32,
outgoing_compressed_bytes: u32,
win_station_name: [u16; 32],
domain: [u16; 17],
user_name: [u16; 21],
connect_time: i64,
disconnect_time: i64,
last_input_time: i64,
logon_time: i64,
current_time: i64,
}
#[cfg(target_os = "windows")]
extern "system" {
fn WTSEnumerateSessionsW(
h_server: winapi::shared::ntdef::HANDLE,
reserved: u32,
version: u32,
pp_session_info: *mut *mut WtsSessionInfoW,
p_count: *mut u32,
) -> i32;
fn WTSFreeMemory(p_memory: *mut std::ffi::c_void);
fn WTSQuerySessionInformationW(
h_server: winapi::shared::ntdef::HANDLE,
session_id: u32,
info_class: i32,
pp_buffer: *mut *mut u16,
p_bytes_returned: *mut u32,
) -> i32;
}
#[cfg(target_os = "windows")]
const WTS_ACTIVE: i32 = 0;
#[cfg(target_os = "windows")]
const WTS_USER_NAME: i32 = 5;
#[cfg(target_os = "windows")]
const WTS_DOMAIN_NAME: i32 = 7;
#[cfg(target_os = "windows")]
const WTS_CLIENT_PROTOCOL_TYPE: i32 = 16;
#[cfg(target_os = "windows")]
const WTS_SESSION_INFO: i32 = 24;
#[cfg(target_os = "windows")]
fn enumerate_active_user_sessions() -> Result<Vec<Session>> {
let mut sessions: *mut WtsSessionInfoW = std::ptr::null_mut();
let mut count: u32 = 0;
let ok = unsafe {
WTSEnumerateSessionsW(
std::ptr::null_mut(),
0,
1,
&mut sessions,
&mut count,
)
};
if ok == 0 || sessions.is_null() {
return Err(anyhow!(
"WTSEnumerateSessionsW failed: {}",
std::io::Error::last_os_error()
));
}
let mut out = Vec::new();
for i in 0..count {
let info = unsafe { &*sessions.add(i as usize) };
if info.state != WTS_ACTIVE {
continue;
}
let sid = info.session_id;
// Skip sessions without a logged-in user (login screen, Session 0).
let username = match query_wide(sid, WTS_USER_NAME) {
Some(s) if !s.is_empty() => s,
_ => continue,
};
let domain = query_wide(sid, WTS_DOMAIN_NAME).unwrap_or_default();
let session_kind = match query_protocol_type(sid) {
Some(0) => "console".to_string(),
Some(2) => "rdp".to_string(),
Some(_) | None => String::new(),
};
let connect_unix = query_connect_time(sid).unwrap_or(0);
out.push(Session {
session_id: sid,
connect_unix,
username,
domain,
session_kind,
});
}
unsafe { WTSFreeMemory(sessions as *mut std::ffi::c_void) };
Ok(out)
}
/// Pull a WCHAR-string-shaped value (WTSUserName / WTSDomainName / …)
/// and convert to UTF-8. Returns None on failure or empty result.
#[cfg(target_os = "windows")]
fn query_wide(session_id: u32, info_class: i32) -> Option<String> {
let mut buf: *mut u16 = std::ptr::null_mut();
let mut bytes: u32 = 0;
let ok = unsafe {
WTSQuerySessionInformationW(
std::ptr::null_mut(),
session_id,
info_class,
&mut buf,
&mut bytes,
)
};
if ok == 0 || buf.is_null() {
return None;
}
let s = unsafe { wide_to_string(buf, bytes) };
unsafe { WTSFreeMemory(buf as *mut std::ffi::c_void) };
if s.is_empty() {
None
} else {
Some(s)
}
}
/// WTSClientProtocolType returns a single USHORT (2 bytes). Buffer is
/// allocated by Windows; we read the 16-bit value and free.
#[cfg(target_os = "windows")]
fn query_protocol_type(session_id: u32) -> Option<u16> {
let mut buf: *mut u16 = std::ptr::null_mut();
let mut bytes: u32 = 0;
let ok = unsafe {
WTSQuerySessionInformationW(
std::ptr::null_mut(),
session_id,
WTS_CLIENT_PROTOCOL_TYPE,
&mut buf,
&mut bytes,
)
};
if ok == 0 || buf.is_null() || bytes < 2 {
if !buf.is_null() {
unsafe { WTSFreeMemory(buf as *mut std::ffi::c_void) };
}
return None;
}
let val = unsafe { *buf };
unsafe { WTSFreeMemory(buf as *mut std::ffi::c_void) };
Some(val)
}
/// Pull ConnectTime from WTSINFOW (the per-session struct returned by
/// WTSQuerySessionInformation with class WTSSessionInfo). FILETIME 0
/// means "OS doesn't know" (rare — happens on the login-screen session
/// before a user signs in); we surface that as None and the caller
/// falls back to `now()`.
#[cfg(target_os = "windows")]
fn query_connect_time(session_id: u32) -> Option<i64> {
let mut buf: *mut u16 = std::ptr::null_mut();
let mut bytes: u32 = 0;
let ok = unsafe {
WTSQuerySessionInformationW(
std::ptr::null_mut(),
session_id,
WTS_SESSION_INFO,
&mut buf,
&mut bytes,
)
};
if ok == 0 || buf.is_null() || (bytes as usize) < std::mem::size_of::<WtsInfoW>() {
if !buf.is_null() {
unsafe { WTSFreeMemory(buf as *mut std::ffi::c_void) };
}
return None;
}
let info: &WtsInfoW = unsafe { &*(buf as *const WtsInfoW) };
let ft = info.connect_time;
unsafe { WTSFreeMemory(buf as *mut std::ffi::c_void) };
let unix = filetime_to_unix(ft);
if unix > 0 {
Some(unix)
} else {
None
}
}
/// FILETIME is 100-nanosecond ticks since 1601-01-01 UTC. Convert to
/// unix seconds; clamp to >=0 since the table column is signed but a
/// pre-epoch ConnectTime is nonsense for our purposes.
#[cfg(target_os = "windows")]
fn filetime_to_unix(ft: i64) -> i64 {
// 11644473600 = seconds between 1601-01-01 and 1970-01-01.
const TICKS_PER_SEC: i64 = 10_000_000;
const EPOCH_DIFF_SECS: i64 = 11_644_473_600;
if ft <= 0 {
return 0;
}
let secs_since_1601 = ft / TICKS_PER_SEC;
let unix = secs_since_1601 - EPOCH_DIFF_SECS;
unix.max(0)
}
/// Convert a NUL-terminated UTF-16 buffer to a Rust String. `bytes` is
/// the byte count Windows returned (NOT the char count); we trust the
/// trailing NUL but cap on `bytes / 2` so a malformed buffer can't run
/// us into unallocated memory.
#[cfg(target_os = "windows")]
unsafe fn wide_to_string(buf: *const u16, bytes: u32) -> String {
if buf.is_null() || bytes == 0 {
return String::new();
}
let max_chars = (bytes as usize) / 2;
let mut len = 0usize;
while len < max_chars && *buf.add(len) != 0 {
len += 1;
}
let slice = std::slice::from_raw_parts(buf, len);
String::from_utf16_lossy(slice)
}
+3
View File
@@ -29,6 +29,9 @@ mod inventory;
mod cm_popup; mod cm_popup;
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
mod exec; mod exec;
mod login_events;
mod perf;
mod perf_events;
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
mod service; mod service;
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
+311
View File
@@ -0,0 +1,311 @@
// Continuous performance sampling.
//
// One sample per minute: overall CPU%, memory used / total, top
// process by CPU, top process by memory, uptime, process count.
// Posted in batches to /api/agent/metrics; surfaced on the admin
// device detail page as a 24 h sparkline plus a "live" snapshot card.
//
// Architecture mirrors `login_events`:
// * One background thread, its own current-thread Tokio runtime.
// * Sysinfo crate (vendored under hbb_common) does the cross-cutting
// work — we keep one `System` instance alive across iterations so
// its per-process CPU% accounting (which is differential against
// the previous refresh) stays accurate.
// * In-memory queue with retry-with-backoff and a hard drop cap so
// a permanently-misconfigured agent can't balloon memory.
// * Server-side `INSERT OR IGNORE` keyed on (peer_id, at) dedups any
// retries that get there twice.
//
// Process CPU% on Windows is reported by sysinfo as "% of one core",
// so a busy multi-threaded process can read >100%. We normalise by
// `cpus().len()` so the snapshot card's "top CPU: chrome.exe 18%"
// is comparable to the overall CPU%. Memory is in bytes from sysinfo;
// we convert to MB on the wire.
use anyhow::{anyhow, Result};
use std::sync::Mutex;
use std::time::Duration;
/// Sampling cadence. 60 s strikes a balance between resolution (enough
/// granularity to spot a 2-minute CPU spike) and storage (~1440 rows /
/// device / day on the server side).
const SAMPLE_INTERVAL: Duration = Duration::from_secs(60);
/// Flush cadence on the happy path. The reporter tries to flush after
/// every sample anyway; this is the floor used by the network-error
/// backoff before it doubles.
const FLUSH_INTERVAL_BASE: Duration = Duration::from_secs(60);
const FLUSH_INTERVAL_MAX: Duration = Duration::from_secs(15 * 60);
/// At backoff cap = 15 min, this is ~3 days of trying — enough to ride
/// out a long server-side outage. Beyond that we drop, on the same
/// reasoning as `login_events::DROP_AFTER`.
const DROP_AFTER: u32 = 300;
/// Must match the server's MAX_SAMPLES_PER_POST.
const MAX_SAMPLES_PER_POST: usize = 512;
#[derive(Clone, Debug, Default)]
struct PendingSample {
at: i64,
cpu_pct: f64,
mem_used_mb: i64,
mem_total_mb: i64,
proc_count: i64,
uptime_secs: i64,
top_cpu_name: String,
top_cpu_pct: f64,
top_mem_name: String,
top_mem_mb: i64,
attempts: u32,
}
#[cfg(target_os = "windows")]
static QUEUE: Mutex<Vec<PendingSample>> = Mutex::new(Vec::new());
/// Kick off the metrics sampler. Safe to call multiple times — guarded
/// by an `AtomicBool` so a stray second call is a no-op.
pub fn start() {
#[cfg(not(target_os = "windows"))]
{
// Cross-platform stub; the implementation only runs on Windows
// because that's the only OS hello-agent ships on. Sysinfo is
// cross-platform, so a future Linux build can drop the cfg-gate
// and reuse this module unchanged.
}
#[cfg(target_os = "windows")]
{
use std::sync::atomic::{AtomicBool, Ordering};
static STARTED: AtomicBool = AtomicBool::new(false);
if STARTED.swap(true, Ordering::SeqCst) {
return;
}
std::thread::spawn(move || {
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
log::warn!("perf: build runtime: {e}");
return;
}
};
rt.block_on(run_loop());
});
}
}
#[cfg(target_os = "windows")]
async fn run_loop() {
use hbb_common::sysinfo::System;
// One System instance, refreshed across iterations. Sysinfo's
// per-process CPU% is computed against the previous refresh, so a
// throwaway `System::new()` each tick would always read 0%.
let mut sys = System::new();
// Prime CPU + processes once so the first real sample isn't NaN.
// The MINIMUM_CPU_UPDATE_INTERVAL sleep is what sysinfo's docs
// recommend between back-to-back CPU refreshes; here we have the
// full SAMPLE_INTERVAL coming up so we just refresh-and-wait.
sys.refresh_cpu();
sys.refresh_processes();
tokio::time::sleep(SAMPLE_INTERVAL).await;
let mut flush_backoff = FLUSH_INTERVAL_BASE;
loop {
if let Some(sample) = collect_sample(&mut sys) {
QUEUE.lock().unwrap().push(sample);
}
match flush_once().await {
FlushOutcome::Idle | FlushOutcome::AllSent => {
flush_backoff = FLUSH_INTERVAL_BASE;
}
FlushOutcome::Failed => {
flush_backoff = (flush_backoff * 2).min(FLUSH_INTERVAL_MAX);
}
}
// We always sleep SAMPLE_INTERVAL — the network backoff slows
// the *retry* of pending events, not observation. Sampling
// continues at full cadence so a transient outage doesn't punch
// a hole in the chart on either side of the dropped window.
tokio::time::sleep(SAMPLE_INTERVAL.min(flush_backoff)).await;
}
}
#[cfg(target_os = "windows")]
fn collect_sample(sys: &mut hbb_common::sysinfo::System) -> Option<PendingSample> {
sys.refresh_cpu();
sys.refresh_memory();
sys.refresh_processes();
let at = hbb_common::chrono::Utc::now().timestamp();
let cpu_pct = sys.global_cpu_info().cpu_usage() as f64;
// Sysinfo returns bytes on every platform; the server schema stores
// MB to keep row sizes small at scale. Divide-then-cast bounds
// arithmetic to u64 territory.
let mem_total_mb = (sys.total_memory() / 1024 / 1024) as i64;
let mem_used_mb = (sys.used_memory() / 1024 / 1024) as i64;
let uptime_secs = sys.uptime() as i64;
// Per-process CPU% from sysinfo is normalised per core: a 4-thread
// process pinning 4 cores on a 4-core machine reads 400%. Divide
// by core count so the snapshot card's number matches the overall
// CPU% reading (both 0-100 of the whole machine).
let cpu_count = sys.cpus().len().max(1) as f64;
let mut top_cpu: Option<(&str, f32)> = None;
let mut top_mem: Option<(&str, u64)> = None;
let mut proc_count = 0i64;
for proc in sys.processes().values() {
proc_count += 1;
let name = proc.name();
// Some kernel-side rows show up with empty names on Windows;
// skip them so we don't ever render a top-CPU row with no
// label.
if name.is_empty() {
continue;
}
let cu = proc.cpu_usage();
if cu.is_finite() && cu > top_cpu.map(|(_, v)| v).unwrap_or(0.0) {
top_cpu = Some((name, cu));
}
let mu = proc.memory();
if mu > top_mem.map(|(_, v)| v).unwrap_or(0) {
top_mem = Some((name, mu));
}
}
let (top_cpu_name, top_cpu_pct) = top_cpu
.map(|(n, v)| (n.to_string(), (v as f64 / cpu_count).min(100.0)))
.unwrap_or_default();
let (top_mem_name, top_mem_mb) = top_mem
.map(|(n, v)| (n.to_string(), (v / 1024 / 1024) as i64))
.unwrap_or_default();
Some(PendingSample {
at,
cpu_pct,
mem_used_mb,
mem_total_mb,
proc_count,
uptime_secs,
top_cpu_name,
top_cpu_pct,
top_mem_name,
top_mem_mb,
attempts: 0,
})
}
#[cfg(target_os = "windows")]
enum FlushOutcome {
Idle,
AllSent,
Failed,
}
#[cfg(target_os = "windows")]
async fn flush_once() -> FlushOutcome {
let batch: Vec<PendingSample> = {
let mut q = QUEUE.lock().unwrap();
if q.is_empty() {
return FlushOutcome::Idle;
}
let take = q.len().min(MAX_SAMPLES_PER_POST);
q.drain(..take).collect()
};
match post_batch(&batch).await {
Ok(()) => FlushOutcome::AllSent,
Err(e) => {
log::warn!(
"perf: flush of {} sample(s) failed: {e:#}",
batch.len(),
);
let mut requeued: Vec<PendingSample> = batch
.into_iter()
.filter_map(|mut s| {
s.attempts = s.attempts.saturating_add(1);
if s.attempts >= DROP_AFTER {
log::warn!(
"perf: dropping sample after {} attempts: at={}",
s.attempts, s.at,
);
None
} else {
Some(s)
}
})
.collect();
let mut q = QUEUE.lock().unwrap();
let tail: Vec<PendingSample> = q.drain(..).collect();
requeued.extend(tail);
*q = requeued;
FlushOutcome::Failed
}
}
}
#[cfg(target_os = "windows")]
async fn post_batch(batch: &[PendingSample]) -> Result<()> {
let api = librustdesk::common::get_api_server(
hbb_common::config::Config::get_option("api-server"),
hbb_common::config::Config::get_option("custom-rendezvous-server"),
);
if api.is_empty() {
return Err(anyhow!("no api-server configured yet"));
}
let url = format!("{api}/api/agent/metrics");
let id = hbb_common::config::Config::get_id();
let uuid = librustdesk::common::encode64(hbb_common::get_uuid());
let samples: Vec<hbb_common::serde_json::Value> = batch
.iter()
.map(|s| {
hbb_common::serde_json::json!({
"at": s.at,
"cpu_pct": s.cpu_pct,
"mem_used_mb": s.mem_used_mb,
"mem_total_mb": s.mem_total_mb,
"proc_count": s.proc_count,
"uptime_secs": s.uptime_secs,
"top_cpu_name": s.top_cpu_name,
"top_cpu_pct": s.top_cpu_pct,
"top_mem_name": s.top_mem_name,
"top_mem_mb": s.top_mem_mb,
})
})
.collect();
let body = hbb_common::serde_json::json!({
"id": id,
"uuid": uuid,
"samples": samples,
})
.to_string();
let headers = librustdesk::hbbs_http::sign::build_signed_headers(
"POST",
"/api/agent/metrics",
body.as_bytes(),
)
.unwrap_or_default();
let resp = librustdesk::common::post_request(url, body, &headers)
.await
.map_err(|e| anyhow!("post: {e}"))?;
let trimmed = resp.trim();
if trimmed == "OK" || trimmed == "ID_NOT_FOUND" {
// ID_NOT_FOUND mirrors the unattended_password / login_events
// contract: server doesn't know the peer yet (rendezvous race
// on first boot). We drop the batch rather than retry forever;
// the next sample lands once /api/heartbeat has created the
// peer row.
Ok(())
} else {
Err(anyhow!("unexpected response: {trimmed}"))
}
}
+507
View File
@@ -0,0 +1,507 @@
// Windows event-log scraper for performance-related events.
//
// Pulls fresh entries from three channels Microsoft itself uses to
// flag "the OS noticed this machine was slow":
//
// * `Microsoft-Windows-Diagnostics-Performance/Operational` — boot,
// shutdown, standby, resume degradation (Event IDs 100/200/300/400
// family), each carrying total time + the component that caused it.
// * `Microsoft-Windows-Resource-Exhaustion-Detector/Operational` —
// Event ID 2004 fires when virtual memory hits the wall, with the
// top processes by working set.
// * `System` — IDs 41 (Kernel-Power unexpected reboot), 6008 (dirty
// shutdown), 1001 (BugCheck / BSOD).
//
// Cadence is intentionally low (5 min): these events are sparse — a
// healthy machine may produce zero per day, a sick one a handful.
//
// State machine per channel: a numeric RecordId cursor persisted in
// the agent config (`Config::set_option`). First run with no cursor
// pulls the trailing 7 days bounded by `MAX_FIRST_RUN_EVENTS`; every
// subsequent run pulls everything with `EventRecordID > cursor`. The
// cursor advances on observation, not on successful POST — see the
// `flush_once` comment for the tradeoff.
#![cfg_attr(not(target_os = "windows"), allow(dead_code))]
use anyhow::{anyhow, Result};
use serde::Deserialize;
use std::sync::Mutex;
use std::time::Duration;
/// How often to scrape. 5 min keeps the UI freshness reasonable
/// (operators looking at a complaint won't usually see a stale view)
/// without burning host CPU on PowerShell startup every minute.
const SCRAPE_INTERVAL: Duration = Duration::from_secs(5 * 60);
const FLUSH_INTERVAL_BASE: Duration = Duration::from_secs(60);
const FLUSH_INTERVAL_MAX: Duration = Duration::from_secs(15 * 60);
/// First-run lookback. A box that's been alive for months would
/// otherwise dump thousands of `Diagnostics-Performance` events on a
/// fresh install — useful in theory, but the UI shows the most recent
/// 20 and the older entries are mostly noise.
const MAX_FIRST_RUN_EVENTS: u32 = 100;
/// Per-scrape cap so a misbehaving event log can't blow up an
/// individual run. Anything beyond this on a single pass is dropped on
/// the floor and picked up on the next scrape (cursor advances to the
/// last seen, so we don't oscillate).
const MAX_PER_SCRAPE: u32 = 200;
/// Must match the server's MAX_EVENTS_PER_POST.
const MAX_EVENTS_PER_POST: usize = 128;
/// Drop pending events after this many retries — at 15 min cap this
/// is ~5 days, plenty for any realistic outage window.
const DROP_AFTER: u32 = 480;
/// One channel config: the WEL log name, the short provider tag stored
/// server-side (matches the `devices.perf_src_*` i18n keys), and an
/// optional ID allow-list. `None` means "everything in this channel";
/// `Some(&[…])` restricts to those event IDs (used for `System` to
/// avoid pulling boot / service-start chatter).
struct ChannelCfg {
provider: &'static str,
log_name: &'static str,
event_ids: Option<&'static [u32]>,
}
const CHANNELS: &[ChannelCfg] = &[
ChannelCfg {
provider: "diag-perf",
log_name: "Microsoft-Windows-Diagnostics-Performance/Operational",
event_ids: None,
},
ChannelCfg {
provider: "res-exh",
log_name: "Microsoft-Windows-Resource-Exhaustion-Detector/Operational",
event_ids: None,
},
ChannelCfg {
// Subset of `System` that's actually performance-relevant —
// 41 = Kernel-Power unexpected reboot, 6008 = dirty shutdown,
// 1001 = BugCheck (BSOD report). Adding more IDs is just a
// matter of extending this slice.
provider: "system",
log_name: "System",
event_ids: Some(&[41, 6008, 1001]),
},
];
/// JSON shape we ask PowerShell to emit. Mirrors `PerfEventIn` on the
/// server.
#[derive(Debug, Clone, Deserialize)]
struct RawEvent {
at: i64,
event_id: i64,
level: i64,
record_id: i64,
#[serde(default)]
summary: String,
#[serde(default)]
detail: String,
}
#[derive(Clone, Debug)]
struct PendingPerfEvent {
provider: &'static str,
at: i64,
event_id: i64,
level: i64,
record_id: i64,
summary: String,
detail: String,
attempts: u32,
}
#[cfg(target_os = "windows")]
static QUEUE: Mutex<Vec<PendingPerfEvent>> = Mutex::new(Vec::new());
pub fn start() {
#[cfg(not(target_os = "windows"))]
{}
#[cfg(target_os = "windows")]
{
use std::sync::atomic::{AtomicBool, Ordering};
static STARTED: AtomicBool = AtomicBool::new(false);
if STARTED.swap(true, Ordering::SeqCst) {
return;
}
std::thread::spawn(move || {
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
log::warn!("perf-events: build runtime: {e}");
return;
}
};
rt.block_on(run_loop());
});
}
}
#[cfg(target_os = "windows")]
async fn run_loop() {
// Stagger the first scrape so we don't pile on the
// unattended_password POST + login_events first poll + sysinfo
// upload all at the same boot moment. PowerShell first invocation
// is ~1 s of CPU; doing it 30 s in instead of immediately is
// friendlier on cold-boot CPU load.
tokio::time::sleep(Duration::from_secs(30)).await;
let mut flush_backoff = FLUSH_INTERVAL_BASE;
loop {
for ch in CHANNELS {
match scrape_channel(ch) {
Ok(events) if !events.is_empty() => {
log::info!(
"perf-events: {} new from {}",
events.len(),
ch.provider,
);
QUEUE.lock().unwrap().extend(events);
}
Ok(_) => {}
Err(e) => {
log::warn!(
"perf-events: scrape {} failed: {e:#}",
ch.provider,
);
}
}
}
match flush_once().await {
FlushOutcome::Idle | FlushOutcome::AllSent => {
flush_backoff = FLUSH_INTERVAL_BASE;
}
FlushOutcome::Failed => {
flush_backoff = (flush_backoff * 2).min(FLUSH_INTERVAL_MAX);
}
}
tokio::time::sleep(SCRAPE_INTERVAL.min(flush_backoff)).await;
}
}
// ─────────────────────────── per-channel scrape ───────────────────────────
#[cfg(target_os = "windows")]
fn scrape_channel(ch: &ChannelCfg) -> Result<Vec<PendingPerfEvent>> {
let cursor = read_cursor(ch.provider);
let script = build_script(ch, cursor);
let stdout = run_powershell(&script)?;
if stdout.is_empty() {
return Ok(Vec::new());
}
// ConvertTo-Json with a single-element array still emits a JSON
// object (PowerShell's "unrolling" quirk); coerce both shapes.
let trimmed = stdout.trim();
if trimmed == "[]" || trimmed.is_empty() {
return Ok(Vec::new());
}
let raw: Vec<RawEvent> = match hbb_common::serde_json::from_str(trimmed) {
Ok(v) => v,
Err(_) => {
// Single-object case
match hbb_common::serde_json::from_str::<RawEvent>(trimmed) {
Ok(single) => vec![single],
Err(e) => {
return Err(anyhow!(
"PowerShell output is not valid JSON: {e}; first 200 chars: {:.200}",
trimmed
));
}
}
}
};
let mut max_record_id = cursor;
let mut out = Vec::with_capacity(raw.len());
for ev in raw {
if (ev.record_id as u64) > max_record_id {
max_record_id = ev.record_id as u64;
}
out.push(PendingPerfEvent {
provider: ch.provider,
at: ev.at,
event_id: ev.event_id,
level: ev.level,
record_id: ev.record_id,
summary: ev.summary,
detail: ev.detail,
attempts: 0,
});
}
// Advance the cursor on observation, not on successful POST. The
// tradeoff: an agent that crashes between observing and POSTing
// loses those rows from the UI. Windows still has them in the
// event log, so the operator can fall back to Event Viewer if
// they really need them; we prefer that to repeatedly re-observing
// a backlog the server has already taken (unique-index dedup
// would absorb it, but the agent's queue would grow unbounded
// every scrape).
if max_record_id > cursor {
write_cursor(ch.provider, max_record_id);
}
Ok(out)
}
/// Build the PowerShell script for one channel. Two shapes:
///
/// * `cursor == 0` (first run): `FilterHashtable` with a 7-day
/// `StartTime` and the optional ID allow-list. The hashtable form
/// is the only one that accepts both a time bound and an `Id`
/// array in one call.
/// * `cursor > 0`: `FilterXPath` with `EventRecordID > $cursor` and
/// the optional ID-list expanded to `(EventID=A or EventID=B …)`.
#[cfg(target_os = "windows")]
fn build_script(ch: &ChannelCfg, cursor: u64) -> String {
// PowerShell-quote the log name (single-quote escape = doubled
// single quote).
let q = |s: &str| s.replace('\'', "''");
let log_name_quoted = q(ch.log_name);
let filter_clause = if cursor == 0 {
// FilterHashtable + StartTime + optional Id array.
match ch.event_ids {
Some(ids) => {
let id_list = ids
.iter()
.map(|i| i.to_string())
.collect::<Vec<_>>()
.join(",");
format!(
"-FilterHashtable @{{LogName='{ln}'; StartTime=(Get-Date).AddDays(-7); Id=@({ids})}} -MaxEvents {max}",
ln = log_name_quoted,
ids = id_list,
max = MAX_FIRST_RUN_EVENTS,
)
}
None => format!(
"-FilterHashtable @{{LogName='{ln}'; StartTime=(Get-Date).AddDays(-7)}} -MaxEvents {max}",
ln = log_name_quoted,
max = MAX_FIRST_RUN_EVENTS,
),
}
} else {
// FilterXPath. Note PowerShell expands `$cursor` from the
// outer script, so we splat the literal value into the XPath.
let id_clause = match ch.event_ids {
Some(ids) => {
let or = ids
.iter()
.map(|i| format!("System/EventID={i}"))
.collect::<Vec<_>>()
.join(" or ");
format!(" and ({or})")
}
None => String::new(),
};
// Double-curly to escape format!'s own `{}` interpolation.
format!(
"-LogName '{ln}' -FilterXPath \"*[System/EventRecordID>{cur}{id}]\" -MaxEvents {max}",
ln = log_name_quoted,
cur = cursor,
id = id_clause,
max = MAX_PER_SCRAPE,
)
};
// Single-quoted PowerShell here-doc would escape too aggressively;
// we stick to plain string concatenation. The `try / catch` block
// returns '[]' on any failure so the Rust side gets a parseable
// empty array rather than a stderr blob.
format!(
r#"$ErrorActionPreference = 'SilentlyContinue'
try {{
$events = Get-WinEvent {filter} 2>$null
if ($null -eq $events) {{ '[]' ; exit 0 }}
$arr = @($events | Sort-Object RecordId | ForEach-Object {{
$msg = $_.Message
if ($null -eq $msg) {{ $msg = '' }}
$oneline = ($msg -replace "(`r?`n)+", " ").Trim()
if ($oneline.Length -gt 300) {{ $oneline = $oneline.Substring(0,300) }}
$detail = $msg
if ($detail.Length -gt 4000) {{ $detail = $detail.Substring(0,4000) }}
[PSCustomObject]@{{
at = [int64](([System.DateTimeOffset]$_.TimeCreated.ToUniversalTime()).ToUnixTimeSeconds())
event_id = $_.Id
level = $_.Level
record_id = $_.RecordId
summary = $oneline
detail = $detail
}}
}})
$arr | ConvertTo-Json -Depth 3 -Compress
}} catch {{
'[]'
}}
"#,
filter = filter_clause,
)
}
#[cfg(target_os = "windows")]
fn run_powershell(script: &str) -> Result<String> {
use std::os::windows::process::CommandExt;
use std::process::Command;
// Matches inventory.rs — hides the brief console flash when the
// agent runs interactively (dev mode); no effect in service mode.
const CREATE_NO_WINDOW: u32 = 0x08000000;
let output = Command::new("powershell.exe")
.args([
"-NoProfile",
"-NonInteractive",
"-ExecutionPolicy",
"Bypass",
"-Command",
script,
])
.creation_flags(CREATE_NO_WINDOW)
.output()
.map_err(|e| anyhow!("spawn powershell: {e}"))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(anyhow!(
"powershell exited {:?}: {}",
output.status.code(),
stderr.trim()
));
}
Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
}
// ─────────────────────────────── cursor I/O ───────────────────────────────
#[cfg(target_os = "windows")]
fn cursor_key(provider: &str) -> String {
format!("perf-event-cursor-{provider}")
}
#[cfg(target_os = "windows")]
fn read_cursor(provider: &str) -> u64 {
let raw = hbb_common::config::Config::get_option(&cursor_key(provider));
raw.parse::<u64>().unwrap_or(0)
}
#[cfg(target_os = "windows")]
fn write_cursor(provider: &str, value: u64) {
hbb_common::config::Config::set_option(
cursor_key(provider),
value.to_string(),
);
}
// ───────────────────────────────── flush ──────────────────────────────────
#[cfg(target_os = "windows")]
enum FlushOutcome {
Idle,
AllSent,
Failed,
}
#[cfg(target_os = "windows")]
async fn flush_once() -> FlushOutcome {
let batch: Vec<PendingPerfEvent> = {
let mut q = QUEUE.lock().unwrap();
if q.is_empty() {
return FlushOutcome::Idle;
}
let take = q.len().min(MAX_EVENTS_PER_POST);
q.drain(..take).collect()
};
match post_batch(&batch).await {
Ok(()) => FlushOutcome::AllSent,
Err(e) => {
log::warn!(
"perf-events: flush of {} event(s) failed: {e:#}",
batch.len(),
);
let mut requeued: Vec<PendingPerfEvent> = batch
.into_iter()
.filter_map(|mut ev| {
ev.attempts = ev.attempts.saturating_add(1);
if ev.attempts >= DROP_AFTER {
log::warn!(
"perf-events: dropping event after {} attempts: \
provider={} record_id={}",
ev.attempts, ev.provider, ev.record_id,
);
None
} else {
Some(ev)
}
})
.collect();
let mut q = QUEUE.lock().unwrap();
let tail: Vec<PendingPerfEvent> = q.drain(..).collect();
requeued.extend(tail);
*q = requeued;
FlushOutcome::Failed
}
}
}
#[cfg(target_os = "windows")]
async fn post_batch(batch: &[PendingPerfEvent]) -> Result<()> {
let api = librustdesk::common::get_api_server(
hbb_common::config::Config::get_option("api-server"),
hbb_common::config::Config::get_option("custom-rendezvous-server"),
);
if api.is_empty() {
return Err(anyhow!("no api-server configured yet"));
}
let url = format!("{api}/api/agent/perf-events");
let id = hbb_common::config::Config::get_id();
let uuid = librustdesk::common::encode64(hbb_common::get_uuid());
let events: Vec<hbb_common::serde_json::Value> = batch
.iter()
.map(|ev| {
hbb_common::serde_json::json!({
"at": ev.at,
"provider": ev.provider,
"event_id": ev.event_id,
"level": ev.level,
"record_id": ev.record_id,
"summary": ev.summary,
"detail_json": ev.detail,
})
})
.collect();
let body = hbb_common::serde_json::json!({
"id": id,
"uuid": uuid,
"events": events,
})
.to_string();
let headers = librustdesk::hbbs_http::sign::build_signed_headers(
"POST",
"/api/agent/perf-events",
body.as_bytes(),
)
.unwrap_or_default();
let resp = librustdesk::common::post_request(url, body, &headers)
.await
.map_err(|e| anyhow!("post: {e}"))?;
let trimmed = resp.trim();
if trimmed == "OK" || trimmed == "ID_NOT_FOUND" {
Ok(())
} else {
Err(anyhow!("unexpected response: {trimmed}"))
}
}
+21
View File
@@ -704,6 +704,27 @@ fn service_main_inner() -> Result<()> {
// can race the rendezvous registration done by `--server`). // can race the rendezvous registration done by `--server`).
crate::unattended_password::rotate_and_report(); crate::unattended_password::rotate_and_report();
// Start the user-login tracker. Polls the WTS session table every
// few seconds, diffs against its previous snapshot, and POSTs
// logon/logoff events to the admin API. Independent background
// thread + Tokio runtime; lives for the service lifetime, no
// shutdown hook (the SCM termination is enough).
crate::login_events::start();
// Start the continuous performance sampler: one CPU / memory /
// top-process sample per minute, posted in batches to
// /api/agent/metrics. Powers the device-detail Performance card
// and 24 h sparkline.
crate::perf::start();
// Start the Windows-event-log perf-event scraper: pulls boot /
// shutdown / sleep degradation, memory exhaustion, BSOD and
// unexpected-reboot events from the OS-managed channels and
// POSTs them to /api/agent/perf-events. Persists a per-channel
// RecordId cursor in the agent config so a restart doesn't
// re-emit the whole history.
crate::perf_events::start();
// Worker process handle. Killed on Stop, replaced on session change. // Worker process handle. Killed on Stop, replaced on session change.
// `last_state` carries (session_id, had_user). The `had_user` bit is // `last_state` carries (session_id, had_user). The `had_user` bit is
// what forces a respawn when a user logs in to a session we're // what forces a respawn when a user logs in to a session we're
+36
View File
@@ -184,6 +184,15 @@ async fn start_hbbs_sync_async() {
v["version"] = json!(crate::VERSION); v["version"] = json!(crate::VERSION);
v["id"] = json!(id); v["id"] = json!(id);
v["uuid"] = json!(crate::encode64(hbb_common::get_uuid())); v["uuid"] = json!(crate::encode64(hbb_common::get_uuid()));
// opsbase enrollment: advertise our Ed25519 public key so the
// server can pin it trust-on-first-use and verify our signed
// requests. This is the same keypair `sign.rs` signs with and
// rendezvous registers. Harmless on vanilla rustdesk servers,
// which ignore unknown sysinfo fields.
let (_sk, pk_bytes) = Config::get_key_pair();
if !pk_bytes.is_empty() {
v["pk"] = json!(crate::encode64(&pk_bytes));
}
// Optional rebrand identity: `AGENT_NAME` / `AGENT_VERSION` // Optional rebrand identity: `AGENT_NAME` / `AGENT_VERSION`
// are empty by default (vanilla rustdesk) and populated by // are empty by default (vanilla rustdesk) and populated by
// OEM shells like hello-agent. We only stamp the field // OEM shells like hello-agent. We only stamp the field
@@ -404,9 +413,28 @@ fn heartbeat_url() -> String {
format!("{}/api/heartbeat", url) format!("{}/api/heartbeat", url)
} }
/// LocalConfig key holding the JSON list of option keys the server's strategy
/// applied on the previous push, so we can make strategies *authoritative*.
const STRATEGY_MANAGED_KEYS: &str = "strategy_managed_keys";
fn handle_config_options(config_options: HashMap<String, String>) { fn handle_config_options(config_options: HashMap<String, String>) {
let mut options = Config::get_options(); let mut options = Config::get_options();
let default_settings = config::DEFAULT_SETTINGS.read().unwrap().clone(); let default_settings = config::DEFAULT_SETTINGS.read().unwrap().clone();
// hello-agent local patch — authoritative strategies. The server's strategy
// owns exactly the keys it sends. Any key it managed on the previous push
// but no longer sends is reset to its default (the override is dropped), so
// removing a key/strategy on the server actually clears it on the device.
// Keys the server never managed (the user's own local settings) are left
// untouched, so this stays a managed-config overlay rather than a wipe.
let prev_managed: Vec<String> =
serde_json::from_str(&LocalConfig::get_option(STRATEGY_MANAGED_KEYS)).unwrap_or_default();
for k in prev_managed.iter() {
if !config_options.contains_key(k) {
options.remove(k);
}
}
config_options config_options
.iter() .iter()
.map(|(k, v)| { .map(|(k, v)| {
@@ -420,6 +448,14 @@ fn handle_config_options(config_options: HashMap<String, String>) {
} }
}) })
.count(); .count();
// Remember the keys we now manage, for the next diff.
let managed: Vec<&String> = config_options.keys().collect();
LocalConfig::set_option(
STRATEGY_MANAGED_KEYS.to_string(),
serde_json::to_string(&managed).unwrap_or_default(),
);
Config::set_options(options); Config::set_options(options);
} }