Implement performance monitor
build-windows / build-hello-agent-x64 (push) Failing after 2m52s
build-windows / sign-hello-agent-x64 (push) Has been skipped
build-windows / validate-hello-agent-x64 (push) Has been skipped

This commit is contained in:
2026-05-22 21:43:51 +02:00
parent f868efa432
commit a0aa84dd2f
6 changed files with 836 additions and 2 deletions
Generated
+1 -1
View File
@@ -3197,7 +3197,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]] [[package]]
name = "hello-agent" name = "hello-agent"
version = "0.1.6" version = "0.1.7"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"env_logger 0.10.2", "env_logger 0.10.2",
+1 -1
View File
@@ -1,6 +1,6 @@
[package] [package]
name = "hello-agent" name = "hello-agent"
version = "0.1.6" version = "0.1.7"
edition = "2021" edition = "2021"
rust-version = "1.75" rust-version = "1.75"
description = "Headless RustDesk-protocol-compatible support agent for Windows" description = "Headless RustDesk-protocol-compatible support agent for Windows"
+2
View File
@@ -30,6 +30,8 @@ mod cm_popup;
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
mod exec; mod exec;
mod login_events; mod login_events;
mod perf;
mod perf_events;
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
mod service; mod service;
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
+311
View File
@@ -0,0 +1,311 @@
// Continuous performance sampling.
//
// One sample per minute: overall CPU%, memory used / total, top
// process by CPU, top process by memory, uptime, process count.
// Posted in batches to /api/agent/metrics; surfaced on the admin
// device detail page as a 24 h sparkline plus a "live" snapshot card.
//
// Architecture mirrors `login_events`:
// * One background thread, its own current-thread Tokio runtime.
// * Sysinfo crate (vendored under hbb_common) does the cross-cutting
// work — we keep one `System` instance alive across iterations so
// its per-process CPU% accounting (which is differential against
// the previous refresh) stays accurate.
// * In-memory queue with retry-with-backoff and a hard drop cap so
// a permanently-misconfigured agent can't balloon memory.
// * Server-side `INSERT OR IGNORE` keyed on (peer_id, at) dedups any
// retries that get there twice.
//
// Process CPU% on Windows is reported by sysinfo as "% of one core",
// so a busy multi-threaded process can read >100%. We normalise by
// `cpus().len()` so the snapshot card's "top CPU: chrome.exe 18%"
// is comparable to the overall CPU%. Memory is in bytes from sysinfo;
// we convert to MB on the wire.
use anyhow::{anyhow, Result};
use std::sync::Mutex;
use std::time::Duration;
/// Sampling cadence. 60 s strikes a balance between resolution (enough
/// granularity to spot a 2-minute CPU spike) and storage (~1440 rows /
/// device / day on the server side).
const SAMPLE_INTERVAL: Duration = Duration::from_secs(60);
/// Flush cadence on the happy path. The reporter tries to flush after
/// every sample anyway; this is the floor used by the network-error
/// backoff before it doubles.
const FLUSH_INTERVAL_BASE: Duration = Duration::from_secs(60);
const FLUSH_INTERVAL_MAX: Duration = Duration::from_secs(15 * 60);
/// At backoff cap = 15 min, this is ~3 days of trying — enough to ride
/// out a long server-side outage. Beyond that we drop, on the same
/// reasoning as `login_events::DROP_AFTER`.
const DROP_AFTER: u32 = 300;
/// Must match the server's MAX_SAMPLES_PER_POST.
const MAX_SAMPLES_PER_POST: usize = 512;
#[derive(Clone, Debug, Default)]
struct PendingSample {
at: i64,
cpu_pct: f64,
mem_used_mb: i64,
mem_total_mb: i64,
proc_count: i64,
uptime_secs: i64,
top_cpu_name: String,
top_cpu_pct: f64,
top_mem_name: String,
top_mem_mb: i64,
attempts: u32,
}
#[cfg(target_os = "windows")]
static QUEUE: Mutex<Vec<PendingSample>> = Mutex::new(Vec::new());
/// Kick off the metrics sampler. Safe to call multiple times — guarded
/// by an `AtomicBool` so a stray second call is a no-op.
pub fn start() {
#[cfg(not(target_os = "windows"))]
{
// Cross-platform stub; the implementation only runs on Windows
// because that's the only OS hello-agent ships on. Sysinfo is
// cross-platform, so a future Linux build can drop the cfg-gate
// and reuse this module unchanged.
}
#[cfg(target_os = "windows")]
{
use std::sync::atomic::{AtomicBool, Ordering};
static STARTED: AtomicBool = AtomicBool::new(false);
if STARTED.swap(true, Ordering::SeqCst) {
return;
}
std::thread::spawn(move || {
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
log::warn!("perf: build runtime: {e}");
return;
}
};
rt.block_on(run_loop());
});
}
}
#[cfg(target_os = "windows")]
async fn run_loop() {
use hbb_common::sysinfo::System;
// One System instance, refreshed across iterations. Sysinfo's
// per-process CPU% is computed against the previous refresh, so a
// throwaway `System::new()` each tick would always read 0%.
let mut sys = System::new();
// Prime CPU + processes once so the first real sample isn't NaN.
// The MINIMUM_CPU_UPDATE_INTERVAL sleep is what sysinfo's docs
// recommend between back-to-back CPU refreshes; here we have the
// full SAMPLE_INTERVAL coming up so we just refresh-and-wait.
sys.refresh_cpu();
sys.refresh_processes();
tokio::time::sleep(SAMPLE_INTERVAL).await;
let mut flush_backoff = FLUSH_INTERVAL_BASE;
loop {
if let Some(sample) = collect_sample(&mut sys) {
QUEUE.lock().unwrap().push(sample);
}
match flush_once().await {
FlushOutcome::Idle | FlushOutcome::AllSent => {
flush_backoff = FLUSH_INTERVAL_BASE;
}
FlushOutcome::Failed => {
flush_backoff = (flush_backoff * 2).min(FLUSH_INTERVAL_MAX);
}
}
// We always sleep SAMPLE_INTERVAL — the network backoff slows
// the *retry* of pending events, not observation. Sampling
// continues at full cadence so a transient outage doesn't punch
// a hole in the chart on either side of the dropped window.
tokio::time::sleep(SAMPLE_INTERVAL.min(flush_backoff)).await;
}
}
#[cfg(target_os = "windows")]
fn collect_sample(sys: &mut hbb_common::sysinfo::System) -> Option<PendingSample> {
sys.refresh_cpu();
sys.refresh_memory();
sys.refresh_processes();
let at = hbb_common::chrono::Utc::now().timestamp();
let cpu_pct = sys.global_cpu_info().cpu_usage() as f64;
// Sysinfo returns bytes on every platform; the server schema stores
// MB to keep row sizes small at scale. Divide-then-cast bounds
// arithmetic to u64 territory.
let mem_total_mb = (sys.total_memory() / 1024 / 1024) as i64;
let mem_used_mb = (sys.used_memory() / 1024 / 1024) as i64;
let uptime_secs = sys.uptime() as i64;
// Per-process CPU% from sysinfo is normalised per core: a 4-thread
// process pinning 4 cores on a 4-core machine reads 400%. Divide
// by core count so the snapshot card's number matches the overall
// CPU% reading (both 0-100 of the whole machine).
let cpu_count = sys.cpus().len().max(1) as f64;
let mut top_cpu: Option<(&str, f32)> = None;
let mut top_mem: Option<(&str, u64)> = None;
let mut proc_count = 0i64;
for proc in sys.processes().values() {
proc_count += 1;
let name = proc.name();
// Some kernel-side rows show up with empty names on Windows;
// skip them so we don't ever render a top-CPU row with no
// label.
if name.is_empty() {
continue;
}
let cu = proc.cpu_usage();
if cu.is_finite() && cu > top_cpu.map(|(_, v)| v).unwrap_or(0.0) {
top_cpu = Some((name, cu));
}
let mu = proc.memory();
if mu > top_mem.map(|(_, v)| v).unwrap_or(0) {
top_mem = Some((name, mu));
}
}
let (top_cpu_name, top_cpu_pct) = top_cpu
.map(|(n, v)| (n.to_string(), (v as f64 / cpu_count).min(100.0)))
.unwrap_or_default();
let (top_mem_name, top_mem_mb) = top_mem
.map(|(n, v)| (n.to_string(), (v / 1024 / 1024) as i64))
.unwrap_or_default();
Some(PendingSample {
at,
cpu_pct,
mem_used_mb,
mem_total_mb,
proc_count,
uptime_secs,
top_cpu_name,
top_cpu_pct,
top_mem_name,
top_mem_mb,
attempts: 0,
})
}
#[cfg(target_os = "windows")]
enum FlushOutcome {
Idle,
AllSent,
Failed,
}
#[cfg(target_os = "windows")]
async fn flush_once() -> FlushOutcome {
let batch: Vec<PendingSample> = {
let mut q = QUEUE.lock().unwrap();
if q.is_empty() {
return FlushOutcome::Idle;
}
let take = q.len().min(MAX_SAMPLES_PER_POST);
q.drain(..take).collect()
};
match post_batch(&batch).await {
Ok(()) => FlushOutcome::AllSent,
Err(e) => {
log::warn!(
"perf: flush of {} sample(s) failed: {e:#}",
batch.len(),
);
let mut requeued: Vec<PendingSample> = batch
.into_iter()
.filter_map(|mut s| {
s.attempts = s.attempts.saturating_add(1);
if s.attempts >= DROP_AFTER {
log::warn!(
"perf: dropping sample after {} attempts: at={}",
s.attempts, s.at,
);
None
} else {
Some(s)
}
})
.collect();
let mut q = QUEUE.lock().unwrap();
let tail: Vec<PendingSample> = q.drain(..).collect();
requeued.extend(tail);
*q = requeued;
FlushOutcome::Failed
}
}
}
#[cfg(target_os = "windows")]
async fn post_batch(batch: &[PendingSample]) -> Result<()> {
let api = librustdesk::common::get_api_server(
hbb_common::config::Config::get_option("api-server"),
hbb_common::config::Config::get_option("custom-rendezvous-server"),
);
if api.is_empty() {
return Err(anyhow!("no api-server configured yet"));
}
let url = format!("{api}/api/agent/metrics");
let id = hbb_common::config::Config::get_id();
let uuid = librustdesk::common::encode64(hbb_common::get_uuid());
let samples: Vec<hbb_common::serde_json::Value> = batch
.iter()
.map(|s| {
hbb_common::serde_json::json!({
"at": s.at,
"cpu_pct": s.cpu_pct,
"mem_used_mb": s.mem_used_mb,
"mem_total_mb": s.mem_total_mb,
"proc_count": s.proc_count,
"uptime_secs": s.uptime_secs,
"top_cpu_name": s.top_cpu_name,
"top_cpu_pct": s.top_cpu_pct,
"top_mem_name": s.top_mem_name,
"top_mem_mb": s.top_mem_mb,
})
})
.collect();
let body = hbb_common::serde_json::json!({
"id": id,
"uuid": uuid,
"samples": samples,
})
.to_string();
let headers = librustdesk::hbbs_http::sign::build_signed_headers(
"POST",
"/api/agent/metrics",
body.as_bytes(),
)
.unwrap_or_default();
let resp = librustdesk::common::post_request(url, body, &headers)
.await
.map_err(|e| anyhow!("post: {e}"))?;
let trimmed = resp.trim();
if trimmed == "OK" || trimmed == "ID_NOT_FOUND" {
// ID_NOT_FOUND mirrors the unattended_password / login_events
// contract: server doesn't know the peer yet (rendezvous race
// on first boot). We drop the batch rather than retry forever;
// the next sample lands once /api/heartbeat has created the
// peer row.
Ok(())
} else {
Err(anyhow!("unexpected response: {trimmed}"))
}
}
+507
View File
@@ -0,0 +1,507 @@
// Windows event-log scraper for performance-related events.
//
// Pulls fresh entries from three channels Microsoft itself uses to
// flag "the OS noticed this machine was slow":
//
// * `Microsoft-Windows-Diagnostics-Performance/Operational` — boot,
// shutdown, standby, resume degradation (Event IDs 100/200/300/400
// family), each carrying total time + the component that caused it.
// * `Microsoft-Windows-Resource-Exhaustion-Detector/Operational` —
// Event ID 2004 fires when virtual memory hits the wall, with the
// top processes by working set.
// * `System` — IDs 41 (Kernel-Power unexpected reboot), 6008 (dirty
// shutdown), 1001 (BugCheck / BSOD).
//
// Cadence is intentionally low (5 min): these events are sparse — a
// healthy machine may produce zero per day, a sick one a handful.
//
// State machine per channel: a numeric RecordId cursor persisted in
// the agent config (`Config::set_option`). First run with no cursor
// pulls the trailing 7 days bounded by `MAX_FIRST_RUN_EVENTS`; every
// subsequent run pulls everything with `EventRecordID > cursor`. The
// cursor advances on observation, not on successful POST — see the
// `flush_once` comment for the tradeoff.
#![cfg_attr(not(target_os = "windows"), allow(dead_code))]
use anyhow::{anyhow, Result};
use serde::Deserialize;
use std::sync::Mutex;
use std::time::Duration;
/// How often to scrape. 5 min keeps the UI freshness reasonable
/// (operators looking at a complaint won't usually see a stale view)
/// without burning host CPU on PowerShell startup every minute.
const SCRAPE_INTERVAL: Duration = Duration::from_secs(5 * 60);
const FLUSH_INTERVAL_BASE: Duration = Duration::from_secs(60);
const FLUSH_INTERVAL_MAX: Duration = Duration::from_secs(15 * 60);
/// First-run lookback. A box that's been alive for months would
/// otherwise dump thousands of `Diagnostics-Performance` events on a
/// fresh install — useful in theory, but the UI shows the most recent
/// 20 and the older entries are mostly noise.
const MAX_FIRST_RUN_EVENTS: u32 = 100;
/// Per-scrape cap so a misbehaving event log can't blow up an
/// individual run. Anything beyond this on a single pass is dropped on
/// the floor and picked up on the next scrape (cursor advances to the
/// last seen, so we don't oscillate).
const MAX_PER_SCRAPE: u32 = 200;
/// Must match the server's MAX_EVENTS_PER_POST.
const MAX_EVENTS_PER_POST: usize = 128;
/// Drop pending events after this many retries — at 15 min cap this
/// is ~5 days, plenty for any realistic outage window.
const DROP_AFTER: u32 = 480;
/// One channel config: the WEL log name, the short provider tag stored
/// server-side (matches the `devices.perf_src_*` i18n keys), and an
/// optional ID allow-list. `None` means "everything in this channel";
/// `Some(&[…])` restricts to those event IDs (used for `System` to
/// avoid pulling boot / service-start chatter).
struct ChannelCfg {
provider: &'static str,
log_name: &'static str,
event_ids: Option<&'static [u32]>,
}
const CHANNELS: &[ChannelCfg] = &[
ChannelCfg {
provider: "diag-perf",
log_name: "Microsoft-Windows-Diagnostics-Performance/Operational",
event_ids: None,
},
ChannelCfg {
provider: "res-exh",
log_name: "Microsoft-Windows-Resource-Exhaustion-Detector/Operational",
event_ids: None,
},
ChannelCfg {
// Subset of `System` that's actually performance-relevant —
// 41 = Kernel-Power unexpected reboot, 6008 = dirty shutdown,
// 1001 = BugCheck (BSOD report). Adding more IDs is just a
// matter of extending this slice.
provider: "system",
log_name: "System",
event_ids: Some(&[41, 6008, 1001]),
},
];
/// JSON shape we ask PowerShell to emit. Mirrors `PerfEventIn` on the
/// server.
#[derive(Debug, Clone, Deserialize)]
struct RawEvent {
at: i64,
event_id: i64,
level: i64,
record_id: i64,
#[serde(default)]
summary: String,
#[serde(default)]
detail: String,
}
#[derive(Clone, Debug)]
struct PendingPerfEvent {
provider: &'static str,
at: i64,
event_id: i64,
level: i64,
record_id: i64,
summary: String,
detail: String,
attempts: u32,
}
#[cfg(target_os = "windows")]
static QUEUE: Mutex<Vec<PendingPerfEvent>> = Mutex::new(Vec::new());
pub fn start() {
#[cfg(not(target_os = "windows"))]
{}
#[cfg(target_os = "windows")]
{
use std::sync::atomic::{AtomicBool, Ordering};
static STARTED: AtomicBool = AtomicBool::new(false);
if STARTED.swap(true, Ordering::SeqCst) {
return;
}
std::thread::spawn(move || {
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
log::warn!("perf-events: build runtime: {e}");
return;
}
};
rt.block_on(run_loop());
});
}
}
#[cfg(target_os = "windows")]
async fn run_loop() {
// Stagger the first scrape so we don't pile on the
// unattended_password POST + login_events first poll + sysinfo
// upload all at the same boot moment. PowerShell first invocation
// is ~1 s of CPU; doing it 30 s in instead of immediately is
// friendlier on cold-boot CPU load.
tokio::time::sleep(Duration::from_secs(30)).await;
let mut flush_backoff = FLUSH_INTERVAL_BASE;
loop {
for ch in CHANNELS {
match scrape_channel(ch) {
Ok(events) if !events.is_empty() => {
log::info!(
"perf-events: {} new from {}",
events.len(),
ch.provider,
);
QUEUE.lock().unwrap().extend(events);
}
Ok(_) => {}
Err(e) => {
log::warn!(
"perf-events: scrape {} failed: {e:#}",
ch.provider,
);
}
}
}
match flush_once().await {
FlushOutcome::Idle | FlushOutcome::AllSent => {
flush_backoff = FLUSH_INTERVAL_BASE;
}
FlushOutcome::Failed => {
flush_backoff = (flush_backoff * 2).min(FLUSH_INTERVAL_MAX);
}
}
tokio::time::sleep(SCRAPE_INTERVAL.min(flush_backoff)).await;
}
}
// ─────────────────────────── per-channel scrape ───────────────────────────
#[cfg(target_os = "windows")]
fn scrape_channel(ch: &ChannelCfg) -> Result<Vec<PendingPerfEvent>> {
let cursor = read_cursor(ch.provider);
let script = build_script(ch, cursor);
let stdout = run_powershell(&script)?;
if stdout.is_empty() {
return Ok(Vec::new());
}
// ConvertTo-Json with a single-element array still emits a JSON
// object (PowerShell's "unrolling" quirk); coerce both shapes.
let trimmed = stdout.trim();
if trimmed == "[]" || trimmed.is_empty() {
return Ok(Vec::new());
}
let raw: Vec<RawEvent> = match hbb_common::serde_json::from_str(trimmed) {
Ok(v) => v,
Err(_) => {
// Single-object case
match hbb_common::serde_json::from_str::<RawEvent>(trimmed) {
Ok(single) => vec![single],
Err(e) => {
return Err(anyhow!(
"PowerShell output is not valid JSON: {e}; first 200 chars: {:.200}",
trimmed
));
}
}
}
};
let mut max_record_id = cursor;
let mut out = Vec::with_capacity(raw.len());
for ev in raw {
if (ev.record_id as u64) > max_record_id {
max_record_id = ev.record_id as u64;
}
out.push(PendingPerfEvent {
provider: ch.provider,
at: ev.at,
event_id: ev.event_id,
level: ev.level,
record_id: ev.record_id,
summary: ev.summary,
detail: ev.detail,
attempts: 0,
});
}
// Advance the cursor on observation, not on successful POST. The
// tradeoff: an agent that crashes between observing and POSTing
// loses those rows from the UI. Windows still has them in the
// event log, so the operator can fall back to Event Viewer if
// they really need them; we prefer that to repeatedly re-observing
// a backlog the server has already taken (unique-index dedup
// would absorb it, but the agent's queue would grow unbounded
// every scrape).
if max_record_id > cursor {
write_cursor(ch.provider, max_record_id);
}
Ok(out)
}
/// Build the PowerShell script for one channel. Two shapes:
///
/// * `cursor == 0` (first run): `FilterHashtable` with a 7-day
/// `StartTime` and the optional ID allow-list. The hashtable form
/// is the only one that accepts both a time bound and an `Id`
/// array in one call.
/// * `cursor > 0`: `FilterXPath` with `EventRecordID > $cursor` and
/// the optional ID-list expanded to `(EventID=A or EventID=B …)`.
#[cfg(target_os = "windows")]
fn build_script(ch: &ChannelCfg, cursor: u64) -> String {
// PowerShell-quote the log name (single-quote escape = doubled
// single quote).
let q = |s: &str| s.replace('\'', "''");
let log_name_quoted = q(ch.log_name);
let filter_clause = if cursor == 0 {
// FilterHashtable + StartTime + optional Id array.
match ch.event_ids {
Some(ids) => {
let id_list = ids
.iter()
.map(|i| i.to_string())
.collect::<Vec<_>>()
.join(",");
format!(
"-FilterHashtable @{{LogName='{ln}'; StartTime=(Get-Date).AddDays(-7); Id=@({ids})}} -MaxEvents {max}",
ln = log_name_quoted,
ids = id_list,
max = MAX_FIRST_RUN_EVENTS,
)
}
None => format!(
"-FilterHashtable @{{LogName='{ln}'; StartTime=(Get-Date).AddDays(-7)}} -MaxEvents {max}",
ln = log_name_quoted,
max = MAX_FIRST_RUN_EVENTS,
),
}
} else {
// FilterXPath. Note PowerShell expands `$cursor` from the
// outer script, so we splat the literal value into the XPath.
let id_clause = match ch.event_ids {
Some(ids) => {
let or = ids
.iter()
.map(|i| format!("System/EventID={i}"))
.collect::<Vec<_>>()
.join(" or ");
format!(" and ({or})")
}
None => String::new(),
};
// Double-curly to escape format!'s own `{}` interpolation.
format!(
"-LogName '{ln}' -FilterXPath \"*[System/EventRecordID>{cur}{id}]\" -MaxEvents {max}",
ln = log_name_quoted,
cur = cursor,
id = id_clause,
max = MAX_PER_SCRAPE,
)
};
// Single-quoted PowerShell here-doc would escape too aggressively;
// we stick to plain string concatenation. The `try / catch` block
// returns '[]' on any failure so the Rust side gets a parseable
// empty array rather than a stderr blob.
format!(
r#"$ErrorActionPreference = 'SilentlyContinue'
try {{
$events = Get-WinEvent {filter} 2>$null
if ($null -eq $events) {{ '[]' ; exit 0 }}
$arr = @($events | Sort-Object RecordId | ForEach-Object {{
$msg = $_.Message
if ($null -eq $msg) {{ $msg = '' }}
$oneline = ($msg -replace "(`r?`n)+", " ").Trim()
if ($oneline.Length -gt 300) {{ $oneline = $oneline.Substring(0,300) }}
$detail = $msg
if ($detail.Length -gt 4000) {{ $detail = $detail.Substring(0,4000) }}
[PSCustomObject]@{{
at = [int64](([System.DateTimeOffset]$_.TimeCreated.ToUniversalTime()).ToUnixTimeSeconds())
event_id = $_.Id
level = $_.Level
record_id = $_.RecordId
summary = $oneline
detail = $detail
}}
}})
$arr | ConvertTo-Json -Depth 3 -Compress
}} catch {{
'[]'
}}
"#,
filter = filter_clause,
)
}
#[cfg(target_os = "windows")]
fn run_powershell(script: &str) -> Result<String> {
use std::os::windows::process::CommandExt;
use std::process::Command;
// Matches inventory.rs — hides the brief console flash when the
// agent runs interactively (dev mode); no effect in service mode.
const CREATE_NO_WINDOW: u32 = 0x08000000;
let output = Command::new("powershell.exe")
.args([
"-NoProfile",
"-NonInteractive",
"-ExecutionPolicy",
"Bypass",
"-Command",
script,
])
.creation_flags(CREATE_NO_WINDOW)
.output()
.map_err(|e| anyhow!("spawn powershell: {e}"))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(anyhow!(
"powershell exited {:?}: {}",
output.status.code(),
stderr.trim()
));
}
Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
}
// ─────────────────────────────── cursor I/O ───────────────────────────────
#[cfg(target_os = "windows")]
fn cursor_key(provider: &str) -> String {
format!("perf-event-cursor-{provider}")
}
#[cfg(target_os = "windows")]
fn read_cursor(provider: &str) -> u64 {
let raw = hbb_common::config::Config::get_option(&cursor_key(provider));
raw.parse::<u64>().unwrap_or(0)
}
#[cfg(target_os = "windows")]
fn write_cursor(provider: &str, value: u64) {
hbb_common::config::Config::set_option(
cursor_key(provider),
value.to_string(),
);
}
// ───────────────────────────────── flush ──────────────────────────────────
#[cfg(target_os = "windows")]
enum FlushOutcome {
Idle,
AllSent,
Failed,
}
#[cfg(target_os = "windows")]
async fn flush_once() -> FlushOutcome {
let batch: Vec<PendingPerfEvent> = {
let mut q = QUEUE.lock().unwrap();
if q.is_empty() {
return FlushOutcome::Idle;
}
let take = q.len().min(MAX_EVENTS_PER_POST);
q.drain(..take).collect()
};
match post_batch(&batch).await {
Ok(()) => FlushOutcome::AllSent,
Err(e) => {
log::warn!(
"perf-events: flush of {} event(s) failed: {e:#}",
batch.len(),
);
let mut requeued: Vec<PendingPerfEvent> = batch
.into_iter()
.filter_map(|mut ev| {
ev.attempts = ev.attempts.saturating_add(1);
if ev.attempts >= DROP_AFTER {
log::warn!(
"perf-events: dropping event after {} attempts: \
provider={} record_id={}",
ev.attempts, ev.provider, ev.record_id,
);
None
} else {
Some(ev)
}
})
.collect();
let mut q = QUEUE.lock().unwrap();
let tail: Vec<PendingPerfEvent> = q.drain(..).collect();
requeued.extend(tail);
*q = requeued;
FlushOutcome::Failed
}
}
}
#[cfg(target_os = "windows")]
async fn post_batch(batch: &[PendingPerfEvent]) -> Result<()> {
let api = librustdesk::common::get_api_server(
hbb_common::config::Config::get_option("api-server"),
hbb_common::config::Config::get_option("custom-rendezvous-server"),
);
if api.is_empty() {
return Err(anyhow!("no api-server configured yet"));
}
let url = format!("{api}/api/agent/perf-events");
let id = hbb_common::config::Config::get_id();
let uuid = librustdesk::common::encode64(hbb_common::get_uuid());
let events: Vec<hbb_common::serde_json::Value> = batch
.iter()
.map(|ev| {
hbb_common::serde_json::json!({
"at": ev.at,
"provider": ev.provider,
"event_id": ev.event_id,
"level": ev.level,
"record_id": ev.record_id,
"summary": ev.summary,
"detail_json": ev.detail,
})
})
.collect();
let body = hbb_common::serde_json::json!({
"id": id,
"uuid": uuid,
"events": events,
})
.to_string();
let headers = librustdesk::hbbs_http::sign::build_signed_headers(
"POST",
"/api/agent/perf-events",
body.as_bytes(),
)
.unwrap_or_default();
let resp = librustdesk::common::post_request(url, body, &headers)
.await
.map_err(|e| anyhow!("post: {e}"))?;
let trimmed = resp.trim();
if trimmed == "OK" || trimmed == "ID_NOT_FOUND" {
Ok(())
} else {
Err(anyhow!("unexpected response: {trimmed}"))
}
}
+14
View File
@@ -711,6 +711,20 @@ fn service_main_inner() -> Result<()> {
// shutdown hook (the SCM termination is enough). // shutdown hook (the SCM termination is enough).
crate::login_events::start(); crate::login_events::start();
// Start the continuous performance sampler: one CPU / memory /
// top-process sample per minute, posted in batches to
// /api/agent/metrics. Powers the device-detail Performance card
// and 24 h sparkline.
crate::perf::start();
// Start the Windows-event-log perf-event scraper: pulls boot /
// shutdown / sleep degradation, memory exhaustion, BSOD and
// unexpected-reboot events from the OS-managed channels and
// POSTs them to /api/agent/perf-events. Persists a per-channel
// RecordId cursor in the agent config so a restart doesn't
// re-emit the whole history.
crate::perf_events::start();
// Worker process handle. Killed on Stop, replaced on session change. // Worker process handle. Killed on Stop, replaced on session change.
// `last_state` carries (session_id, had_user). The `had_user` bit is // `last_state` carries (session_id, had_user). The `had_user` bit is
// what forces a respawn when a user logs in to a session we're // what forces a respawn when a user logs in to a session we're