net: monitor and report connectivity changes
Add a network-change monitor that polls public IP, per-interface LAN IPv4 (every adapter), and Wi-Fi SSID/BSSID once a minute, diffs against a snapshot persisted in LocalConfig, and POSTs only the changes (old -> new) to /api/agent/network-events. Restart-safe (no re-emit) with the same queue/retry/backoff as the perf + login-event monitors. Powers the opsbase device Network-history tab and the fleet network log. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -30,6 +30,8 @@ mod cm_popup;
|
||||
#[cfg(target_os = "windows")]
|
||||
mod exec;
|
||||
mod login_events;
|
||||
#[cfg(target_os = "windows")]
|
||||
mod net_monitor;
|
||||
mod perf;
|
||||
mod perf_events;
|
||||
#[cfg(target_os = "windows")]
|
||||
|
||||
@@ -0,0 +1,358 @@
|
||||
// Network-change monitor. Polls the device's connectivity state — public IP,
|
||||
// per-interface LAN IPv4, and the current Wi-Fi SSID/BSSID — diffs it against
|
||||
// the last-known snapshot, and POSTs only the *changes* to
|
||||
// `<api-server>/api/agent/network-events`.
|
||||
//
|
||||
// The snapshot is persisted in LocalConfig so an agent restart doesn't re-emit
|
||||
// the whole state as "changes"; only a genuine change since the last poll
|
||||
// produces events. The very first run (no persisted snapshot) records the
|
||||
// initial state with an empty `old` value so the history has a starting point.
|
||||
//
|
||||
// Mirrors `perf.rs`: a dedicated thread with a current-thread tokio runtime, an
|
||||
// in-memory queue, and signed POSTs with capped backoff + bounded retry.
|
||||
|
||||
#![cfg(target_os = "windows")]
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
use hbb_common::config::{Config, LocalConfig};
|
||||
use hbb_common::serde_json::{self, json, Value};
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
|
||||
/// Poll cadence for the cheap local state (LAN + Wi-Fi).
|
||||
const SAMPLE_INTERVAL: Duration = Duration::from_secs(60);
|
||||
/// Refresh the (external) public IP only every Nth poll to limit egress.
|
||||
const PUBLIC_IP_EVERY: u32 = 5; // ~5 minutes
|
||||
/// Starting value of the flush backoff, and the value it is reset to after an
/// idle or fully successful flush.
const FLUSH_BASE: Duration = Duration::from_secs(15);
|
||||
/// Upper bound the flush backoff is clamped to after being doubled on failure.
const FLUSH_MAX: Duration = Duration::from_secs(15 * 60);
|
||||
/// Maximum number of queued events drained into a single POST.
const MAX_EVENTS_PER_POST: usize = 256;
|
||||
/// An event is discarded once its count of failed POST attempts reaches this.
const DROP_AFTER: u32 = 300;
|
||||
/// LocalConfig option key under which the last snapshot is stored as JSON.
const STATE_KEY: &str = "net_monitor_state";
|
||||
/// Win32 `CREATE_NO_WINDOW` process-creation flag, passed to `creation_flags`
/// when spawning PowerShell.
const CREATE_NO_WINDOW: u32 = 0x0800_0000;
|
||||
|
||||
/// A single connectivity change waiting in the queue to be POSTed.
///
/// `PartialEq`/`Eq` are derived so events can be compared directly (for
/// example in assertions over the output of `diff`).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
struct PendingEvent {
    /// Unix timestamp (seconds) of the poll that observed the change.
    at: i64,
    /// Change category: `public_ip`, `wifi_ssid`, `wifi_bssid` or `lan_ipv4`.
    kind: String,
    /// Interface alias for `lan_ipv4` events; empty for the other kinds.
    iface: String,
    /// Value before the change; empty when there was none.
    old: String,
    /// Value after the change; empty when the value went away.
    new: String,
    /// Number of failed POST attempts so far. Local bookkeeping only — it is
    /// not part of the JSON sent to the server.
    attempts: u32,
}
|
||||
|
||||
/// In-memory queue of events not yet accepted by the server. Filled by
/// `run_loop`, drained (and refilled on failure) by `flush_once`.
static QUEUE: Mutex<Vec<PendingEvent>> = Mutex::new(Vec::new());
|
||||
|
||||
/// Current connectivity snapshot. Persisted to LocalConfig as JSON.
///
/// Derives `Debug` (matching `PendingEvent`) so a snapshot can be logged, and
/// `PartialEq`/`Eq` so two snapshots can be compared directly.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
struct Snapshot {
    /// Last known public IP; empty when it has never been resolved.
    public_ip: String,
    /// IPv4 addresses keyed by interface alias.
    lan: BTreeMap<String, Vec<String>>,
    /// SSID of the current Wi-Fi connection; empty when none was reported.
    ssid: String,
    /// BSSID of the current Wi-Fi connection; empty when none was reported.
    bssid: String,
}
|
||||
|
||||
impl Snapshot {
|
||||
fn to_value(&self) -> Value {
|
||||
let lan: serde_json::Map<String, Value> = self
|
||||
.lan
|
||||
.iter()
|
||||
.map(|(k, v)| (k.clone(), json!(v)))
|
||||
.collect();
|
||||
json!({
|
||||
"public_ip": self.public_ip,
|
||||
"lan": Value::Object(lan),
|
||||
"ssid": self.ssid,
|
||||
"bssid": self.bssid,
|
||||
})
|
||||
}
|
||||
|
||||
fn from_value(v: &Value) -> Snapshot {
|
||||
let mut lan = BTreeMap::new();
|
||||
if let Some(obj) = v.get("lan").and_then(|x| x.as_object()) {
|
||||
for (k, val) in obj {
|
||||
let ips: Vec<String> = val
|
||||
.as_array()
|
||||
.map(|a| a.iter().filter_map(|x| x.as_str().map(String::from)).collect())
|
||||
.unwrap_or_default();
|
||||
lan.insert(k.clone(), ips);
|
||||
}
|
||||
}
|
||||
Snapshot {
|
||||
public_ip: v.get("public_ip").and_then(|x| x.as_str()).unwrap_or("").to_string(),
|
||||
lan,
|
||||
ssid: v.get("ssid").and_then(|x| x.as_str()).unwrap_or("").to_string(),
|
||||
bssid: v.get("bssid").and_then(|x| x.as_str()).unwrap_or("").to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start() {
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
static STARTED: AtomicBool = AtomicBool::new(false);
|
||||
if STARTED.swap(true, Ordering::SeqCst) {
|
||||
return;
|
||||
}
|
||||
std::thread::spawn(move || {
|
||||
let rt = match tokio::runtime::Builder::new_current_thread().enable_all().build() {
|
||||
Ok(rt) => rt,
|
||||
Err(e) => {
|
||||
hbb_common::log::warn!("net_monitor: build runtime: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
rt.block_on(run_loop());
|
||||
});
|
||||
}
|
||||
|
||||
async fn run_loop() {
|
||||
// Load the last-known snapshot so a restart doesn't re-emit everything.
|
||||
let mut prev: Snapshot = serde_json::from_str::<Value>(&LocalConfig::get_option(STATE_KEY))
|
||||
.ok()
|
||||
.map(|v| Snapshot::from_value(&v))
|
||||
.unwrap_or_default();
|
||||
// On the very first run there's no persisted state; emit the initial state.
|
||||
let mut first_run = LocalConfig::get_option(STATE_KEY).is_empty();
|
||||
|
||||
let mut tick: u32 = 0;
|
||||
let mut flush_backoff = FLUSH_BASE;
|
||||
loop {
|
||||
let refresh_public = first_run || tick % PUBLIC_IP_EVERY == 0;
|
||||
let cur = collect(&prev, refresh_public);
|
||||
let at = hbb_common::chrono::Utc::now().timestamp();
|
||||
|
||||
let events = diff(&prev, &cur, at);
|
||||
if !events.is_empty() {
|
||||
// Skip the empty→empty noise on a brand-new machine with no data.
|
||||
let mut q = QUEUE.lock().unwrap();
|
||||
q.extend(events);
|
||||
}
|
||||
|
||||
// Advance + persist the baseline regardless of whether the flush
|
||||
// succeeds, so a network outage doesn't make us re-emit on recovery.
|
||||
prev = cur;
|
||||
LocalConfig::set_option(STATE_KEY.to_string(), prev.to_value().to_string());
|
||||
first_run = false;
|
||||
|
||||
match flush_once().await {
|
||||
FlushOutcome::Idle | FlushOutcome::AllSent => flush_backoff = FLUSH_BASE,
|
||||
FlushOutcome::Failed => flush_backoff = (flush_backoff * 2).min(FLUSH_MAX),
|
||||
}
|
||||
|
||||
tick = tick.wrapping_add(1);
|
||||
tokio::time::sleep(SAMPLE_INTERVAL.min(flush_backoff)).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Build the current snapshot. `prev` supplies the carried-over public IP when
|
||||
/// we're not refreshing it this tick.
|
||||
fn collect(prev: &Snapshot, refresh_public: bool) -> Snapshot {
|
||||
let lan = collect_lan();
|
||||
let (ssid, bssid) = collect_wifi();
|
||||
let public_ip = if refresh_public {
|
||||
collect_public_ip().unwrap_or_else(|| prev.public_ip.clone())
|
||||
} else {
|
||||
prev.public_ip.clone()
|
||||
};
|
||||
Snapshot { public_ip, lan, ssid, bssid }
|
||||
}
|
||||
|
||||
/// Per-interface IPv4 addresses (every adapter, loopback excluded), sorted.
|
||||
fn collect_lan() -> BTreeMap<String, Vec<String>> {
|
||||
const PS: &str = r#"
|
||||
$rows = Get-NetIPAddress -AddressFamily IPv4 -ErrorAction SilentlyContinue |
|
||||
Where-Object { $_.IPAddress -notlike '127.*' } |
|
||||
Group-Object InterfaceAlias |
|
||||
ForEach-Object { [pscustomobject]@{ iface = $_.Name; ips = @($_.Group.IPAddress) } }
|
||||
ConvertTo-Json -Compress -Depth 4 -InputObject @($rows)
|
||||
"#;
|
||||
let mut map = BTreeMap::new();
|
||||
let Some(out) = run_ps(PS) else { return map };
|
||||
let Ok(v) = serde_json::from_str::<Value>(out.trim()) else { return map };
|
||||
if let Some(arr) = v.as_array() {
|
||||
for row in arr {
|
||||
let iface = row.get("iface").and_then(|x| x.as_str()).unwrap_or("").to_string();
|
||||
if iface.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let mut ips: Vec<String> = row
|
||||
.get("ips")
|
||||
.and_then(|x| x.as_array())
|
||||
.map(|a| a.iter().filter_map(|x| x.as_str().map(String::from)).collect())
|
||||
.unwrap_or_default();
|
||||
ips.sort();
|
||||
ips.dedup();
|
||||
map.insert(iface, ips);
|
||||
}
|
||||
}
|
||||
map
|
||||
}
|
||||
|
||||
fn collect_public_ip() -> Option<String> {
|
||||
const PS: &str =
|
||||
"try { (Invoke-RestMethod -Uri 'https://api.ipify.org' -TimeoutSec 5).Trim() } catch { '' }";
|
||||
let out = run_ps(PS)?;
|
||||
let ip = out.trim().to_string();
|
||||
if ip.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(ip)
|
||||
}
|
||||
}
|
||||
|
||||
fn collect_wifi() -> (String, String) {
|
||||
let (current, _nearby) = crate::wifi_native::collect();
|
||||
let get = |k: &str| {
|
||||
current
|
||||
.as_ref()
|
||||
.and_then(|c| c.get(k))
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("")
|
||||
.to_string()
|
||||
};
|
||||
(get("ssid"), get("bssid"))
|
||||
}
|
||||
|
||||
fn run_ps(script: &str) -> Option<String> {
|
||||
use std::os::windows::process::CommandExt;
|
||||
use std::process::Command;
|
||||
let out = Command::new("powershell.exe")
|
||||
.args([
|
||||
"-NoProfile",
|
||||
"-NonInteractive",
|
||||
"-ExecutionPolicy",
|
||||
"Bypass",
|
||||
"-Command",
|
||||
script,
|
||||
])
|
||||
.creation_flags(CREATE_NO_WINDOW)
|
||||
.output()
|
||||
.ok()?;
|
||||
if !out.status.success() {
|
||||
return None;
|
||||
}
|
||||
Some(String::from_utf8_lossy(&out.stdout).into_owned())
|
||||
}
|
||||
|
||||
fn ev(at: i64, kind: &str, iface: &str, old: &str, new: &str) -> PendingEvent {
|
||||
PendingEvent {
|
||||
at,
|
||||
kind: kind.to_string(),
|
||||
iface: iface.to_string(),
|
||||
old: old.to_string(),
|
||||
new: new.to_string(),
|
||||
attempts: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn diff(prev: &Snapshot, cur: &Snapshot, at: i64) -> Vec<PendingEvent> {
|
||||
let mut events = Vec::new();
|
||||
|
||||
if prev.public_ip != cur.public_ip && !(prev.public_ip.is_empty() && cur.public_ip.is_empty()) {
|
||||
events.push(ev(at, "public_ip", "", &prev.public_ip, &cur.public_ip));
|
||||
}
|
||||
if prev.ssid != cur.ssid && !(prev.ssid.is_empty() && cur.ssid.is_empty()) {
|
||||
events.push(ev(at, "wifi_ssid", "", &prev.ssid, &cur.ssid));
|
||||
}
|
||||
if prev.bssid != cur.bssid && !(prev.bssid.is_empty() && cur.bssid.is_empty()) {
|
||||
events.push(ev(at, "wifi_bssid", "", &prev.bssid, &cur.bssid));
|
||||
}
|
||||
|
||||
// LAN IPv4 per interface, over the union of interface names.
|
||||
let mut ifaces: Vec<&String> = prev.lan.keys().chain(cur.lan.keys()).collect();
|
||||
ifaces.sort();
|
||||
ifaces.dedup();
|
||||
for iface in ifaces {
|
||||
let old = prev.lan.get(iface).map(|v| v.join(", ")).unwrap_or_default();
|
||||
let new = cur.lan.get(iface).map(|v| v.join(", ")).unwrap_or_default();
|
||||
if old != new {
|
||||
events.push(ev(at, "lan_ipv4", iface, &old, &new));
|
||||
}
|
||||
}
|
||||
|
||||
events
|
||||
}
|
||||
|
||||
/// Result of one `flush_once` attempt, used by the caller to adjust its
/// backoff. The derives make the outcome loggable and directly comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FlushOutcome {
    /// The queue was empty; nothing was sent.
    Idle,
    /// A batch was posted and accepted.
    AllSent,
    /// The POST failed; the surviving events were put back on the queue.
    Failed,
}
|
||||
|
||||
async fn flush_once() -> FlushOutcome {
|
||||
let batch: Vec<PendingEvent> = {
|
||||
let mut q = QUEUE.lock().unwrap();
|
||||
if q.is_empty() {
|
||||
return FlushOutcome::Idle;
|
||||
}
|
||||
let take = q.len().min(MAX_EVENTS_PER_POST);
|
||||
q.drain(..take).collect()
|
||||
};
|
||||
|
||||
match post_batch(&batch).await {
|
||||
Ok(()) => FlushOutcome::AllSent,
|
||||
Err(e) => {
|
||||
hbb_common::log::warn!("net_monitor: flush of {} event(s) failed: {e:#}", batch.len());
|
||||
let mut requeued: Vec<PendingEvent> = batch
|
||||
.into_iter()
|
||||
.filter_map(|mut s| {
|
||||
s.attempts = s.attempts.saturating_add(1);
|
||||
if s.attempts >= DROP_AFTER {
|
||||
None
|
||||
} else {
|
||||
Some(s)
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
let mut q = QUEUE.lock().unwrap();
|
||||
let tail: Vec<PendingEvent> = q.drain(..).collect();
|
||||
requeued.extend(tail);
|
||||
*q = requeued;
|
||||
FlushOutcome::Failed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn post_batch(batch: &[PendingEvent]) -> Result<()> {
|
||||
let api = librustdesk::common::get_api_server(
|
||||
Config::get_option("api-server"),
|
||||
Config::get_option("custom-rendezvous-server"),
|
||||
);
|
||||
if api.is_empty() {
|
||||
return Err(anyhow!("no api-server configured yet"));
|
||||
}
|
||||
let url = format!("{api}/api/agent/network-events");
|
||||
let id = Config::get_id();
|
||||
let uuid = librustdesk::common::encode64(hbb_common::get_uuid());
|
||||
|
||||
let events: Vec<Value> = batch
|
||||
.iter()
|
||||
.map(|e| {
|
||||
json!({
|
||||
"at": e.at,
|
||||
"kind": e.kind,
|
||||
"iface": e.iface,
|
||||
"old": e.old,
|
||||
"new": e.new,
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
let body = json!({ "id": id, "uuid": uuid, "events": events }).to_string();
|
||||
|
||||
let headers = librustdesk::hbbs_http::sign::build_signed_headers(
|
||||
"POST",
|
||||
"/api/agent/network-events",
|
||||
body.as_bytes(),
|
||||
)
|
||||
.unwrap_or_default();
|
||||
|
||||
let resp = librustdesk::common::post_request(url, body, &headers)
|
||||
.await
|
||||
.map_err(|e| anyhow!("post: {e}"))?;
|
||||
let trimmed = resp.trim();
|
||||
if trimmed == "OK" || trimmed == "ID_NOT_FOUND" {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(anyhow!("unexpected response: {trimmed}"))
|
||||
}
|
||||
}
|
||||
@@ -725,6 +725,13 @@ fn service_main_inner() -> Result<()> {
|
||||
// re-emit the whole history.
|
||||
crate::perf_events::start();
|
||||
|
||||
// Start the network-change monitor: polls public IP, per-interface LAN
|
||||
// IPv4, and Wi-Fi SSID/BSSID once a minute, diffs against the persisted
|
||||
// snapshot, and POSTs only changes to /api/agent/network-events. Powers
|
||||
// the device Network-history tab and the fleet network log.
|
||||
#[cfg(target_os = "windows")]
|
||||
crate::net_monitor::start();
|
||||
|
||||
// Worker process handle. Killed on Stop, replaced on session change.
|
||||
// `last_state` carries (session_id, had_user). The `had_user` bit is
|
||||
// what forces a respawn when a user logs in to a session we're
|
||||
|
||||
Reference in New Issue
Block a user