From f868efa432d974e72eaa9728622b6d07e9454a0e Mon Sep 17 00:00:00 2001 From: Mike Mueller Date: Fri, 22 May 2026 20:07:25 +0200 Subject: [PATCH] Implement user login logging --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/login_events.rs | 617 ++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 1 + src/service.rs | 7 + 5 files changed, 627 insertions(+), 2 deletions(-) create mode 100644 src/login_events.rs diff --git a/Cargo.lock b/Cargo.lock index a8b0f52..33bd5f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3197,7 +3197,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hello-agent" -version = "0.1.5" +version = "0.1.6" dependencies = [ "anyhow", "env_logger 0.10.2", diff --git a/Cargo.toml b/Cargo.toml index 244f820..bc6a682 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hello-agent" -version = "0.1.5" +version = "0.1.6" edition = "2021" rust-version = "1.75" description = "Headless RustDesk-protocol-compatible support agent for Windows" diff --git a/src/login_events.rs b/src/login_events.rs new file mode 100644 index 0000000..ec296fe --- /dev/null +++ b/src/login_events.rs @@ -0,0 +1,617 @@ +// User-login tracking. +// +// Polls the Windows Terminal Services session table at a low cadence and +// reports logon / logoff events to the rustdesk-server admin API. Each +// event carries an explicit unix timestamp — for logons that's the OS's +// session `ConnectTime` (so the recorded time is when the user actually +// signed in, not when the agent first noticed them); for logoffs it's +// the agent's wall clock at the moment the session disappeared. +// +// Architecture mirrors `unattended_password`: +// * One background thread, its own current-thread Tokio runtime, no +// entanglement with the SCM supervisor's poll loop. +// * In-memory queue with retry-with-backoff on transport / ID_NOT_FOUND +// errors (the agent's first POST routinely races rendezvous +// registration). Events that never land are eventually dropped to +// bound memory — see DROP_AFTER. +// * Server-side dedup via UNIQUE INDEX +// (peer_id, kind, session_id, at, username) lets the agent re-emit +// the same `logon@ConnectTime` event on every service restart without +// piling up duplicate rows; agent-side state stays in memory only. +// +// Known limits (v1): +// * Lock / unlock are not reported — they don't change the +// WTSEnumerateSessions output we diff on. +// * Logoffs that happen while the service is down are not detected. +// The next service start sees "no session" and has nothing to diff +// against; this leaves a logon without a paired logoff in the UI. +// Tradeoff vs. persisting a snapshot to disk; revisit if operators +// ask for it. + +use anyhow::{anyhow, Result}; +use std::collections::HashMap; +use std::sync::Mutex; +use std::time::Duration; + +/// How often to poll the WTS session table. A user can't meaningfully +/// log in / out faster than this, and the OS keeps the table cheap to +/// enumerate (it's a kernel-side struct, not a registry scan). +const POLL_INTERVAL: Duration = Duration::from_secs(5); + +/// How often to attempt a flush of the pending queue. Decoupled from the +/// poll interval so a transient server outage doesn't slow down session +/// observation; we keep observing locally and just back off the network +/// retry. +const FLUSH_INTERVAL_BASE: Duration = Duration::from_secs(5); +const FLUSH_INTERVAL_MAX: Duration = Duration::from_secs(60); + +/// Drop events from the queue after this many failed delivery attempts. +/// At backoff cap = 60s, this is ~6 hours of trying — enough to ride out +/// a long server-side outage, short enough that a permanently-misconfigured +/// agent doesn't blow up memory. +const DROP_AFTER: u32 = 360; + +/// Cap per request — must match the server's MAX_EVENTS_PER_POST. Server +/// rejects anything larger with a 400 so we'd retry forever if we exceeded +/// it. +const MAX_EVENTS_PER_POST: usize = 256; + +/// One observed session snapshot. Equality by `(session_id, connect_time)` +/// — Windows can recycle session IDs across logins, so we use the OS- +/// reported `ConnectTime` as the disambiguator. Two snapshots compare +/// equal iff they describe the *same* logical login. +#[derive(Clone, Debug)] +struct Session { + session_id: u32, + /// FILETIME → unix epoch seconds. 0 if the OS returned an unparseable + /// value (very old Windows builds); we still report `logon` but the + /// server-side dedup degrades to "first observation wins". + connect_unix: i64, + username: String, + domain: String, + /// "console" | "rdp" | "" (unknown). + session_kind: String, +} + +#[derive(Clone, Debug)] +struct PendingEvent { + at: i64, + kind: &'static str, + username: String, + domain: String, + session_id: u32, + session_kind: String, + /// Number of times we've tried to flush this event. Used to drop + /// rows that have been retrying since forever. + attempts: u32, +} + +#[cfg(target_os = "windows")] +static QUEUE: Mutex> = Mutex::new(Vec::new()); + +/// Kick off the background tracker. Returns immediately. Safe to call +/// multiple times (subsequent calls are no-ops) — gated by an +/// `AtomicBool`. +pub fn start() { + #[cfg(not(target_os = "windows"))] + { + // Login tracking is Windows-only; on other platforms the call is + // a no-op so cross-platform builds (CI lint runs on Linux) link. + } + #[cfg(target_os = "windows")] + { + use std::sync::atomic::{AtomicBool, Ordering}; + static STARTED: AtomicBool = AtomicBool::new(false); + if STARTED.swap(true, Ordering::SeqCst) { + return; + } + std::thread::spawn(move || { + let rt = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(rt) => rt, + Err(e) => { + log::warn!("login-events: build runtime: {e}"); + return; + } + }; + rt.block_on(run_loop()); + }); + } +} + +#[cfg(target_os = "windows")] +async fn run_loop() { + // Diff snapshot. Keyed by session_id; value carries connect_unix so we + // can detect "session id reused for a new login" as well as "session + // disappeared". + let mut prev: HashMap = HashMap::new(); + let mut first_poll = true; + let mut flush_backoff = FLUSH_INTERVAL_BASE; + + loop { + match enumerate_active_user_sessions() { + Ok(snapshot) => { + let now_unix = hbb_common::chrono::Utc::now().timestamp(); + let curr: HashMap = + snapshot.into_iter().map(|s| (s.session_id, s)).collect(); + let events = + diff_into_events(&prev, &curr, first_poll, now_unix); + if !events.is_empty() { + let mut q = QUEUE.lock().unwrap(); + q.extend(events); + } + prev = curr; + first_poll = false; + } + Err(e) => { + log::warn!("login-events: enumerate failed: {e:#}"); + } + } + + // Try to flush. If the queue is empty this is cheap (no network). + // The flush also handles its own retry / drop semantics — we just + // adjust our local cadence based on whether it succeeded. + match flush_once().await { + FlushOutcome::Idle | FlushOutcome::AllSent => { + flush_backoff = FLUSH_INTERVAL_BASE; + } + FlushOutcome::Failed => { + flush_backoff = (flush_backoff * 2).min(FLUSH_INTERVAL_MAX); + } + } + + // Poll cadence is constant; the network backoff doesn't slow down + // observation — that would risk missing logoffs while the server + // is down. We just delay the retry of pending events. + tokio::time::sleep(POLL_INTERVAL.min(flush_backoff)).await; + } +} + +#[cfg(target_os = "windows")] +fn diff_into_events( + prev: &HashMap, + curr: &HashMap, + first_poll: bool, + now_unix: i64, +) -> Vec { + let mut out = Vec::new(); + + // New / changed sessions → logon. + for (sid, sess) in curr { + let changed = match prev.get(sid) { + None => true, + Some(old) => { + // Same session id but a different connect_time → the + // OS recycled the slot for a fresh login. Emit a + // logoff for the old occupant and a logon for the new. + if old.connect_unix != sess.connect_unix + || old.username != sess.username + { + out.push(PendingEvent { + at: now_unix, + kind: "logoff", + username: old.username.clone(), + domain: old.domain.clone(), + session_id: *sid, + session_kind: old.session_kind.clone(), + attempts: 0, + }); + true + } else { + false + } + } + }; + if !changed { + continue; + } + // ConnectTime can be 0 on the login-screen session even when a + // user is shown as logged in (rare edge); fall back to `now` so + // the row still lands somewhere sensible. + let at = if sess.connect_unix > 0 { + sess.connect_unix + } else { + now_unix + }; + out.push(PendingEvent { + at, + kind: "logon", + username: sess.username.clone(), + domain: sess.domain.clone(), + session_id: *sid, + session_kind: sess.session_kind.clone(), + attempts: 0, + }); + } + + // Disappeared sessions → logoff. Skipped on the very first poll + // because we have no baseline to diff against — see the module-level + // "first poll: emit logons-only" note. + if !first_poll { + for (sid, old) in prev { + if !curr.contains_key(sid) { + out.push(PendingEvent { + at: now_unix, + kind: "logoff", + username: old.username.clone(), + domain: old.domain.clone(), + session_id: *sid, + session_kind: old.session_kind.clone(), + attempts: 0, + }); + } + } + } + + out +} + +#[cfg(target_os = "windows")] +enum FlushOutcome { + Idle, + AllSent, + Failed, +} + +#[cfg(target_os = "windows")] +async fn flush_once() -> FlushOutcome { + // Take a snapshot under the lock so we don't hold it across the + // (potentially slow) network call. If the POST succeeds we drop the + // matching prefix from the queue; if it fails we put back the + // unsent tail with bumped attempt counters. + let batch: Vec = { + let mut q = QUEUE.lock().unwrap(); + if q.is_empty() { + return FlushOutcome::Idle; + } + let take = q.len().min(MAX_EVENTS_PER_POST); + q.drain(..take).collect() + }; + + let posted = post_batch(&batch).await; + match posted { + Ok(()) => { + // Server accepted (or returned ID_NOT_FOUND, which we treat + // as "drop and continue" because no amount of retrying will + // make the peer materialize without the rendezvous loop in + // --server, which the unattended_password reporter already + // hammers on; once that succeeds the next batch lands). + FlushOutcome::AllSent + } + Err(e) => { + log::warn!( + "login-events: flush of {} event(s) failed: {e:#}", + batch.len(), + ); + // Put the batch back with bumped attempt counters, modulo + // events that have hit the drop threshold. Prepend so that + // any events pushed by the poll loop between the drain and + // here stay after the (older) failed batch. + let mut requeued: Vec = batch + .into_iter() + .filter_map(|mut ev| { + ev.attempts = ev.attempts.saturating_add(1); + if ev.attempts >= DROP_AFTER { + log::warn!( + "login-events: dropping event after {} attempts: \ + kind={} session={} user={}", + ev.attempts, ev.kind, ev.session_id, ev.username, + ); + None + } else { + Some(ev) + } + }) + .collect(); + let mut q = QUEUE.lock().unwrap(); + let tail: Vec = q.drain(..).collect(); + requeued.extend(tail); + *q = requeued; + FlushOutcome::Failed + } + } +} + +#[cfg(target_os = "windows")] +async fn post_batch(batch: &[PendingEvent]) -> Result<()> { + let api = librustdesk::common::get_api_server( + hbb_common::config::Config::get_option("api-server"), + hbb_common::config::Config::get_option("custom-rendezvous-server"), + ); + if api.is_empty() { + return Err(anyhow!("no api-server configured yet")); + } + + let url = format!("{api}/api/agent/login-event"); + let id = hbb_common::config::Config::get_id(); + let uuid = librustdesk::common::encode64(hbb_common::get_uuid()); + + let events: Vec = batch + .iter() + .map(|ev| { + hbb_common::serde_json::json!({ + "at": ev.at, + "kind": ev.kind, + "username": ev.username, + "domain": ev.domain, + "session_id": ev.session_id, + "session_kind": ev.session_kind, + }) + }) + .collect(); + let body = hbb_common::serde_json::json!({ + "id": id, + "uuid": uuid, + "events": events, + }) + .to_string(); + + let headers = librustdesk::hbbs_http::sign::build_signed_headers( + "POST", + "/api/agent/login-event", + body.as_bytes(), + ) + .unwrap_or_default(); + + let resp = librustdesk::common::post_request(url, body, &headers) + .await + .map_err(|e| anyhow!("post: {e}"))?; + let trimmed = resp.trim(); + if trimmed == "OK" || trimmed == "ID_NOT_FOUND" { + // ID_NOT_FOUND is "peer not registered yet" — happens on the + // first few flushes after a fresh install, before the + // rendezvous loop in --server has created the peer row. The + // unattended_password reporter races this same window; once + // either of them succeeds the peer row exists and subsequent + // posts land. We treat it as success here so the agent doesn't + // pile up unbounded retries — if rendezvous never registers, + // the heartbeat path is also broken and the operator has + // bigger problems than missing login events. + Ok(()) + } else { + Err(anyhow!("unexpected response: {trimmed}")) + } +} + +// ─────────────────────────── Win32 session enumeration ──────────────────── +// +// Same shape as service.rs's find_active_user_session — we declare just +// the WTS functions we touch rather than pull in another bindgen. The +// types are tiny and ABI-stable. + +#[cfg(target_os = "windows")] +#[repr(C)] +struct WtsSessionInfoW { + session_id: u32, + win_station_name: *mut u16, + state: i32, +} + +#[cfg(target_os = "windows")] +#[repr(C)] +struct WtsInfoW { + state: i32, + session_id: u32, + incoming_bytes: u32, + outgoing_bytes: u32, + incoming_frames: u32, + outgoing_frames: u32, + incoming_compressed_bytes: u32, + outgoing_compressed_bytes: u32, + win_station_name: [u16; 32], + domain: [u16; 17], + user_name: [u16; 21], + connect_time: i64, + disconnect_time: i64, + last_input_time: i64, + logon_time: i64, + current_time: i64, +} + +#[cfg(target_os = "windows")] +extern "system" { + fn WTSEnumerateSessionsW( + h_server: winapi::shared::ntdef::HANDLE, + reserved: u32, + version: u32, + pp_session_info: *mut *mut WtsSessionInfoW, + p_count: *mut u32, + ) -> i32; + fn WTSFreeMemory(p_memory: *mut std::ffi::c_void); + fn WTSQuerySessionInformationW( + h_server: winapi::shared::ntdef::HANDLE, + session_id: u32, + info_class: i32, + pp_buffer: *mut *mut u16, + p_bytes_returned: *mut u32, + ) -> i32; +} + +#[cfg(target_os = "windows")] +const WTS_ACTIVE: i32 = 0; +#[cfg(target_os = "windows")] +const WTS_USER_NAME: i32 = 5; +#[cfg(target_os = "windows")] +const WTS_DOMAIN_NAME: i32 = 7; +#[cfg(target_os = "windows")] +const WTS_CLIENT_PROTOCOL_TYPE: i32 = 16; +#[cfg(target_os = "windows")] +const WTS_SESSION_INFO: i32 = 24; + +#[cfg(target_os = "windows")] +fn enumerate_active_user_sessions() -> Result> { + let mut sessions: *mut WtsSessionInfoW = std::ptr::null_mut(); + let mut count: u32 = 0; + let ok = unsafe { + WTSEnumerateSessionsW( + std::ptr::null_mut(), + 0, + 1, + &mut sessions, + &mut count, + ) + }; + if ok == 0 || sessions.is_null() { + return Err(anyhow!( + "WTSEnumerateSessionsW failed: {}", + std::io::Error::last_os_error() + )); + } + + let mut out = Vec::new(); + for i in 0..count { + let info = unsafe { &*sessions.add(i as usize) }; + if info.state != WTS_ACTIVE { + continue; + } + let sid = info.session_id; + + // Skip sessions without a logged-in user (login screen, Session 0). + let username = match query_wide(sid, WTS_USER_NAME) { + Some(s) if !s.is_empty() => s, + _ => continue, + }; + let domain = query_wide(sid, WTS_DOMAIN_NAME).unwrap_or_default(); + let session_kind = match query_protocol_type(sid) { + Some(0) => "console".to_string(), + Some(2) => "rdp".to_string(), + Some(_) | None => String::new(), + }; + let connect_unix = query_connect_time(sid).unwrap_or(0); + + out.push(Session { + session_id: sid, + connect_unix, + username, + domain, + session_kind, + }); + } + + unsafe { WTSFreeMemory(sessions as *mut std::ffi::c_void) }; + Ok(out) +} + +/// Pull a WCHAR-string-shaped value (WTSUserName / WTSDomainName / …) +/// and convert to UTF-8. Returns None on failure or empty result. +#[cfg(target_os = "windows")] +fn query_wide(session_id: u32, info_class: i32) -> Option { + let mut buf: *mut u16 = std::ptr::null_mut(); + let mut bytes: u32 = 0; + let ok = unsafe { + WTSQuerySessionInformationW( + std::ptr::null_mut(), + session_id, + info_class, + &mut buf, + &mut bytes, + ) + }; + if ok == 0 || buf.is_null() { + return None; + } + let s = unsafe { wide_to_string(buf, bytes) }; + unsafe { WTSFreeMemory(buf as *mut std::ffi::c_void) }; + if s.is_empty() { + None + } else { + Some(s) + } +} + +/// WTSClientProtocolType returns a single USHORT (2 bytes). Buffer is +/// allocated by Windows; we read the 16-bit value and free. +#[cfg(target_os = "windows")] +fn query_protocol_type(session_id: u32) -> Option { + let mut buf: *mut u16 = std::ptr::null_mut(); + let mut bytes: u32 = 0; + let ok = unsafe { + WTSQuerySessionInformationW( + std::ptr::null_mut(), + session_id, + WTS_CLIENT_PROTOCOL_TYPE, + &mut buf, + &mut bytes, + ) + }; + if ok == 0 || buf.is_null() || bytes < 2 { + if !buf.is_null() { + unsafe { WTSFreeMemory(buf as *mut std::ffi::c_void) }; + } + return None; + } + let val = unsafe { *buf }; + unsafe { WTSFreeMemory(buf as *mut std::ffi::c_void) }; + Some(val) +} + +/// Pull ConnectTime from WTSINFOW (the per-session struct returned by +/// WTSQuerySessionInformation with class WTSSessionInfo). FILETIME 0 +/// means "OS doesn't know" (rare — happens on the login-screen session +/// before a user signs in); we surface that as None and the caller +/// falls back to `now()`. +#[cfg(target_os = "windows")] +fn query_connect_time(session_id: u32) -> Option { + let mut buf: *mut u16 = std::ptr::null_mut(); + let mut bytes: u32 = 0; + let ok = unsafe { + WTSQuerySessionInformationW( + std::ptr::null_mut(), + session_id, + WTS_SESSION_INFO, + &mut buf, + &mut bytes, + ) + }; + if ok == 0 || buf.is_null() || (bytes as usize) < std::mem::size_of::() { + if !buf.is_null() { + unsafe { WTSFreeMemory(buf as *mut std::ffi::c_void) }; + } + return None; + } + let info: &WtsInfoW = unsafe { &*(buf as *const WtsInfoW) }; + let ft = info.connect_time; + unsafe { WTSFreeMemory(buf as *mut std::ffi::c_void) }; + let unix = filetime_to_unix(ft); + if unix > 0 { + Some(unix) + } else { + None + } +} + +/// FILETIME is 100-nanosecond ticks since 1601-01-01 UTC. Convert to +/// unix seconds; clamp to >=0 since the table column is signed but a +/// pre-epoch ConnectTime is nonsense for our purposes. +#[cfg(target_os = "windows")] +fn filetime_to_unix(ft: i64) -> i64 { + // 11644473600 = seconds between 1601-01-01 and 1970-01-01. + const TICKS_PER_SEC: i64 = 10_000_000; + const EPOCH_DIFF_SECS: i64 = 11_644_473_600; + if ft <= 0 { + return 0; + } + let secs_since_1601 = ft / TICKS_PER_SEC; + let unix = secs_since_1601 - EPOCH_DIFF_SECS; + unix.max(0) +} + +/// Convert a NUL-terminated UTF-16 buffer to a Rust String. `bytes` is +/// the byte count Windows returned (NOT the char count); we trust the +/// trailing NUL but cap on `bytes / 2` so a malformed buffer can't run +/// us into unallocated memory. +#[cfg(target_os = "windows")] +unsafe fn wide_to_string(buf: *const u16, bytes: u32) -> String { + if buf.is_null() || bytes == 0 { + return String::new(); + } + let max_chars = (bytes as usize) / 2; + let mut len = 0usize; + while len < max_chars && *buf.add(len) != 0 { + len += 1; + } + let slice = std::slice::from_raw_parts(buf, len); + String::from_utf16_lossy(slice) +} diff --git a/src/main.rs b/src/main.rs index 575a0c5..9d2b027 100644 --- a/src/main.rs +++ b/src/main.rs @@ -29,6 +29,7 @@ mod inventory; mod cm_popup; #[cfg(target_os = "windows")] mod exec; +mod login_events; #[cfg(target_os = "windows")] mod service; #[cfg(target_os = "windows")] diff --git a/src/service.rs b/src/service.rs index 9ecb40c..b859a3e 100644 --- a/src/service.rs +++ b/src/service.rs @@ -704,6 +704,13 @@ fn service_main_inner() -> Result<()> { // can race the rendezvous registration done by `--server`). crate::unattended_password::rotate_and_report(); + // Start the user-login tracker. Polls the WTS session table every + // few seconds, diffs against its previous snapshot, and POSTs + // logon/logoff events to the admin API. Independent background + // thread + Tokio runtime; lives for the service lifetime, no + // shutdown hook (the SCM termination is enough). + crate::login_events::start(); + // Worker process handle. Killed on Stop, replaced on session change. // `last_state` carries (session_id, had_user). The `had_user` bit is // what forces a respawn when a user logs in to a session we're