diff --git a/Cargo.lock b/Cargo.lock index 33bd5f8..b25510b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3197,13 +3197,14 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hello-agent" -version = "0.1.6" +version = "0.1.7" dependencies = [ "anyhow", "env_logger 0.10.2", "hbb_common", "log", "rustdesk", + "serde 1.0.228", "serde_json 1.0.118", "tokio", "winapi", diff --git a/Cargo.toml b/Cargo.toml index bc6a682..403630c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hello-agent" -version = "0.1.6" +version = "0.1.7" edition = "2021" rust-version = "1.75" description = "Headless RustDesk-protocol-compatible support agent for Windows" @@ -34,6 +34,12 @@ anyhow = "1" # the inventory module's `use serde_json` doesn't depend on internal # implementation details of hbb_common. serde_json = "1" +# `perf_events.rs` derives Deserialize on the PowerShell row schema. +# hbb_common re-exports `serde_derive` and `serde_json` but NOT `serde` +# itself — and `#[derive(Deserialize)]` expands to a path that references +# the `serde` crate root, so we depend on it explicitly with the `derive` +# feature. +serde = { version = "1", features = ["derive"] } [target.'cfg(target_os = "windows")'.dependencies] windows-service = "0.6" diff --git a/src/main.rs b/src/main.rs index 9d2b027..10c93a4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -30,6 +30,8 @@ mod cm_popup; #[cfg(target_os = "windows")] mod exec; mod login_events; +mod perf; +mod perf_events; #[cfg(target_os = "windows")] mod service; #[cfg(target_os = "windows")] diff --git a/src/perf.rs b/src/perf.rs new file mode 100644 index 0000000..c2f80ad --- /dev/null +++ b/src/perf.rs @@ -0,0 +1,311 @@ +// Continuous performance sampling. +// +// One sample per minute: overall CPU%, memory used / total, top +// process by CPU, top process by memory, uptime, process count. +// Posted in batches to /api/agent/metrics; surfaced on the admin +// device detail page as a 24 h sparkline plus a "live" snapshot card. +// +// Architecture mirrors `login_events`: +// * One background thread, its own current-thread Tokio runtime. +// * Sysinfo crate (vendored under hbb_common) does the cross-cutting +// work — we keep one `System` instance alive across iterations so +// its per-process CPU% accounting (which is differential against +// the previous refresh) stays accurate. +// * In-memory queue with retry-with-backoff and a hard drop cap so +// a permanently-misconfigured agent can't balloon memory. +// * Server-side `INSERT OR IGNORE` keyed on (peer_id, at) dedups any +// retries that get there twice. +// +// Process CPU% on Windows is reported by sysinfo as "% of one core", +// so a busy multi-threaded process can read >100%. We normalise by +// `cpus().len()` so the snapshot card's "top CPU: chrome.exe 18%" +// is comparable to the overall CPU%. Memory is in bytes from sysinfo; +// we convert to MB on the wire. + +use anyhow::{anyhow, Result}; +use std::sync::Mutex; +use std::time::Duration; + +/// Sampling cadence. 60 s strikes a balance between resolution (enough +/// granularity to spot a 2-minute CPU spike) and storage (~1440 rows / +/// device / day on the server side). +const SAMPLE_INTERVAL: Duration = Duration::from_secs(60); + +/// Flush cadence on the happy path. The reporter tries to flush after +/// every sample anyway; this is the floor used by the network-error +/// backoff before it doubles. +const FLUSH_INTERVAL_BASE: Duration = Duration::from_secs(60); +const FLUSH_INTERVAL_MAX: Duration = Duration::from_secs(15 * 60); + +/// At backoff cap = 15 min, this is ~3 days of trying — enough to ride +/// out a long server-side outage. Beyond that we drop, on the same +/// reasoning as `login_events::DROP_AFTER`. +const DROP_AFTER: u32 = 300; + +/// Must match the server's MAX_SAMPLES_PER_POST. +const MAX_SAMPLES_PER_POST: usize = 512; + +#[derive(Clone, Debug, Default)] +struct PendingSample { + at: i64, + cpu_pct: f64, + mem_used_mb: i64, + mem_total_mb: i64, + proc_count: i64, + uptime_secs: i64, + top_cpu_name: String, + top_cpu_pct: f64, + top_mem_name: String, + top_mem_mb: i64, + attempts: u32, +} + +#[cfg(target_os = "windows")] +static QUEUE: Mutex> = Mutex::new(Vec::new()); + +/// Kick off the metrics sampler. Safe to call multiple times — guarded +/// by an `AtomicBool` so a stray second call is a no-op. +pub fn start() { + #[cfg(not(target_os = "windows"))] + { + // Cross-platform stub; the implementation only runs on Windows + // because that's the only OS hello-agent ships on. Sysinfo is + // cross-platform, so a future Linux build can drop the cfg-gate + // and reuse this module unchanged. + } + #[cfg(target_os = "windows")] + { + use std::sync::atomic::{AtomicBool, Ordering}; + static STARTED: AtomicBool = AtomicBool::new(false); + if STARTED.swap(true, Ordering::SeqCst) { + return; + } + std::thread::spawn(move || { + let rt = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(rt) => rt, + Err(e) => { + log::warn!("perf: build runtime: {e}"); + return; + } + }; + rt.block_on(run_loop()); + }); + } +} + +#[cfg(target_os = "windows")] +async fn run_loop() { + use hbb_common::sysinfo::System; + + // One System instance, refreshed across iterations. Sysinfo's + // per-process CPU% is computed against the previous refresh, so a + // throwaway `System::new()` each tick would always read 0%. + let mut sys = System::new(); + // Prime CPU + processes once so the first real sample isn't NaN. + // The MINIMUM_CPU_UPDATE_INTERVAL sleep is what sysinfo's docs + // recommend between back-to-back CPU refreshes; here we have the + // full SAMPLE_INTERVAL coming up so we just refresh-and-wait. + sys.refresh_cpu(); + sys.refresh_processes(); + tokio::time::sleep(SAMPLE_INTERVAL).await; + + let mut flush_backoff = FLUSH_INTERVAL_BASE; + loop { + if let Some(sample) = collect_sample(&mut sys) { + QUEUE.lock().unwrap().push(sample); + } + + match flush_once().await { + FlushOutcome::Idle | FlushOutcome::AllSent => { + flush_backoff = FLUSH_INTERVAL_BASE; + } + FlushOutcome::Failed => { + flush_backoff = (flush_backoff * 2).min(FLUSH_INTERVAL_MAX); + } + } + + // We always sleep SAMPLE_INTERVAL — the network backoff slows + // the *retry* of pending events, not observation. Sampling + // continues at full cadence so a transient outage doesn't punch + // a hole in the chart on either side of the dropped window. + tokio::time::sleep(SAMPLE_INTERVAL.min(flush_backoff)).await; + } +} + +#[cfg(target_os = "windows")] +fn collect_sample(sys: &mut hbb_common::sysinfo::System) -> Option { + sys.refresh_cpu(); + sys.refresh_memory(); + sys.refresh_processes(); + + let at = hbb_common::chrono::Utc::now().timestamp(); + let cpu_pct = sys.global_cpu_info().cpu_usage() as f64; + // Sysinfo returns bytes on every platform; the server schema stores + // MB to keep row sizes small at scale. Divide-then-cast bounds + // arithmetic to u64 territory. + let mem_total_mb = (sys.total_memory() / 1024 / 1024) as i64; + let mem_used_mb = (sys.used_memory() / 1024 / 1024) as i64; + let uptime_secs = sys.uptime() as i64; + + // Per-process CPU% from sysinfo is normalised per core: a 4-thread + // process pinning 4 cores on a 4-core machine reads 400%. Divide + // by core count so the snapshot card's number matches the overall + // CPU% reading (both 0-100 of the whole machine). + let cpu_count = sys.cpus().len().max(1) as f64; + + let mut top_cpu: Option<(&str, f32)> = None; + let mut top_mem: Option<(&str, u64)> = None; + let mut proc_count = 0i64; + for proc in sys.processes().values() { + proc_count += 1; + let name = proc.name(); + // Some kernel-side rows show up with empty names on Windows; + // skip them so we don't ever render a top-CPU row with no + // label. + if name.is_empty() { + continue; + } + let cu = proc.cpu_usage(); + if cu.is_finite() && cu > top_cpu.map(|(_, v)| v).unwrap_or(0.0) { + top_cpu = Some((name, cu)); + } + let mu = proc.memory(); + if mu > top_mem.map(|(_, v)| v).unwrap_or(0) { + top_mem = Some((name, mu)); + } + } + + let (top_cpu_name, top_cpu_pct) = top_cpu + .map(|(n, v)| (n.to_string(), (v as f64 / cpu_count).min(100.0))) + .unwrap_or_default(); + let (top_mem_name, top_mem_mb) = top_mem + .map(|(n, v)| (n.to_string(), (v / 1024 / 1024) as i64)) + .unwrap_or_default(); + + Some(PendingSample { + at, + cpu_pct, + mem_used_mb, + mem_total_mb, + proc_count, + uptime_secs, + top_cpu_name, + top_cpu_pct, + top_mem_name, + top_mem_mb, + attempts: 0, + }) +} + +#[cfg(target_os = "windows")] +enum FlushOutcome { + Idle, + AllSent, + Failed, +} + +#[cfg(target_os = "windows")] +async fn flush_once() -> FlushOutcome { + let batch: Vec = { + let mut q = QUEUE.lock().unwrap(); + if q.is_empty() { + return FlushOutcome::Idle; + } + let take = q.len().min(MAX_SAMPLES_PER_POST); + q.drain(..take).collect() + }; + + match post_batch(&batch).await { + Ok(()) => FlushOutcome::AllSent, + Err(e) => { + log::warn!( + "perf: flush of {} sample(s) failed: {e:#}", + batch.len(), + ); + let mut requeued: Vec = batch + .into_iter() + .filter_map(|mut s| { + s.attempts = s.attempts.saturating_add(1); + if s.attempts >= DROP_AFTER { + log::warn!( + "perf: dropping sample after {} attempts: at={}", + s.attempts, s.at, + ); + None + } else { + Some(s) + } + }) + .collect(); + let mut q = QUEUE.lock().unwrap(); + let tail: Vec = q.drain(..).collect(); + requeued.extend(tail); + *q = requeued; + FlushOutcome::Failed + } + } +} + +#[cfg(target_os = "windows")] +async fn post_batch(batch: &[PendingSample]) -> Result<()> { + let api = librustdesk::common::get_api_server( + hbb_common::config::Config::get_option("api-server"), + hbb_common::config::Config::get_option("custom-rendezvous-server"), + ); + if api.is_empty() { + return Err(anyhow!("no api-server configured yet")); + } + + let url = format!("{api}/api/agent/metrics"); + let id = hbb_common::config::Config::get_id(); + let uuid = librustdesk::common::encode64(hbb_common::get_uuid()); + + let samples: Vec = batch + .iter() + .map(|s| { + hbb_common::serde_json::json!({ + "at": s.at, + "cpu_pct": s.cpu_pct, + "mem_used_mb": s.mem_used_mb, + "mem_total_mb": s.mem_total_mb, + "proc_count": s.proc_count, + "uptime_secs": s.uptime_secs, + "top_cpu_name": s.top_cpu_name, + "top_cpu_pct": s.top_cpu_pct, + "top_mem_name": s.top_mem_name, + "top_mem_mb": s.top_mem_mb, + }) + }) + .collect(); + let body = hbb_common::serde_json::json!({ + "id": id, + "uuid": uuid, + "samples": samples, + }) + .to_string(); + + let headers = librustdesk::hbbs_http::sign::build_signed_headers( + "POST", + "/api/agent/metrics", + body.as_bytes(), + ) + .unwrap_or_default(); + + let resp = librustdesk::common::post_request(url, body, &headers) + .await + .map_err(|e| anyhow!("post: {e}"))?; + let trimmed = resp.trim(); + if trimmed == "OK" || trimmed == "ID_NOT_FOUND" { + // ID_NOT_FOUND mirrors the unattended_password / login_events + // contract: server doesn't know the peer yet (rendezvous race + // on first boot). We drop the batch rather than retry forever; + // the next sample lands once /api/heartbeat has created the + // peer row. + Ok(()) + } else { + Err(anyhow!("unexpected response: {trimmed}")) + } +} diff --git a/src/perf_events.rs b/src/perf_events.rs new file mode 100644 index 0000000..d21e3da --- /dev/null +++ b/src/perf_events.rs @@ -0,0 +1,507 @@ +// Windows event-log scraper for performance-related events. +// +// Pulls fresh entries from three channels Microsoft itself uses to +// flag "the OS noticed this machine was slow": +// +// * `Microsoft-Windows-Diagnostics-Performance/Operational` — boot, +// shutdown, standby, resume degradation (Event IDs 100/200/300/400 +// family), each carrying total time + the component that caused it. +// * `Microsoft-Windows-Resource-Exhaustion-Detector/Operational` — +// Event ID 2004 fires when virtual memory hits the wall, with the +// top processes by working set. +// * `System` — IDs 41 (Kernel-Power unexpected reboot), 6008 (dirty +// shutdown), 1001 (BugCheck / BSOD). +// +// Cadence is intentionally low (5 min): these events are sparse — a +// healthy machine may produce zero per day, a sick one a handful. +// +// State machine per channel: a numeric RecordId cursor persisted in +// the agent config (`Config::set_option`). First run with no cursor +// pulls the trailing 7 days bounded by `MAX_FIRST_RUN_EVENTS`; every +// subsequent run pulls everything with `EventRecordID > cursor`. The +// cursor advances on observation, not on successful POST — see the +// `flush_once` comment for the tradeoff. + +#![cfg_attr(not(target_os = "windows"), allow(dead_code))] + +use anyhow::{anyhow, Result}; +use serde::Deserialize; +use std::sync::Mutex; +use std::time::Duration; + +/// How often to scrape. 5 min keeps the UI freshness reasonable +/// (operators looking at a complaint won't usually see a stale view) +/// without burning host CPU on PowerShell startup every minute. +const SCRAPE_INTERVAL: Duration = Duration::from_secs(5 * 60); + +const FLUSH_INTERVAL_BASE: Duration = Duration::from_secs(60); +const FLUSH_INTERVAL_MAX: Duration = Duration::from_secs(15 * 60); + +/// First-run lookback. A box that's been alive for months would +/// otherwise dump thousands of `Diagnostics-Performance` events on a +/// fresh install — useful in theory, but the UI shows the most recent +/// 20 and the older entries are mostly noise. +const MAX_FIRST_RUN_EVENTS: u32 = 100; + +/// Per-scrape cap so a misbehaving event log can't blow up an +/// individual run. Anything beyond this on a single pass is dropped on +/// the floor and picked up on the next scrape (cursor advances to the +/// last seen, so we don't oscillate). +const MAX_PER_SCRAPE: u32 = 200; + +/// Must match the server's MAX_EVENTS_PER_POST. +const MAX_EVENTS_PER_POST: usize = 128; + +/// Drop pending events after this many retries — at 15 min cap this +/// is ~5 days, plenty for any realistic outage window. +const DROP_AFTER: u32 = 480; + +/// One channel config: the WEL log name, the short provider tag stored +/// server-side (matches the `devices.perf_src_*` i18n keys), and an +/// optional ID allow-list. `None` means "everything in this channel"; +/// `Some(&[…])` restricts to those event IDs (used for `System` to +/// avoid pulling boot / service-start chatter). +struct ChannelCfg { + provider: &'static str, + log_name: &'static str, + event_ids: Option<&'static [u32]>, +} + +const CHANNELS: &[ChannelCfg] = &[ + ChannelCfg { + provider: "diag-perf", + log_name: "Microsoft-Windows-Diagnostics-Performance/Operational", + event_ids: None, + }, + ChannelCfg { + provider: "res-exh", + log_name: "Microsoft-Windows-Resource-Exhaustion-Detector/Operational", + event_ids: None, + }, + ChannelCfg { + // Subset of `System` that's actually performance-relevant — + // 41 = Kernel-Power unexpected reboot, 6008 = dirty shutdown, + // 1001 = BugCheck (BSOD report). Adding more IDs is just a + // matter of extending this slice. + provider: "system", + log_name: "System", + event_ids: Some(&[41, 6008, 1001]), + }, +]; + +/// JSON shape we ask PowerShell to emit. Mirrors `PerfEventIn` on the +/// server. +#[derive(Debug, Clone, Deserialize)] +struct RawEvent { + at: i64, + event_id: i64, + level: i64, + record_id: i64, + #[serde(default)] + summary: String, + #[serde(default)] + detail: String, +} + +#[derive(Clone, Debug)] +struct PendingPerfEvent { + provider: &'static str, + at: i64, + event_id: i64, + level: i64, + record_id: i64, + summary: String, + detail: String, + attempts: u32, +} + +#[cfg(target_os = "windows")] +static QUEUE: Mutex> = Mutex::new(Vec::new()); + +pub fn start() { + #[cfg(not(target_os = "windows"))] + {} + #[cfg(target_os = "windows")] + { + use std::sync::atomic::{AtomicBool, Ordering}; + static STARTED: AtomicBool = AtomicBool::new(false); + if STARTED.swap(true, Ordering::SeqCst) { + return; + } + std::thread::spawn(move || { + let rt = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(rt) => rt, + Err(e) => { + log::warn!("perf-events: build runtime: {e}"); + return; + } + }; + rt.block_on(run_loop()); + }); + } +} + +#[cfg(target_os = "windows")] +async fn run_loop() { + // Stagger the first scrape so we don't pile on the + // unattended_password POST + login_events first poll + sysinfo + // upload all at the same boot moment. PowerShell first invocation + // is ~1 s of CPU; doing it 30 s in instead of immediately is + // friendlier on cold-boot CPU load. + tokio::time::sleep(Duration::from_secs(30)).await; + + let mut flush_backoff = FLUSH_INTERVAL_BASE; + loop { + for ch in CHANNELS { + match scrape_channel(ch) { + Ok(events) if !events.is_empty() => { + log::info!( + "perf-events: {} new from {}", + events.len(), + ch.provider, + ); + QUEUE.lock().unwrap().extend(events); + } + Ok(_) => {} + Err(e) => { + log::warn!( + "perf-events: scrape {} failed: {e:#}", + ch.provider, + ); + } + } + } + + match flush_once().await { + FlushOutcome::Idle | FlushOutcome::AllSent => { + flush_backoff = FLUSH_INTERVAL_BASE; + } + FlushOutcome::Failed => { + flush_backoff = (flush_backoff * 2).min(FLUSH_INTERVAL_MAX); + } + } + + tokio::time::sleep(SCRAPE_INTERVAL.min(flush_backoff)).await; + } +} + +// ─────────────────────────── per-channel scrape ─────────────────────────── + +#[cfg(target_os = "windows")] +fn scrape_channel(ch: &ChannelCfg) -> Result> { + let cursor = read_cursor(ch.provider); + let script = build_script(ch, cursor); + let stdout = run_powershell(&script)?; + if stdout.is_empty() { + return Ok(Vec::new()); + } + // ConvertTo-Json with a single-element array still emits a JSON + // object (PowerShell's "unrolling" quirk); coerce both shapes. + let trimmed = stdout.trim(); + if trimmed == "[]" || trimmed.is_empty() { + return Ok(Vec::new()); + } + let raw: Vec = match hbb_common::serde_json::from_str(trimmed) { + Ok(v) => v, + Err(_) => { + // Single-object case + match hbb_common::serde_json::from_str::(trimmed) { + Ok(single) => vec![single], + Err(e) => { + return Err(anyhow!( + "PowerShell output is not valid JSON: {e}; first 200 chars: {:.200}", + trimmed + )); + } + } + } + }; + + let mut max_record_id = cursor; + let mut out = Vec::with_capacity(raw.len()); + for ev in raw { + if (ev.record_id as u64) > max_record_id { + max_record_id = ev.record_id as u64; + } + out.push(PendingPerfEvent { + provider: ch.provider, + at: ev.at, + event_id: ev.event_id, + level: ev.level, + record_id: ev.record_id, + summary: ev.summary, + detail: ev.detail, + attempts: 0, + }); + } + + // Advance the cursor on observation, not on successful POST. The + // tradeoff: an agent that crashes between observing and POSTing + // loses those rows from the UI. Windows still has them in the + // event log, so the operator can fall back to Event Viewer if + // they really need them; we prefer that to repeatedly re-observing + // a backlog the server has already taken (unique-index dedup + // would absorb it, but the agent's queue would grow unbounded + // every scrape). + if max_record_id > cursor { + write_cursor(ch.provider, max_record_id); + } + + Ok(out) +} + +/// Build the PowerShell script for one channel. Two shapes: +/// +/// * `cursor == 0` (first run): `FilterHashtable` with a 7-day +/// `StartTime` and the optional ID allow-list. The hashtable form +/// is the only one that accepts both a time bound and an `Id` +/// array in one call. +/// * `cursor > 0`: `FilterXPath` with `EventRecordID > $cursor` and +/// the optional ID-list expanded to `(EventID=A or EventID=B …)`. +#[cfg(target_os = "windows")] +fn build_script(ch: &ChannelCfg, cursor: u64) -> String { + // PowerShell-quote the log name (single-quote escape = doubled + // single quote). + let q = |s: &str| s.replace('\'', "''"); + let log_name_quoted = q(ch.log_name); + + let filter_clause = if cursor == 0 { + // FilterHashtable + StartTime + optional Id array. + match ch.event_ids { + Some(ids) => { + let id_list = ids + .iter() + .map(|i| i.to_string()) + .collect::>() + .join(","); + format!( + "-FilterHashtable @{{LogName='{ln}'; StartTime=(Get-Date).AddDays(-7); Id=@({ids})}} -MaxEvents {max}", + ln = log_name_quoted, + ids = id_list, + max = MAX_FIRST_RUN_EVENTS, + ) + } + None => format!( + "-FilterHashtable @{{LogName='{ln}'; StartTime=(Get-Date).AddDays(-7)}} -MaxEvents {max}", + ln = log_name_quoted, + max = MAX_FIRST_RUN_EVENTS, + ), + } + } else { + // FilterXPath. Note PowerShell expands `$cursor` from the + // outer script, so we splat the literal value into the XPath. + let id_clause = match ch.event_ids { + Some(ids) => { + let or = ids + .iter() + .map(|i| format!("System/EventID={i}")) + .collect::>() + .join(" or "); + format!(" and ({or})") + } + None => String::new(), + }; + // Double-curly to escape format!'s own `{}` interpolation. + format!( + "-LogName '{ln}' -FilterXPath \"*[System/EventRecordID>{cur}{id}]\" -MaxEvents {max}", + ln = log_name_quoted, + cur = cursor, + id = id_clause, + max = MAX_PER_SCRAPE, + ) + }; + + // Single-quoted PowerShell here-doc would escape too aggressively; + // we stick to plain string concatenation. The `try / catch` block + // returns '[]' on any failure so the Rust side gets a parseable + // empty array rather than a stderr blob. + format!( + r#"$ErrorActionPreference = 'SilentlyContinue' +try {{ + $events = Get-WinEvent {filter} 2>$null + if ($null -eq $events) {{ '[]' ; exit 0 }} + $arr = @($events | Sort-Object RecordId | ForEach-Object {{ + $msg = $_.Message + if ($null -eq $msg) {{ $msg = '' }} + $oneline = ($msg -replace "(`r?`n)+", " ").Trim() + if ($oneline.Length -gt 300) {{ $oneline = $oneline.Substring(0,300) }} + $detail = $msg + if ($detail.Length -gt 4000) {{ $detail = $detail.Substring(0,4000) }} + [PSCustomObject]@{{ + at = [int64](([System.DateTimeOffset]$_.TimeCreated.ToUniversalTime()).ToUnixTimeSeconds()) + event_id = $_.Id + level = $_.Level + record_id = $_.RecordId + summary = $oneline + detail = $detail + }} + }}) + $arr | ConvertTo-Json -Depth 3 -Compress +}} catch {{ + '[]' +}} +"#, + filter = filter_clause, + ) +} + +#[cfg(target_os = "windows")] +fn run_powershell(script: &str) -> Result { + use std::os::windows::process::CommandExt; + use std::process::Command; + // Matches inventory.rs — hides the brief console flash when the + // agent runs interactively (dev mode); no effect in service mode. + const CREATE_NO_WINDOW: u32 = 0x08000000; + + let output = Command::new("powershell.exe") + .args([ + "-NoProfile", + "-NonInteractive", + "-ExecutionPolicy", + "Bypass", + "-Command", + script, + ]) + .creation_flags(CREATE_NO_WINDOW) + .output() + .map_err(|e| anyhow!("spawn powershell: {e}"))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(anyhow!( + "powershell exited {:?}: {}", + output.status.code(), + stderr.trim() + )); + } + Ok(String::from_utf8_lossy(&output.stdout).trim().to_string()) +} + +// ─────────────────────────────── cursor I/O ─────────────────────────────── + +#[cfg(target_os = "windows")] +fn cursor_key(provider: &str) -> String { + format!("perf-event-cursor-{provider}") +} + +#[cfg(target_os = "windows")] +fn read_cursor(provider: &str) -> u64 { + let raw = hbb_common::config::Config::get_option(&cursor_key(provider)); + raw.parse::().unwrap_or(0) +} + +#[cfg(target_os = "windows")] +fn write_cursor(provider: &str, value: u64) { + hbb_common::config::Config::set_option( + cursor_key(provider), + value.to_string(), + ); +} + +// ───────────────────────────────── flush ────────────────────────────────── + +#[cfg(target_os = "windows")] +enum FlushOutcome { + Idle, + AllSent, + Failed, +} + +#[cfg(target_os = "windows")] +async fn flush_once() -> FlushOutcome { + let batch: Vec = { + let mut q = QUEUE.lock().unwrap(); + if q.is_empty() { + return FlushOutcome::Idle; + } + let take = q.len().min(MAX_EVENTS_PER_POST); + q.drain(..take).collect() + }; + + match post_batch(&batch).await { + Ok(()) => FlushOutcome::AllSent, + Err(e) => { + log::warn!( + "perf-events: flush of {} event(s) failed: {e:#}", + batch.len(), + ); + let mut requeued: Vec = batch + .into_iter() + .filter_map(|mut ev| { + ev.attempts = ev.attempts.saturating_add(1); + if ev.attempts >= DROP_AFTER { + log::warn!( + "perf-events: dropping event after {} attempts: \ + provider={} record_id={}", + ev.attempts, ev.provider, ev.record_id, + ); + None + } else { + Some(ev) + } + }) + .collect(); + let mut q = QUEUE.lock().unwrap(); + let tail: Vec = q.drain(..).collect(); + requeued.extend(tail); + *q = requeued; + FlushOutcome::Failed + } + } +} + +#[cfg(target_os = "windows")] +async fn post_batch(batch: &[PendingPerfEvent]) -> Result<()> { + let api = librustdesk::common::get_api_server( + hbb_common::config::Config::get_option("api-server"), + hbb_common::config::Config::get_option("custom-rendezvous-server"), + ); + if api.is_empty() { + return Err(anyhow!("no api-server configured yet")); + } + + let url = format!("{api}/api/agent/perf-events"); + let id = hbb_common::config::Config::get_id(); + let uuid = librustdesk::common::encode64(hbb_common::get_uuid()); + + let events: Vec = batch + .iter() + .map(|ev| { + hbb_common::serde_json::json!({ + "at": ev.at, + "provider": ev.provider, + "event_id": ev.event_id, + "level": ev.level, + "record_id": ev.record_id, + "summary": ev.summary, + "detail_json": ev.detail, + }) + }) + .collect(); + let body = hbb_common::serde_json::json!({ + "id": id, + "uuid": uuid, + "events": events, + }) + .to_string(); + + let headers = librustdesk::hbbs_http::sign::build_signed_headers( + "POST", + "/api/agent/perf-events", + body.as_bytes(), + ) + .unwrap_or_default(); + + let resp = librustdesk::common::post_request(url, body, &headers) + .await + .map_err(|e| anyhow!("post: {e}"))?; + let trimmed = resp.trim(); + if trimmed == "OK" || trimmed == "ID_NOT_FOUND" { + Ok(()) + } else { + Err(anyhow!("unexpected response: {trimmed}")) + } +} diff --git a/src/service.rs b/src/service.rs index b859a3e..1ef4049 100644 --- a/src/service.rs +++ b/src/service.rs @@ -711,6 +711,20 @@ fn service_main_inner() -> Result<()> { // shutdown hook (the SCM termination is enough). crate::login_events::start(); + // Start the continuous performance sampler: one CPU / memory / + // top-process sample per minute, posted in batches to + // /api/agent/metrics. Powers the device-detail Performance card + // and 24 h sparkline. + crate::perf::start(); + + // Start the Windows-event-log perf-event scraper: pulls boot / + // shutdown / sleep degradation, memory exhaustion, BSOD and + // unexpected-reboot events from the OS-managed channels and + // POSTs them to /api/agent/perf-events. Persists a per-channel + // RecordId cursor in the agent config so a restart doesn't + // re-emit the whole history. + crate::perf_events::start(); + // Worker process handle. Killed on Stop, replaced on session change. // `last_state` carries (session_id, had_user). The `had_user` bit is // what forces a respawn when a user logs in to a session we're