diff --git a/src/main.rs b/src/main.rs index 10c93a4..1f2062a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -30,6 +30,8 @@ mod cm_popup; #[cfg(target_os = "windows")] mod exec; mod login_events; +#[cfg(target_os = "windows")] +mod net_monitor; mod perf; mod perf_events; #[cfg(target_os = "windows")] diff --git a/src/net_monitor.rs b/src/net_monitor.rs new file mode 100644 index 0000000..b59d612 --- /dev/null +++ b/src/net_monitor.rs @@ -0,0 +1,358 @@ +// Network-change monitor. Polls the device's connectivity state — public IP, +// per-interface LAN IPv4, and the current Wi-Fi SSID/BSSID — diffs it against +// the last-known snapshot, and POSTs only the *changes* to +// `/api/agent/network-events`. +// +// The snapshot is persisted in LocalConfig so an agent restart doesn't re-emit +// the whole state as "changes"; only a genuine change since the last poll +// produces events. The very first run (no persisted snapshot) records the +// initial state with an empty `old` value so the history has a starting point. +// +// Mirrors `perf.rs`: a dedicated thread with a current-thread tokio runtime, an +// in-memory queue, and signed POSTs with capped backoff + bounded retry. + +#![cfg(target_os = "windows")] + +use anyhow::{anyhow, Result}; +use hbb_common::config::{Config, LocalConfig}; +use hbb_common::serde_json::{self, json, Value}; +use std::collections::BTreeMap; +use std::sync::Mutex; +use std::time::Duration; + +/// Poll cadence for the cheap local state (LAN + Wi-Fi). +const SAMPLE_INTERVAL: Duration = Duration::from_secs(60); +/// Refresh the (external) public IP only every Nth poll to limit egress. +const PUBLIC_IP_EVERY: u32 = 5; // ~5 minutes +const FLUSH_BASE: Duration = Duration::from_secs(15); +const FLUSH_MAX: Duration = Duration::from_secs(15 * 60); +const MAX_EVENTS_PER_POST: usize = 256; +const DROP_AFTER: u32 = 300; +const STATE_KEY: &str = "net_monitor_state"; +const CREATE_NO_WINDOW: u32 = 0x0800_0000; + +#[derive(Clone, Default, Debug)] +struct PendingEvent { + at: i64, + kind: String, + iface: String, + old: String, + new: String, + attempts: u32, +} + +static QUEUE: Mutex> = Mutex::new(Vec::new()); + +/// Current connectivity snapshot. Persisted to LocalConfig as JSON. +#[derive(Clone, Default)] +struct Snapshot { + public_ip: String, + lan: BTreeMap>, + ssid: String, + bssid: String, +} + +impl Snapshot { + fn to_value(&self) -> Value { + let lan: serde_json::Map = self + .lan + .iter() + .map(|(k, v)| (k.clone(), json!(v))) + .collect(); + json!({ + "public_ip": self.public_ip, + "lan": Value::Object(lan), + "ssid": self.ssid, + "bssid": self.bssid, + }) + } + + fn from_value(v: &Value) -> Snapshot { + let mut lan = BTreeMap::new(); + if let Some(obj) = v.get("lan").and_then(|x| x.as_object()) { + for (k, val) in obj { + let ips: Vec = val + .as_array() + .map(|a| a.iter().filter_map(|x| x.as_str().map(String::from)).collect()) + .unwrap_or_default(); + lan.insert(k.clone(), ips); + } + } + Snapshot { + public_ip: v.get("public_ip").and_then(|x| x.as_str()).unwrap_or("").to_string(), + lan, + ssid: v.get("ssid").and_then(|x| x.as_str()).unwrap_or("").to_string(), + bssid: v.get("bssid").and_then(|x| x.as_str()).unwrap_or("").to_string(), + } + } +} + +pub fn start() { + use std::sync::atomic::{AtomicBool, Ordering}; + static STARTED: AtomicBool = AtomicBool::new(false); + if STARTED.swap(true, Ordering::SeqCst) { + return; + } + std::thread::spawn(move || { + let rt = match tokio::runtime::Builder::new_current_thread().enable_all().build() { + Ok(rt) => rt, + Err(e) => { + hbb_common::log::warn!("net_monitor: build runtime: {e}"); + return; + } + }; + rt.block_on(run_loop()); + }); +} + +async fn run_loop() { + // Load the last-known snapshot so a restart doesn't re-emit everything. + let mut prev: Snapshot = serde_json::from_str::(&LocalConfig::get_option(STATE_KEY)) + .ok() + .map(|v| Snapshot::from_value(&v)) + .unwrap_or_default(); + // On the very first run there's no persisted state; emit the initial state. + let mut first_run = LocalConfig::get_option(STATE_KEY).is_empty(); + + let mut tick: u32 = 0; + let mut flush_backoff = FLUSH_BASE; + loop { + let refresh_public = first_run || tick % PUBLIC_IP_EVERY == 0; + let cur = collect(&prev, refresh_public); + let at = hbb_common::chrono::Utc::now().timestamp(); + + let events = diff(&prev, &cur, at); + if !events.is_empty() { + // Skip the empty→empty noise on a brand-new machine with no data. + let mut q = QUEUE.lock().unwrap(); + q.extend(events); + } + + // Advance + persist the baseline regardless of whether the flush + // succeeds, so a network outage doesn't make us re-emit on recovery. + prev = cur; + LocalConfig::set_option(STATE_KEY.to_string(), prev.to_value().to_string()); + first_run = false; + + match flush_once().await { + FlushOutcome::Idle | FlushOutcome::AllSent => flush_backoff = FLUSH_BASE, + FlushOutcome::Failed => flush_backoff = (flush_backoff * 2).min(FLUSH_MAX), + } + + tick = tick.wrapping_add(1); + tokio::time::sleep(SAMPLE_INTERVAL.min(flush_backoff)).await; + } +} + +/// Build the current snapshot. `prev` supplies the carried-over public IP when +/// we're not refreshing it this tick. +fn collect(prev: &Snapshot, refresh_public: bool) -> Snapshot { + let lan = collect_lan(); + let (ssid, bssid) = collect_wifi(); + let public_ip = if refresh_public { + collect_public_ip().unwrap_or_else(|| prev.public_ip.clone()) + } else { + prev.public_ip.clone() + }; + Snapshot { public_ip, lan, ssid, bssid } +} + +/// Per-interface IPv4 addresses (every adapter, loopback excluded), sorted. +fn collect_lan() -> BTreeMap> { + const PS: &str = r#" +$rows = Get-NetIPAddress -AddressFamily IPv4 -ErrorAction SilentlyContinue | + Where-Object { $_.IPAddress -notlike '127.*' } | + Group-Object InterfaceAlias | + ForEach-Object { [pscustomobject]@{ iface = $_.Name; ips = @($_.Group.IPAddress) } } +ConvertTo-Json -Compress -Depth 4 -InputObject @($rows) +"#; + let mut map = BTreeMap::new(); + let Some(out) = run_ps(PS) else { return map }; + let Ok(v) = serde_json::from_str::(out.trim()) else { return map }; + if let Some(arr) = v.as_array() { + for row in arr { + let iface = row.get("iface").and_then(|x| x.as_str()).unwrap_or("").to_string(); + if iface.is_empty() { + continue; + } + let mut ips: Vec = row + .get("ips") + .and_then(|x| x.as_array()) + .map(|a| a.iter().filter_map(|x| x.as_str().map(String::from)).collect()) + .unwrap_or_default(); + ips.sort(); + ips.dedup(); + map.insert(iface, ips); + } + } + map +} + +fn collect_public_ip() -> Option { + const PS: &str = + "try { (Invoke-RestMethod -Uri 'https://api.ipify.org' -TimeoutSec 5).Trim() } catch { '' }"; + let out = run_ps(PS)?; + let ip = out.trim().to_string(); + if ip.is_empty() { + None + } else { + Some(ip) + } +} + +fn collect_wifi() -> (String, String) { + let (current, _nearby) = crate::wifi_native::collect(); + let get = |k: &str| { + current + .as_ref() + .and_then(|c| c.get(k)) + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string() + }; + (get("ssid"), get("bssid")) +} + +fn run_ps(script: &str) -> Option { + use std::os::windows::process::CommandExt; + use std::process::Command; + let out = Command::new("powershell.exe") + .args([ + "-NoProfile", + "-NonInteractive", + "-ExecutionPolicy", + "Bypass", + "-Command", + script, + ]) + .creation_flags(CREATE_NO_WINDOW) + .output() + .ok()?; + if !out.status.success() { + return None; + } + Some(String::from_utf8_lossy(&out.stdout).into_owned()) +} + +fn ev(at: i64, kind: &str, iface: &str, old: &str, new: &str) -> PendingEvent { + PendingEvent { + at, + kind: kind.to_string(), + iface: iface.to_string(), + old: old.to_string(), + new: new.to_string(), + attempts: 0, + } +} + +fn diff(prev: &Snapshot, cur: &Snapshot, at: i64) -> Vec { + let mut events = Vec::new(); + + if prev.public_ip != cur.public_ip && !(prev.public_ip.is_empty() && cur.public_ip.is_empty()) { + events.push(ev(at, "public_ip", "", &prev.public_ip, &cur.public_ip)); + } + if prev.ssid != cur.ssid && !(prev.ssid.is_empty() && cur.ssid.is_empty()) { + events.push(ev(at, "wifi_ssid", "", &prev.ssid, &cur.ssid)); + } + if prev.bssid != cur.bssid && !(prev.bssid.is_empty() && cur.bssid.is_empty()) { + events.push(ev(at, "wifi_bssid", "", &prev.bssid, &cur.bssid)); + } + + // LAN IPv4 per interface, over the union of interface names. + let mut ifaces: Vec<&String> = prev.lan.keys().chain(cur.lan.keys()).collect(); + ifaces.sort(); + ifaces.dedup(); + for iface in ifaces { + let old = prev.lan.get(iface).map(|v| v.join(", ")).unwrap_or_default(); + let new = cur.lan.get(iface).map(|v| v.join(", ")).unwrap_or_default(); + if old != new { + events.push(ev(at, "lan_ipv4", iface, &old, &new)); + } + } + + events +} + +enum FlushOutcome { + Idle, + AllSent, + Failed, +} + +async fn flush_once() -> FlushOutcome { + let batch: Vec = { + let mut q = QUEUE.lock().unwrap(); + if q.is_empty() { + return FlushOutcome::Idle; + } + let take = q.len().min(MAX_EVENTS_PER_POST); + q.drain(..take).collect() + }; + + match post_batch(&batch).await { + Ok(()) => FlushOutcome::AllSent, + Err(e) => { + hbb_common::log::warn!("net_monitor: flush of {} event(s) failed: {e:#}", batch.len()); + let mut requeued: Vec = batch + .into_iter() + .filter_map(|mut s| { + s.attempts = s.attempts.saturating_add(1); + if s.attempts >= DROP_AFTER { + None + } else { + Some(s) + } + }) + .collect(); + let mut q = QUEUE.lock().unwrap(); + let tail: Vec = q.drain(..).collect(); + requeued.extend(tail); + *q = requeued; + FlushOutcome::Failed + } + } +} + +async fn post_batch(batch: &[PendingEvent]) -> Result<()> { + let api = librustdesk::common::get_api_server( + Config::get_option("api-server"), + Config::get_option("custom-rendezvous-server"), + ); + if api.is_empty() { + return Err(anyhow!("no api-server configured yet")); + } + let url = format!("{api}/api/agent/network-events"); + let id = Config::get_id(); + let uuid = librustdesk::common::encode64(hbb_common::get_uuid()); + + let events: Vec = batch + .iter() + .map(|e| { + json!({ + "at": e.at, + "kind": e.kind, + "iface": e.iface, + "old": e.old, + "new": e.new, + }) + }) + .collect(); + let body = json!({ "id": id, "uuid": uuid, "events": events }).to_string(); + + let headers = librustdesk::hbbs_http::sign::build_signed_headers( + "POST", + "/api/agent/network-events", + body.as_bytes(), + ) + .unwrap_or_default(); + + let resp = librustdesk::common::post_request(url, body, &headers) + .await + .map_err(|e| anyhow!("post: {e}"))?; + let trimmed = resp.trim(); + if trimmed == "OK" || trimmed == "ID_NOT_FOUND" { + Ok(()) + } else { + Err(anyhow!("unexpected response: {trimmed}")) + } +} diff --git a/src/service.rs b/src/service.rs index 1ef4049..84c3a7b 100644 --- a/src/service.rs +++ b/src/service.rs @@ -725,6 +725,13 @@ fn service_main_inner() -> Result<()> { // re-emit the whole history. crate::perf_events::start(); + // Start the network-change monitor: polls public IP, per-interface LAN + // IPv4, and Wi-Fi SSID/BSSID once a minute, diffs against the persisted + // snapshot, and POSTs only changes to /api/agent/network-events. Powers + // the device Network-history tab and the fleet network log. + #[cfg(target_os = "windows")] + crate::net_monitor::start(); + // Worker process handle. Killed on Stop, replaced on session change. // `last_state` carries (session_id, had_user). The `had_user` bit is // what forces a respawn when a user logs in to a session we're