use async_trait::async_trait; use hbb_common::{log, ResultType}; use sqlx::{ sqlite::SqliteConnectOptions, ConnectOptions, Connection, Error as SqlxError, Row, SqliteConnection, }; use std::{ops::DerefMut, str::FromStr}; //use sqlx::postgres::PgPoolOptions; //use sqlx::mysql::MySqlPoolOptions; type Pool = deadpool::managed::Pool; pub struct DbPool { url: String, } #[async_trait] impl deadpool::managed::Manager for DbPool { type Type = SqliteConnection; type Error = SqlxError; async fn create(&self) -> Result { let mut opt = SqliteConnectOptions::from_str(&self.url).unwrap(); opt.log_statements(log::LevelFilter::Debug); SqliteConnection::connect_with(&opt).await } async fn recycle( &self, obj: &mut SqliteConnection, ) -> deadpool::managed::RecycleResult { Ok(obj.ping().await?) } } #[derive(Clone)] pub struct Database { pool: Pool, } #[derive(Default)] pub struct Peer { pub guid: Vec, pub id: String, pub uuid: Vec, pub pk: Vec, pub user: Option>, pub info: String, pub status: Option, } #[derive(Debug, Clone)] pub struct UserRow { pub id: i64, pub username: String, pub password_hash: String, pub display_name: String, pub email: String, pub note: String, pub avatar: String, pub status: i64, pub is_admin: bool, /// `Some(sub)` when the user was provisioned via or linked to an OIDC /// provider — they sign in via the IdP, and the dashboard treats local /// password / TOTP changes as the operator's responsibility to think /// twice about (the IdP is the source of truth for credentials). pub oidc_subject: Option, } impl UserRow { /// True iff this account was provisioned via — or has been linked to — /// an OIDC provider. Convenience for branching UI: hide local password /// changes for these users so we don't accidentally let an admin /// re-enable password sign-in (which bypasses the IdP's MFA). pub fn is_oidc_linked(&self) -> bool { self.oidc_subject .as_deref() .map(|s| !s.is_empty()) .unwrap_or(false) } } pub struct NewUser<'a> { pub username: &'a str, pub password_hash: &'a str, pub display_name: &'a str, pub is_admin: bool, } #[derive(Debug, Clone)] pub struct DeviceSysinfoRow { pub payload: String, pub sysinfo_ver_seen: String, } #[derive(Debug, Clone)] pub struct AbProfileRow { pub guid: String, pub name: String, pub owner: String, pub note: String, pub rule: i64, pub info_json: Option, } #[derive(Debug, Clone, Default)] pub struct AbPeerRow { pub id: String, pub alias: String, pub note: String, pub password: String, pub hash: String, pub username: String, pub hostname: String, pub platform: String, pub tags: Vec, } #[derive(Debug, Clone)] pub struct AbTagRow { pub name: String, pub color: i64, } #[derive(Debug, Clone)] pub struct DeviceGroupRow { pub id: i64, pub name: String, } #[derive(Debug, Clone)] pub struct AbOverviewRow { pub guid: String, pub name: String, pub kind: i64, // 0=personal, 1=shared pub owner_username: String, pub peer_count: i64, pub created_at: i64, } #[derive(Debug, Clone)] pub struct AbShareDetailRow { pub user_id: i64, pub username: String, /// 1=read, 2=read+write, 3=full (matches the desktop client's enum /// in src/hbbs_http/account.rs and the §4.3 wire contract). pub rule: i64, } #[derive(Debug, Clone)] pub struct StrategyRow { pub id: i64, pub name: String, pub modified_at: i64, pub config_options_json: String, pub extra_json: String, } #[derive(Debug, Clone)] pub struct AuditConnRow { pub guid: String, pub peer_id: String, pub conn_id: i64, pub session_id: i64, pub ip: String, pub action: String, pub note: String, pub started_at: i64, } #[derive(Debug, Clone)] pub struct AuditFileRow { pub id: i64, pub peer_id: String, pub remote_peer: String, pub direction: i64, pub path: String, pub is_file: bool, pub info_json: String, pub at: i64, } #[derive(Debug, Clone)] pub struct AuditAlarmRow { pub id: i64, pub peer_id: String, pub typ: i64, pub info_json: String, pub at: i64, } #[derive(Debug, Clone)] pub struct RecordingRow { pub filename: String, pub peer_id: String, pub size: i64, pub state: String, pub started_at: i64, pub finished_at: Option, } #[derive(Debug, Clone, Default)] pub struct DashboardDeviceRow { pub id: String, pub uuid: String, pub owner_username: String, pub last_heartbeat_at: String, pub sysinfo_payload: String, pub conns_json: String, /// Plaintext per-boot password reported by the agent for unattended /// access. Empty when the agent hasn't reported one (vanilla rustdesk /// or hello-agent that hasn't called the endpoint yet). The admin UI /// only surfaces this when the row is online AND no interactive user /// is logged in. pub unattended_password: String, pub unattended_password_set_at: String, } #[derive(Debug, Clone, Default)] pub struct PeerListRow { pub id: String, pub owner_username: String, pub owner_display_name: String, pub device_group_name: String, pub note: String, pub status: i64, /// Raw sysinfo JSON; the handler parses and emits a trimmed `info` object. pub sysinfo_payload: String, } pub struct AbPeerInsert<'a> { pub id: &'a str, pub alias: Option<&'a str>, pub note: Option<&'a str>, pub password: Option<&'a str>, pub hash: Option<&'a str>, pub username: Option<&'a str>, pub hostname: Option<&'a str>, pub platform: Option<&'a str>, } #[derive(Debug, Clone, Default)] pub struct ResolvedStrategy { pub modified_at: i64, /// JSON object map; passed straight into the heartbeat response. pub config_options_json: String, pub extra_json: String, } #[derive(Debug, Clone)] pub struct HeartbeatCommand { pub kind: String, pub payload: Option, } #[derive(Debug, Clone)] pub struct RecordingFile { pub size: i64, pub state: String, } #[derive(Debug, Clone)] pub struct OidcProviderRow { pub name: String, pub display_name: Option, pub icon_url: Option, pub issuer_url: String, pub client_id: String, pub client_secret: String, pub scopes: String, pub redirect_url: String, pub enabled: bool, /// If `Some`, every successful sign-in via this provider sets the local /// user's `is_admin` to whether the userinfo claim at `roles_claim` /// contains this role name. `None` means "don't touch is_admin" /// (existing behavior — admins are managed in the dashboard). pub admin_role: Option, /// Userinfo claim that holds the user's roles. Defaults to `"roles"` /// when `admin_role` is set but this is not. Zitadel's default format /// lives at `"urn:zitadel:iam:org:project:roles"`. pub roles_claim: Option, } pub struct OidcSessionInsert<'a> { pub code: &'a str, pub provider: &'a str, pub state: &'a str, pub client_id_str: &'a str, pub client_uuid: &'a str, pub device_info_json: &'a str, pub expires_at: i64, } #[derive(Debug, Clone)] pub struct OidcSessionRow { pub code: String, pub provider: String, pub state: String, pub client_id_str: String, pub client_uuid: String, pub device_info_json: String, pub expires_at: i64, pub status: String, pub access_token: Option, pub user_id: Option, pub error: Option, } impl Database { pub async fn new(url: &str) -> ResultType { if !std::path::Path::new(url).exists() { std::fs::File::create(url).ok(); } let n: usize = std::env::var("MAX_DATABASE_CONNECTIONS") .unwrap_or_else(|_| "1".to_owned()) .parse() .unwrap_or(1); log::debug!("MAX_DATABASE_CONNECTIONS={}", n); let pool = Pool::new( DbPool { url: url.to_owned(), }, n, ); let _ = pool.get().await?; // test let db = Database { pool }; db.create_tables().await?; Ok(db) } async fn create_tables(&self) -> ResultType<()> { sqlx::query!( " create table if not exists peer ( guid blob primary key not null, id varchar(100) not null, uuid blob not null, pk blob not null, created_at datetime not null default(current_timestamp), user blob, status tinyint, note varchar(300), info text not null ) without rowid; create unique index if not exists index_peer_id on peer (id); create index if not exists index_peer_user on peer (user); create index if not exists index_peer_created_at on peer (created_at); create index if not exists index_peer_status on peer (status); " ) .execute(self.pool.get().await?.deref_mut()) .await?; // M1 schema: users, tokens, device_sysinfo. Runtime form so first-time // builds don't require DATABASE_URL to already contain these tables. for stmt in M1_SCHEMA { sqlx::query(stmt) .execute(self.pool.get().await?.deref_mut()) .await?; } // M2 schema: address books, tags, device groups, accessibility view. for stmt in M2_SCHEMA { sqlx::query(stmt) .execute(self.pool.get().await?.deref_mut()) .await?; } // M3 schema: audit log, recordings, strategies, heartbeat commands. for stmt in M3_SCHEMA { sqlx::query(stmt) .execute(self.pool.get().await?.deref_mut()) .await?; } // M4 schema: 2FA / email-code / OIDC scaffolding. for stmt in M4_SCHEMA { sqlx::query(stmt) .execute(self.pool.get().await?.deref_mut()) .await?; } // Soft-ALTERs run after schema creation. SQLite < 3.35 lacks // `ADD COLUMN IF NOT EXISTS`; swallow the duplicate-column error // so re-runs are idempotent. Newly-added soft alters get appended // to the same list — order doesn't matter beyond "after the table // they touch exists in M*_SCHEMA". for stmt in M2_SOFT_ALTERS { self.try_alter(stmt).await; } Ok(()) } async fn try_alter(&self, sql: &str) { match sqlx::query(sql) .execute(self.pool.get().await.unwrap().deref_mut()) .await { Ok(_) => {} Err(e) => { let msg = e.to_string(); if !msg.contains("duplicate column name") { log::warn!("schema migration `{}` failed: {}", sql, msg); } } } } pub async fn count_users(&self) -> ResultType { let row = sqlx::query("SELECT COUNT(*) AS c FROM users") .fetch_one(self.pool.get().await?.deref_mut()) .await?; Ok(row.try_get::("c")?) } pub async fn warn_if_no_users(&self) { match self.count_users().await { Ok(0) => log::warn!( "users table is empty and no --bootstrap-admin-username/password supplied; \ /api/login will reject every request" ), Ok(_) => {} Err(e) => log::warn!("count_users failed: {}", e), } } pub async fn bootstrap_admin(&self, username: &str, password_plain: &str) -> ResultType<()> { if self.count_users().await? > 0 { return Ok(()); } let plain = password_plain.to_owned(); let hash = hbb_common::tokio::task::spawn_blocking(move || bcrypt::hash(plain, 10)).await??; let display = "Admin"; let is_admin: i64 = 1; let status: i64 = 1; sqlx::query( "INSERT INTO users(username, password_hash, display_name, status, is_admin) \ VALUES(?, ?, ?, ?, ?)", ) .bind(username) .bind(&hash) .bind(display) .bind(status) .bind(is_admin) .execute(self.pool.get().await?.deref_mut()) .await?; log::info!("bootstrap admin '{}' created", username); Ok(()) } pub async fn user_find_by_username(&self, username: &str) -> ResultType> { let row = sqlx::query( "SELECT id, username, password_hash, display_name, email, note, avatar, status, is_admin, oidc_subject \ FROM users WHERE username = ?", ) .bind(username) .fetch_optional(self.pool.get().await?.deref_mut()) .await?; Ok(row.map(row_to_user)) } pub async fn user_find_by_id(&self, id: i64) -> ResultType> { let row = sqlx::query( "SELECT id, username, password_hash, display_name, email, note, avatar, status, is_admin, oidc_subject \ FROM users WHERE id = ?", ) .bind(id) .fetch_optional(self.pool.get().await?.deref_mut()) .await?; Ok(row.map(row_to_user)) } /// All users, including disabled ones — distinct from /// `users_list_accessible`, which the API uses (filtering by status=1 /// and visibility through device-groups). The dashboard wants the /// full picture. /// Per-user "last seen" — the most recent `tokens.last_used_at` for /// each user, since every authenticated request bumps that column /// (api/middleware on Bearer auth, and the dashboard cookie path). /// Users with no token (never logged in, or all expired+pruned) are /// absent from the map. Returns datetime strings as SQLite stores /// them; the caller decides how to format. pub async fn users_last_seen_map( &self, ) -> ResultType> { let rows = sqlx::query( "SELECT user_id, MAX(last_used_at) AS last_seen \ FROM tokens GROUP BY user_id", ) .fetch_all(self.pool.get().await?.deref_mut()) .await?; let mut out = std::collections::HashMap::with_capacity(rows.len()); for r in rows { let id: i64 = r.try_get("user_id")?; let ts: Option = r.try_get("last_seen").ok(); if let Some(s) = ts { out.insert(id, s); } } Ok(out) } pub async fn users_list_all( &self, offset: i64, limit: i64, ) -> ResultType<(i64, Vec)> { let total: i64 = sqlx::query("SELECT COUNT(*) AS c FROM users") .fetch_one(self.pool.get().await?.deref_mut()) .await? .try_get("c")?; let rows = sqlx::query( "SELECT id, username, password_hash, display_name, email, note, avatar, status, is_admin, oidc_subject \ FROM users ORDER BY username LIMIT ? OFFSET ?", ) .bind(limit) .bind(offset) .fetch_all(self.pool.get().await?.deref_mut()) .await?; Ok((total, rows.into_iter().map(row_to_user).collect())) } pub async fn user_set_status(&self, id: i64, status: i64) -> ResultType { let res = sqlx::query("UPDATE users SET status = ?, updated_at = current_timestamp WHERE id = ?") .bind(status) .bind(id) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(res.rows_affected() > 0) } pub async fn user_set_admin(&self, id: i64, is_admin: bool) -> ResultType { let res = sqlx::query( "UPDATE users SET is_admin = ?, updated_at = current_timestamp WHERE id = ?", ) .bind(if is_admin { 1i64 } else { 0i64 }) .bind(id) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(res.rows_affected() > 0) } pub async fn user_set_password(&self, id: i64, hash: &str) -> ResultType { let res = sqlx::query( "UPDATE users SET password_hash = ?, updated_at = current_timestamp WHERE id = ?", ) .bind(hash) .bind(id) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(res.rows_affected() > 0) } /// Deletes the user row. Cascade hits `tokens` (FK ON DELETE CASCADE) /// — TOTP secrets and AB ownership are best-effort cleaned by separate /// queries below. pub async fn user_delete(&self, id: i64) -> ResultType { let _ = sqlx::query("DELETE FROM user_totp_secrets WHERE user_id = ?") .bind(id) .execute(self.pool.get().await?.deref_mut()) .await; let _ = sqlx::query("DELETE FROM device_group_members WHERE user_id = ?") .bind(id) .execute(self.pool.get().await?.deref_mut()) .await; let _ = sqlx::query("DELETE FROM address_book_shares WHERE user_id = ?") .bind(id) .execute(self.pool.get().await?.deref_mut()) .await; let res = sqlx::query("DELETE FROM users WHERE id = ?") .bind(id) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(res.rows_affected() > 0) } /// Drop a device by `peer_id`. Removes the dashboard-visible row /// (`device_sysinfo`), the rendezvous-side identity (`peer` — so a /// stale public key doesn't reject a fresh reinstall under the same /// ID), and any pending peer-scoped operational state /// (`heartbeat_commands`, peer-scoped `strategy_assignments`). /// Audit rows, recordings, and address-book entries are intentionally /// preserved — they're historical/manual data the operator may still /// want even after the device is gone. Returns `true` iff a /// `device_sysinfo` row actually existed. pub async fn device_delete(&self, peer_id: &str) -> ResultType { let _ = sqlx::query("DELETE FROM heartbeat_commands WHERE peer_id = ?") .bind(peer_id) .execute(self.pool.get().await?.deref_mut()) .await; let _ = sqlx::query("DELETE FROM strategy_assignments WHERE peer_id = ?") .bind(peer_id) .execute(self.pool.get().await?.deref_mut()) .await; let _ = sqlx::query("DELETE FROM peer WHERE id = ?") .bind(peer_id) .execute(self.pool.get().await?.deref_mut()) .await; let res = sqlx::query("DELETE FROM device_sysinfo WHERE id = ?") .bind(peer_id) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(res.rows_affected() > 0) } /// Devices listed for the dashboard. Returns each row of device_sysinfo /// joined to its owner's username, sorted by recency. pub async fn devices_list_all( &self, offset: i64, limit: i64, ) -> ResultType<(i64, Vec)> { let total: i64 = sqlx::query("SELECT COUNT(*) AS c FROM device_sysinfo") .fetch_one(self.pool.get().await?.deref_mut()) .await? .try_get("c")?; let rows = sqlx::query( "SELECT ds.id AS pid, ds.uuid AS puuid, \ COALESCE(u.username, '') AS owner_username, \ ds.last_heartbeat_at AS last_hb, \ ds.payload AS payload, \ ds.conns AS conns, \ COALESCE(ds.unattended_password, '') AS u_pw, \ COALESCE(ds.unattended_password_set_at, '') AS u_pw_at \ FROM device_sysinfo ds \ LEFT JOIN users u ON u.id = ds.user_id \ ORDER BY ds.last_heartbeat_at DESC LIMIT ? OFFSET ?", ) .bind(limit) .bind(offset) .fetch_all(self.pool.get().await?.deref_mut()) .await?; let data = rows .into_iter() .map(|r| DashboardDeviceRow { id: r.try_get("pid").unwrap_or_default(), uuid: r.try_get("puuid").unwrap_or_default(), owner_username: r.try_get("owner_username").unwrap_or_default(), last_heartbeat_at: r.try_get("last_hb").unwrap_or_default(), sysinfo_payload: r.try_get("payload").unwrap_or_default(), conns_json: r.try_get("conns").unwrap_or_default(), unattended_password: r.try_get("u_pw").unwrap_or_default(), unattended_password_set_at: r.try_get("u_pw_at").unwrap_or_default(), }) .collect(); Ok((total, data)) } pub async fn device_sysinfo_get_conns(&self, peer_id: &str) -> ResultType { let row = sqlx::query("SELECT conns FROM device_sysinfo WHERE id = ? LIMIT 1") .bind(peer_id) .fetch_optional(self.pool.get().await?.deref_mut()) .await?; Ok(row .and_then(|r| r.try_get::("conns").ok()) .unwrap_or_else(|| "[]".to_string())) } pub async fn heartbeat_command_queue( &self, peer_id: &str, kind: &str, payload: Option<&str>, ) -> ResultType<()> { sqlx::query( "INSERT OR REPLACE INTO heartbeat_commands(peer_id, kind, payload, created_at) \ VALUES(?, ?, ?, strftime('%s','now'))", ) .bind(peer_id) .bind(kind) .bind(payload) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } /// All address books, with the owner's username and an optional /// per-AB peer count. Used by the dashboard's read-only AB overview. pub async fn ab_list_all_with_owner(&self) -> ResultType> { let rows = sqlx::query( "SELECT ab.guid, ab.name, ab.kind, ab.created_at, \ COALESCE(u.username, '') AS owner_username, \ (SELECT COUNT(*) FROM address_book_peers abp WHERE abp.ab_guid = ab.guid) AS peer_count \ FROM address_books ab LEFT JOIN users u ON u.id = ab.owner_user_id \ ORDER BY ab.kind, owner_username, ab.name", ) .fetch_all(self.pool.get().await?.deref_mut()) .await?; Ok(rows .into_iter() .map(|r| AbOverviewRow { guid: r.try_get("guid").unwrap_or_default(), name: r.try_get("name").unwrap_or_default(), kind: r.try_get("kind").unwrap_or(0), owner_username: r.try_get("owner_username").unwrap_or_default(), peer_count: r.try_get("peer_count").unwrap_or(0), created_at: r.try_get("created_at").unwrap_or(0), }) .collect()) } // ---- M5 dashboard helpers: groups / strategies / audit / recordings ---- pub async fn device_groups_list_all(&self) -> ResultType> { let rows = sqlx::query("SELECT id, name FROM device_groups ORDER BY name") .fetch_all(self.pool.get().await?.deref_mut()) .await?; Ok(rows .into_iter() .map(|r| DeviceGroupRow { id: r.try_get("id").unwrap_or(0), name: r.try_get("name").unwrap_or_default(), }) .collect()) } pub async fn device_group_members(&self, group_id: i64) -> ResultType> { let rows = sqlx::query( "SELECT u.id, u.username, u.password_hash, u.display_name, u.email, u.note, u.avatar, u.status, u.is_admin \ FROM users u JOIN device_group_members m ON m.user_id = u.id \ WHERE m.device_group_id = ? ORDER BY u.username", ) .bind(group_id) .fetch_all(self.pool.get().await?.deref_mut()) .await?; Ok(rows.into_iter().map(row_to_user).collect()) } pub async fn device_group_create(&self, name: &str) -> ResultType { sqlx::query("INSERT OR IGNORE INTO device_groups(name) VALUES(?)") .bind(name) .execute(self.pool.get().await?.deref_mut()) .await?; let row = sqlx::query("SELECT id FROM device_groups WHERE name = ?") .bind(name) .fetch_one(self.pool.get().await?.deref_mut()) .await?; Ok(row.try_get("id")?) } pub async fn device_group_delete(&self, group_id: i64) -> ResultType { let _ = sqlx::query("DELETE FROM device_group_members WHERE device_group_id = ?") .bind(group_id) .execute(self.pool.get().await?.deref_mut()) .await; let res = sqlx::query("DELETE FROM device_groups WHERE id = ?") .bind(group_id) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(res.rows_affected() > 0) } pub async fn device_group_add_member( &self, group_id: i64, user_id: i64, ) -> ResultType<()> { sqlx::query( "INSERT OR IGNORE INTO device_group_members(device_group_id, user_id) VALUES(?, ?)", ) .bind(group_id) .bind(user_id) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } pub async fn device_group_remove_member( &self, group_id: i64, user_id: i64, ) -> ResultType<()> { sqlx::query( "DELETE FROM device_group_members WHERE device_group_id = ? AND user_id = ?", ) .bind(group_id) .bind(user_id) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } pub async fn strategies_list_all(&self) -> ResultType> { let rows = sqlx::query( "SELECT id, name, modified_at, config_options_json, extra_json \ FROM strategies ORDER BY name", ) .fetch_all(self.pool.get().await?.deref_mut()) .await?; Ok(rows .into_iter() .map(|r| StrategyRow { id: r.try_get("id").unwrap_or(0), name: r.try_get("name").unwrap_or_default(), modified_at: r.try_get("modified_at").unwrap_or(0), config_options_json: r .try_get("config_options_json") .unwrap_or_else(|_| "{}".to_string()), extra_json: r.try_get("extra_json").unwrap_or_else(|_| "{}".to_string()), }) .collect()) } pub async fn strategy_create( &self, name: &str, config_options_json: &str, extra_json: &str, ) -> ResultType { let res = sqlx::query( "INSERT INTO strategies(name, modified_at, config_options_json, extra_json) \ VALUES(?, strftime('%s','now'), ?, ?)", ) .bind(name) .bind(config_options_json) .bind(extra_json) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(res.last_insert_rowid()) } pub async fn strategy_update_config( &self, id: i64, config_options_json: &str, ) -> ResultType<()> { sqlx::query( "UPDATE strategies SET config_options_json = ?, modified_at = strftime('%s','now') \ WHERE id = ?", ) .bind(config_options_json) .bind(id) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } pub async fn strategy_delete(&self, id: i64) -> ResultType { let _ = sqlx::query("DELETE FROM strategy_assignments WHERE strategy_id = ?") .bind(id) .execute(self.pool.get().await?.deref_mut()) .await; let res = sqlx::query("DELETE FROM strategies WHERE id = ?") .bind(id) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(res.rows_affected() > 0) } /// Audit listings (newest first) — used by the dashboard browser. Each /// returns at most `limit` rows; the dashboard caps at a few hundred. pub async fn audit_conn_list(&self, limit: i64) -> ResultType> { let rows = sqlx::query( "SELECT guid, peer_id, conn_id, session_id, ip, action, note, started_at \ FROM audit_conn ORDER BY started_at DESC LIMIT ?", ) .bind(limit) .fetch_all(self.pool.get().await?.deref_mut()) .await?; Ok(rows .into_iter() .map(|r| AuditConnRow { guid: r.try_get("guid").unwrap_or_default(), peer_id: r.try_get("peer_id").unwrap_or_default(), conn_id: r.try_get("conn_id").unwrap_or(0), session_id: r.try_get("session_id").unwrap_or(0), ip: r.try_get::, _>("ip").ok().flatten().unwrap_or_default(), action: r.try_get("action").unwrap_or_default(), note: r.try_get::, _>("note").ok().flatten().unwrap_or_default(), started_at: r.try_get("started_at").unwrap_or(0), }) .collect()) } pub async fn audit_file_list(&self, limit: i64) -> ResultType> { let rows = sqlx::query( "SELECT id, peer_id, remote_peer, direction, path, is_file, info_json, at \ FROM audit_file ORDER BY at DESC LIMIT ?", ) .bind(limit) .fetch_all(self.pool.get().await?.deref_mut()) .await?; Ok(rows .into_iter() .map(|r| AuditFileRow { id: r.try_get("id").unwrap_or(0), peer_id: r.try_get("peer_id").unwrap_or_default(), remote_peer: r.try_get::, _>("remote_peer").ok().flatten().unwrap_or_default(), direction: r.try_get("direction").unwrap_or(0), path: r.try_get("path").unwrap_or_default(), is_file: r.try_get::("is_file").unwrap_or(0) != 0, info_json: r.try_get("info_json").unwrap_or_default(), at: r.try_get("at").unwrap_or(0), }) .collect()) } pub async fn audit_alarm_list(&self, limit: i64) -> ResultType> { let rows = sqlx::query( "SELECT id, peer_id, typ, info_json, at \ FROM audit_alarm ORDER BY at DESC LIMIT ?", ) .bind(limit) .fetch_all(self.pool.get().await?.deref_mut()) .await?; Ok(rows .into_iter() .map(|r| AuditAlarmRow { id: r.try_get("id").unwrap_or(0), peer_id: r.try_get("peer_id").unwrap_or_default(), typ: r.try_get("typ").unwrap_or(0), info_json: r.try_get("info_json").unwrap_or_default(), at: r.try_get("at").unwrap_or(0), }) .collect()) } pub async fn recordings_list(&self, limit: i64) -> ResultType> { let rows = sqlx::query( "SELECT filename, peer_id, size, state, started_at, finished_at \ FROM recordings ORDER BY started_at DESC LIMIT ?", ) .bind(limit) .fetch_all(self.pool.get().await?.deref_mut()) .await?; Ok(rows .into_iter() .map(|r| RecordingRow { filename: r.try_get("filename").unwrap_or_default(), peer_id: r.try_get("peer_id").unwrap_or_default(), size: r.try_get("size").unwrap_or(0), state: r.try_get("state").unwrap_or_default(), started_at: r.try_get("started_at").unwrap_or(0), finished_at: r.try_get::, _>("finished_at").ok().flatten(), }) .collect()) } pub async fn raw_update_user_email(&self, user_id: i64, email: &str) -> ResultType<()> { sqlx::query("UPDATE users SET email = ?, updated_at = current_timestamp WHERE id = ?") .bind(email) .bind(user_id) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } pub async fn user_set_display_name( &self, user_id: i64, display_name: &str, ) -> ResultType<()> { sqlx::query( "UPDATE users SET display_name = ?, updated_at = current_timestamp WHERE id = ?", ) .bind(display_name) .bind(user_id) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } pub async fn user_has_totp(&self, user_id: i64) -> ResultType { let row = sqlx::query("SELECT 1 AS ok FROM user_totp_secrets WHERE user_id = ?") .bind(user_id) .fetch_optional(self.pool.get().await?.deref_mut()) .await?; Ok(row.is_some()) } pub async fn user_insert(&self, u: NewUser<'_>) -> ResultType { let admin_int: i64 = if u.is_admin { 1 } else { 0 }; let res = sqlx::query( "INSERT INTO users(username, password_hash, display_name, is_admin, status) \ VALUES(?, ?, ?, ?, 1)", ) .bind(u.username) .bind(u.password_hash) .bind(u.display_name) .bind(admin_int) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(res.last_insert_rowid()) } pub async fn token_insert( &self, user_id: i64, sha: &[u8], peer_id: &str, peer_uuid: &str, device_info: &str, ttl_secs: i64, ) -> ResultType<()> { let expires_at = chrono::Utc::now() + chrono::Duration::seconds(ttl_secs); sqlx::query( "INSERT INTO tokens(user_id, token_sha256, peer_id, peer_uuid, device_info, expires_at) \ VALUES(?, ?, ?, ?, ?, ?)", ) .bind(user_id) .bind(sha) .bind(peer_id) .bind(peer_uuid) .bind(device_info) .bind(expires_at) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } /// Return (user_id, expires_at_unix) for a token still valid at `now`. pub async fn token_lookup(&self, sha: &[u8]) -> ResultType> { let row = sqlx::query( "SELECT user_id, strftime('%s', expires_at) AS exp \ FROM tokens WHERE token_sha256 = ?", ) .bind(sha) .fetch_optional(self.pool.get().await?.deref_mut()) .await?; let Some(row) = row else { return Ok(None) }; let user_id: i64 = row.try_get("user_id")?; let exp_str: String = row.try_get("exp")?; let exp: i64 = exp_str.parse().unwrap_or(0); if exp <= chrono::Utc::now().timestamp() { return Ok(None); } Ok(Some((user_id, exp))) } pub async fn token_touch(&self, sha: &[u8], ttl_secs: i64) -> ResultType<()> { let expires_at = chrono::Utc::now() + chrono::Duration::seconds(ttl_secs); sqlx::query( "UPDATE tokens SET last_used_at = current_timestamp, expires_at = ? \ WHERE token_sha256 = ?", ) .bind(expires_at) .bind(sha) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } pub async fn token_delete(&self, sha: &[u8]) -> ResultType<()> { sqlx::query("DELETE FROM tokens WHERE token_sha256 = ?") .bind(sha) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } pub async fn token_purge_expired(&self) -> ResultType { let res = sqlx::query("DELETE FROM tokens WHERE expires_at <= current_timestamp") .execute(self.pool.get().await?.deref_mut()) .await?; Ok(res.rows_affected()) } /// Update last_heartbeat_at for a device, inserting the row if missing. /// Returns true when the cached `sysinfo_ver_seen` differs from `cfg_ver`, /// signaling the client to re-upload sysinfo. pub async fn sysinfo_heartbeat( &self, id: &str, uuid: &str, version: i64, conns_json: &str, cfg_ver: &str, ) -> ResultType { let existing = sqlx::query( "SELECT sysinfo_ver_seen FROM device_sysinfo WHERE id = ? AND uuid = ?", ) .bind(id) .bind(uuid) .fetch_optional(self.pool.get().await?.deref_mut()) .await?; match existing { Some(row) => { let seen: String = row.try_get("sysinfo_ver_seen")?; sqlx::query( "UPDATE device_sysinfo SET version = ?, conns = ?, \ last_heartbeat_at = current_timestamp, last_seen_at = current_timestamp \ WHERE id = ? AND uuid = ?", ) .bind(version) .bind(conns_json) .bind(id) .bind(uuid) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(seen != cfg_ver) } None => { sqlx::query( "INSERT INTO device_sysinfo(id, uuid, version, conns) VALUES(?, ?, ?, ?)", ) .bind(id) .bind(uuid) .bind(version) .bind(conns_json) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(true) } } } /// Persist the full sysinfo payload and mark the device as having acked /// the current `cfg_ver`. Caller must check that a `peer` row exists for /// `id` before calling — see api::sysinfo for the ID_NOT_FOUND case. pub async fn sysinfo_upsert( &self, id: &str, uuid: &str, payload: &str, cfg_ver: &str, version: i64, ) -> ResultType<()> { sqlx::query( "INSERT INTO device_sysinfo(id, uuid, payload, sysinfo_ver_seen, version, updated_at) \ VALUES(?, ?, ?, ?, ?, current_timestamp) \ ON CONFLICT(id, uuid) DO UPDATE SET \ payload = excluded.payload, \ sysinfo_ver_seen = excluded.sysinfo_ver_seen, \ version = excluded.version, \ updated_at = current_timestamp", ) .bind(id) .bind(uuid) .bind(payload) .bind(cfg_ver) .bind(version) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } /// Store the agent's per-boot unattended-access password. Upserts so a /// device that's never sysinfo'd yet still gets a `device_sysinfo` row /// to hang the password on. Caller is expected to have validated the /// (id, uuid) pair against `peer` first — same gate as sysinfo_upsert. pub async fn set_unattended_password( &self, id: &str, uuid: &str, password: &str, ) -> ResultType<()> { sqlx::query( "INSERT INTO device_sysinfo(id, uuid, unattended_password, unattended_password_set_at) \ VALUES(?, ?, ?, current_timestamp) \ ON CONFLICT(id, uuid) DO UPDATE SET \ unattended_password = excluded.unattended_password, \ unattended_password_set_at = current_timestamp", ) .bind(id) .bind(uuid) .bind(password) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } // =================================================================== // M2: address book / tags / device groups / accessibility // =================================================================== /// Bind a device (peer_id, peer_uuid) to a user. Upserts so the binding /// sticks even when the device hasn't sysinfo'd yet — `--assign` from /// fresh installs is a real flow. Subsequent `sysinfo_heartbeat` calls /// then UPDATE the existing row and preserve `user_id`. pub async fn device_claim(&self, user_id: i64, peer_id: &str, peer_uuid: &str) { if peer_id.is_empty() || peer_uuid.is_empty() { return; } let res = sqlx::query( "INSERT INTO device_sysinfo(id, uuid, user_id) VALUES(?, ?, ?) \ ON CONFLICT(id, uuid) DO UPDATE SET user_id = excluded.user_id", ) .bind(peer_id) .bind(peer_uuid) .bind(user_id) .execute(self.pool.get().await.unwrap().deref_mut()) .await; if let Err(e) = res { log::warn!("device_claim failed: {}", e); } } /// Create a new shared (kind=1) address book owned by `owner_user_id`. /// The unique index on (owner_user_id, kind, name) means a duplicate /// name from the same owner surfaces as a SQL error — caller should /// translate to a friendly notice. pub async fn ab_create_shared( &self, owner_user_id: i64, name: &str, ) -> ResultType { let guid = uuid::Uuid::new_v4().to_string(); sqlx::query( "INSERT INTO address_books(guid, owner_user_id, name, kind) VALUES(?, ?, ?, 1)", ) .bind(&guid) .bind(owner_user_id) .bind(name) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(guid) } /// Drop an address book and every dependent row. Used by the dashboard /// to remove shared books; personal books are protected at the handler /// layer (the desktop client owns those). pub async fn ab_delete(&self, guid: &str) -> ResultType { let _ = sqlx::query("DELETE FROM address_book_peer_tags WHERE ab_guid = ?") .bind(guid) .execute(self.pool.get().await?.deref_mut()) .await; let _ = sqlx::query("DELETE FROM address_book_tags WHERE ab_guid = ?") .bind(guid) .execute(self.pool.get().await?.deref_mut()) .await; let _ = sqlx::query("DELETE FROM address_book_peers WHERE ab_guid = ?") .bind(guid) .execute(self.pool.get().await?.deref_mut()) .await; let _ = sqlx::query("DELETE FROM address_book_shares WHERE ab_guid = ?") .bind(guid) .execute(self.pool.get().await?.deref_mut()) .await; let res = sqlx::query("DELETE FROM address_books WHERE guid = ?") .bind(guid) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(res.rows_affected() > 0) } /// Returns (owner_user_id, kind) for a book, or None if unknown. /// Handlers use this to check ownership and to refuse mutations on /// personal (kind=0) books. pub async fn ab_get_owner_kind(&self, guid: &str) -> ResultType> { let row = sqlx::query("SELECT owner_user_id, kind FROM address_books WHERE guid = ? LIMIT 1") .bind(guid) .fetch_optional(self.pool.get().await?.deref_mut()) .await?; Ok(row.map(|r| { let owner: i64 = r.try_get("owner_user_id").unwrap_or(0); let kind: i64 = r.try_get("kind").unwrap_or(0); (owner, kind) })) } /// Per-user shares attached to a book (group shares aren't surfaced in /// the dashboard yet — operators can use device-groups when they want /// many users at once and assign by group via SQL; the common case is /// per-user). Returns rows of (user_id, username, rule) for direct /// `address_book_shares.user_id IS NOT NULL` rows. pub async fn ab_list_shares(&self, guid: &str) -> ResultType> { let rows = sqlx::query( "SELECT s.user_id, COALESCE(u.username,'') AS username, s.rule \ FROM address_book_shares s LEFT JOIN users u ON u.id = s.user_id \ WHERE s.ab_guid = ? AND s.user_id IS NOT NULL \ ORDER BY u.username", ) .bind(guid) .fetch_all(self.pool.get().await?.deref_mut()) .await?; Ok(rows .into_iter() .map(|r| AbShareDetailRow { user_id: r.try_get("user_id").unwrap_or(0), username: r.try_get("username").unwrap_or_default(), rule: r.try_get("rule").unwrap_or(1), }) .collect()) } /// Idempotent upsert of a per-user share. Replaces an existing rule /// row for the same (ab, user) pair so admins can promote/demote /// without having to remove first. pub async fn ab_share_set( &self, guid: &str, user_id: i64, rule: i64, ) -> ResultType<()> { let _ = sqlx::query( "DELETE FROM address_book_shares WHERE ab_guid = ? AND user_id = ?", ) .bind(guid) .bind(user_id) .execute(self.pool.get().await?.deref_mut()) .await; sqlx::query( "INSERT INTO address_book_shares(ab_guid, user_id, rule) VALUES(?, ?, ?)", ) .bind(guid) .bind(user_id) .bind(rule) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } pub async fn ab_share_remove(&self, guid: &str, user_id: i64) -> ResultType { let res = sqlx::query("DELETE FROM address_book_shares WHERE ab_guid = ? AND user_id = ?") .bind(guid) .bind(user_id) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(res.rows_affected() > 0) } /// Look up the personal AB for a user, creating it if missing. pub async fn ab_get_or_create_personal(&self, user_id: i64) -> ResultType { let row = sqlx::query("SELECT guid FROM address_books WHERE owner_user_id = ? AND kind = 0") .bind(user_id) .fetch_optional(self.pool.get().await?.deref_mut()) .await?; if let Some(r) = row { return Ok(r.try_get::("guid")?); } let guid = uuid::Uuid::new_v4().to_string(); sqlx::query( "INSERT INTO address_books(guid, owner_user_id, name, kind) VALUES(?, ?, ?, 0)", ) .bind(&guid) .bind(user_id) .bind("My address book") .execute(self.pool.get().await?.deref_mut()) .await?; Ok(guid) } /// Resolve the maximum effective rule for `user_id` against `ab_guid`. /// Returns 3 (Full) for the owner, the largest matching rule across /// direct user shares and device-group shares, or None if no access. pub async fn ab_resolve_rule(&self, user_id: i64, ab_guid: &str) -> ResultType> { // Owner check first. let row = sqlx::query("SELECT owner_user_id FROM address_books WHERE guid = ?") .bind(ab_guid) .fetch_optional(self.pool.get().await?.deref_mut()) .await?; let Some(row) = row else { return Ok(None) }; let owner: i64 = row.try_get("owner_user_id")?; if owner == user_id { return Ok(Some(3)); } // Direct or via device-group. let row = sqlx::query( "SELECT MAX(rule) AS r FROM address_book_shares \ WHERE ab_guid = ? AND ( \ user_id = ? OR \ group_id IN (SELECT device_group_id FROM device_group_members WHERE user_id = ?) \ )", ) .bind(ab_guid) .bind(user_id) .bind(user_id) .fetch_one(self.pool.get().await?.deref_mut()) .await?; let rule: Option = row.try_get("r").ok(); Ok(rule) } /// List address books shared with `user_id` (excludes their personal AB). /// Returns (total, page). pub async fn ab_list_shared_for_user( &self, user_id: i64, offset: i64, limit: i64, ) -> ResultType<(i64, Vec)> { // total let total_row = sqlx::query( "SELECT COUNT(DISTINCT ab.guid) AS c \ FROM address_books ab \ JOIN address_book_shares s ON s.ab_guid = ab.guid \ WHERE ab.kind = 1 AND ( \ s.user_id = ? OR \ s.group_id IN (SELECT device_group_id FROM device_group_members WHERE user_id = ?) \ )", ) .bind(user_id) .bind(user_id) .fetch_one(self.pool.get().await?.deref_mut()) .await?; let total: i64 = total_row.try_get("c")?; // page let rows = sqlx::query( "SELECT ab.guid, ab.name, ab.note, ab.info_json, MAX(s.rule) AS rule, \ COALESCE(u.username, '') AS owner \ FROM address_books ab \ JOIN address_book_shares s ON s.ab_guid = ab.guid \ LEFT JOIN users u ON u.id = ab.owner_user_id \ WHERE ab.kind = 1 AND ( \ s.user_id = ? OR \ s.group_id IN (SELECT device_group_id FROM device_group_members WHERE user_id = ?) \ ) \ GROUP BY ab.guid \ ORDER BY ab.name \ LIMIT ? OFFSET ?", ) .bind(user_id) .bind(user_id) .bind(limit) .bind(offset) .fetch_all(self.pool.get().await?.deref_mut()) .await?; let data = rows .into_iter() .map(|r| AbProfileRow { guid: r.try_get("guid").unwrap_or_default(), name: r.try_get("name").unwrap_or_default(), owner: r.try_get("owner").unwrap_or_default(), note: r.try_get::, _>("note").unwrap_or_default().unwrap_or_default(), rule: r.try_get("rule").unwrap_or(1), info_json: r.try_get::, _>("info_json").ok().flatten(), }) .collect(); Ok((total, data)) } /// Page through peers in an address book. Tags are not filled in yet — /// callers loop over peers and call `ab_peer_tags`. pub async fn ab_list_peers( &self, ab_guid: &str, offset: i64, limit: i64, ) -> ResultType<(i64, Vec)> { let total_row = sqlx::query("SELECT COUNT(*) AS c FROM address_book_peers WHERE ab_guid = ?") .bind(ab_guid) .fetch_one(self.pool.get().await?.deref_mut()) .await?; let total: i64 = total_row.try_get("c")?; let rows = sqlx::query( "SELECT peer_id, alias, note, password, hash, username, hostname, platform \ FROM address_book_peers WHERE ab_guid = ? \ ORDER BY peer_id \ LIMIT ? OFFSET ?", ) .bind(ab_guid) .bind(limit) .bind(offset) .fetch_all(self.pool.get().await?.deref_mut()) .await?; let mut peers: Vec = rows .into_iter() .map(|r| AbPeerRow { id: r.try_get("peer_id").unwrap_or_default(), alias: r.try_get::, _>("alias").unwrap_or_default().unwrap_or_default(), note: r.try_get::, _>("note").unwrap_or_default().unwrap_or_default(), password: r.try_get::, _>("password").unwrap_or_default().unwrap_or_default(), hash: r.try_get::, _>("hash").unwrap_or_default().unwrap_or_default(), username: r.try_get::, _>("username").unwrap_or_default().unwrap_or_default(), hostname: r.try_get::, _>("hostname").unwrap_or_default().unwrap_or_default(), platform: r.try_get::, _>("platform").unwrap_or_default().unwrap_or_default(), tags: vec![], }) .collect(); // Fill in tags. One-shot bulk fetch keeps it O(1) round-trip per page. if !peers.is_empty() { let tag_rows = sqlx::query( "SELECT peer_id, tag_name FROM address_book_peer_tags WHERE ab_guid = ?", ) .bind(ab_guid) .fetch_all(self.pool.get().await?.deref_mut()) .await?; let mut by_peer: std::collections::HashMap> = std::collections::HashMap::new(); for row in tag_rows { let pid: String = row.try_get("peer_id").unwrap_or_default(); let tag: String = row.try_get("tag_name").unwrap_or_default(); by_peer.entry(pid).or_default().push(tag); } for p in peers.iter_mut() { if let Some(tags) = by_peer.remove(&p.id) { p.tags = tags; } } } Ok((total, peers)) } pub async fn ab_count_peers(&self, ab_guid: &str) -> ResultType { let row = sqlx::query("SELECT COUNT(*) AS c FROM address_book_peers WHERE ab_guid = ?") .bind(ab_guid) .fetch_one(self.pool.get().await?.deref_mut()) .await?; Ok(row.try_get("c")?) } pub async fn ab_list_tags(&self, ab_guid: &str) -> ResultType> { let rows = sqlx::query( "SELECT name, color FROM address_book_tags WHERE ab_guid = ? ORDER BY name", ) .bind(ab_guid) .fetch_all(self.pool.get().await?.deref_mut()) .await?; Ok(rows .into_iter() .map(|r| AbTagRow { name: r.try_get("name").unwrap_or_default(), color: r.try_get("color").unwrap_or(0), }) .collect()) } pub async fn ab_peer_insert( &self, ab_guid: &str, p: AbPeerInsert<'_>, tags: Option<&[String]>, ) -> ResultType<()> { sqlx::query( "INSERT INTO address_book_peers \ (ab_guid, peer_id, alias, note, password, hash, username, hostname, platform) \ VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(ab_guid) .bind(p.id) .bind(p.alias) .bind(p.note) .bind(p.password) .bind(p.hash) .bind(p.username) .bind(p.hostname) .bind(p.platform) .execute(self.pool.get().await?.deref_mut()) .await?; if let Some(tags) = tags { self.ab_peer_replace_tags(ab_guid, p.id, tags).await?; } Ok(()) } /// Partial peer update — only fields present in `body` are touched. pub async fn ab_peer_partial_update( &self, ab_guid: &str, peer_id: &str, body: &serde_json::Value, ) -> ResultType { // Build a dynamic SET list. Restrict to a known column set for safety. let cols = [ "alias", "note", "password", "hash", "username", "hostname", "platform", ]; let mut sets: Vec<&str> = vec![]; for c in cols.iter() { if body.get(*c).is_some() { sets.push(*c); } } if !sets.is_empty() { let setlist = sets .iter() .map(|c| format!("{} = ?", c)) .collect::>() .join(", "); let sql = format!( "UPDATE address_book_peers SET {}, updated_at = strftime('%s','now') \ WHERE ab_guid = ? AND peer_id = ?", setlist ); let mut q = sqlx::query(&sql); for c in &sets { q = q.bind(body.get(*c).and_then(|v| v.as_str()).unwrap_or("")); } q = q.bind(ab_guid).bind(peer_id); let res = q.execute(self.pool.get().await?.deref_mut()).await?; if res.rows_affected() == 0 { return Ok(false); } } // Tags update if present. if let Some(tags_v) = body.get("tags") { if let Some(arr) = tags_v.as_array() { let tags: Vec = arr .iter() .filter_map(|v| v.as_str().map(|s| s.to_string())) .collect(); self.ab_peer_replace_tags(ab_guid, peer_id, &tags).await?; } } Ok(true) } async fn ab_peer_replace_tags( &self, ab_guid: &str, peer_id: &str, tags: &[String], ) -> ResultType<()> { sqlx::query("DELETE FROM address_book_peer_tags WHERE ab_guid = ? AND peer_id = ?") .bind(ab_guid) .bind(peer_id) .execute(self.pool.get().await?.deref_mut()) .await?; for t in tags { // Ensure the tag row exists; if missing, insert with a default color // (Flutter's transparent black). Operators can fix later. sqlx::query( "INSERT OR IGNORE INTO address_book_tags(ab_guid, name, color) VALUES(?, ?, 0)", ) .bind(ab_guid) .bind(t) .execute(self.pool.get().await?.deref_mut()) .await?; sqlx::query( "INSERT OR IGNORE INTO address_book_peer_tags(ab_guid, peer_id, tag_name) \ VALUES(?, ?, ?)", ) .bind(ab_guid) .bind(peer_id) .bind(t) .execute(self.pool.get().await?.deref_mut()) .await?; } Ok(()) } pub async fn ab_peers_delete(&self, ab_guid: &str, ids: &[String]) -> ResultType { let mut total: u64 = 0; for id in ids { let res = sqlx::query( "DELETE FROM address_book_peers WHERE ab_guid = ? AND peer_id = ?", ) .bind(ab_guid) .bind(id) .execute(self.pool.get().await?.deref_mut()) .await?; total += res.rows_affected(); } Ok(total) } pub async fn ab_tag_insert(&self, ab_guid: &str, name: &str, color: i64) -> ResultType<()> { sqlx::query( "INSERT OR REPLACE INTO address_book_tags(ab_guid, name, color) VALUES(?, ?, ?)", ) .bind(ab_guid) .bind(name) .bind(color) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } pub async fn ab_tag_rename(&self, ab_guid: &str, old: &str, new: &str) -> ResultType<()> { // Two-step rename to keep peer_tags in sync. SQLite has no UPDATE // CASCADE so we touch both tables explicitly inside one transaction. // The deadpool guard must outlive the transaction borrow, hence the // explicit `let` binding. let mut guard = self.pool.get().await?; let conn: &mut SqliteConnection = guard.deref_mut(); let mut tx = conn.begin().await?; sqlx::query("UPDATE address_book_tags SET name = ? WHERE ab_guid = ? AND name = ?") .bind(new) .bind(ab_guid) .bind(old) .execute(&mut tx) .await?; sqlx::query( "UPDATE address_book_peer_tags SET tag_name = ? WHERE ab_guid = ? AND tag_name = ?", ) .bind(new) .bind(ab_guid) .bind(old) .execute(&mut tx) .await?; tx.commit().await?; Ok(()) } pub async fn ab_tag_update_color( &self, ab_guid: &str, name: &str, color: i64, ) -> ResultType<()> { sqlx::query( "UPDATE address_book_tags SET color = ? WHERE ab_guid = ? AND name = ?", ) .bind(color) .bind(ab_guid) .bind(name) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } pub async fn ab_tags_delete(&self, ab_guid: &str, names: &[String]) -> ResultType { let mut total: u64 = 0; for n in names { let res = sqlx::query("DELETE FROM address_book_tags WHERE ab_guid = ? AND name = ?") .bind(ab_guid) .bind(n) .execute(self.pool.get().await?.deref_mut()) .await?; total += res.rows_affected(); } Ok(total) } /// Replace the personal AB's contents wholesale — used by the legacy /// `POST /api/ab` endpoint. Drops all peers and tags, then re-inserts. pub async fn ab_legacy_replace( &self, ab_guid: &str, tags: &[(String, i64)], peers: &[AbPeerRow], ) -> ResultType<()> { let mut guard = self.pool.get().await?; let conn: &mut SqliteConnection = guard.deref_mut(); let mut tx = conn.begin().await?; sqlx::query("DELETE FROM address_book_peer_tags WHERE ab_guid = ?") .bind(ab_guid) .execute(&mut tx) .await?; sqlx::query("DELETE FROM address_book_peers WHERE ab_guid = ?") .bind(ab_guid) .execute(&mut tx) .await?; sqlx::query("DELETE FROM address_book_tags WHERE ab_guid = ?") .bind(ab_guid) .execute(&mut tx) .await?; for (name, color) in tags { sqlx::query( "INSERT INTO address_book_tags(ab_guid, name, color) VALUES(?, ?, ?)", ) .bind(ab_guid) .bind(name) .bind(color) .execute(&mut tx) .await?; } for p in peers { sqlx::query( "INSERT INTO address_book_peers \ (ab_guid, peer_id, alias, note, password, hash, username, hostname, platform) \ VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(ab_guid) .bind(&p.id) .bind(&p.alias) .bind(&p.note) .bind(&p.password) .bind(&p.hash) .bind(&p.username) .bind(&p.hostname) .bind(&p.platform) .execute(&mut tx) .await?; for t in &p.tags { sqlx::query( "INSERT OR IGNORE INTO address_book_peer_tags(ab_guid, peer_id, tag_name) \ VALUES(?, ?, ?)", ) .bind(ab_guid) .bind(&p.id) .bind(t) .execute(&mut tx) .await?; } } tx.commit().await?; Ok(()) } /// Device groups visible to a user (admin sees all; non-admin sees groups /// they're a member of). Returns (total, page). pub async fn groups_list_for_user( &self, user_id: i64, is_admin: bool, offset: i64, limit: i64, ) -> ResultType<(i64, Vec)> { if is_admin { let total: i64 = sqlx::query("SELECT COUNT(*) AS c FROM device_groups") .fetch_one(self.pool.get().await?.deref_mut()) .await? .try_get("c")?; let rows = sqlx::query( "SELECT id, name FROM device_groups ORDER BY name LIMIT ? OFFSET ?", ) .bind(limit) .bind(offset) .fetch_all(self.pool.get().await?.deref_mut()) .await?; return Ok(( total, rows.into_iter() .map(|r| DeviceGroupRow { id: r.try_get("id").unwrap_or(0), name: r.try_get("name").unwrap_or_default(), }) .collect(), )); } let total: i64 = sqlx::query( "SELECT COUNT(*) AS c FROM device_groups dg \ JOIN device_group_members m ON m.device_group_id = dg.id \ WHERE m.user_id = ?", ) .bind(user_id) .fetch_one(self.pool.get().await?.deref_mut()) .await? .try_get("c")?; let rows = sqlx::query( "SELECT dg.id, dg.name FROM device_groups dg \ JOIN device_group_members m ON m.device_group_id = dg.id \ WHERE m.user_id = ? \ ORDER BY dg.name LIMIT ? OFFSET ?", ) .bind(user_id) .bind(limit) .bind(offset) .fetch_all(self.pool.get().await?.deref_mut()) .await?; Ok(( total, rows.into_iter() .map(|r| DeviceGroupRow { id: r.try_get("id").unwrap_or(0), name: r.try_get("name").unwrap_or_default(), }) .collect(), )) } /// Users visible to a viewer. Admin sees all enabled users; non-admin /// sees themselves plus any user they share a device-group with. pub async fn users_list_accessible( &self, viewer_id: i64, is_admin: bool, offset: i64, limit: i64, ) -> ResultType<(i64, Vec)> { let (count_sql, list_sql): (&str, String) = if is_admin { ( "SELECT COUNT(*) AS c FROM users WHERE status = 1", "SELECT id, username, password_hash, display_name, email, note, avatar, status, is_admin, oidc_subject \ FROM users WHERE status = 1 ORDER BY username LIMIT ? OFFSET ?".to_string(), ) } else { ( "SELECT COUNT(DISTINCT u.id) AS c FROM users u WHERE u.status = 1 AND ( \ u.id = ? OR \ u.id IN ( \ SELECT m2.user_id FROM device_group_members m1 \ JOIN device_group_members m2 USING(device_group_id) \ WHERE m1.user_id = ? \ ) \ )", "SELECT DISTINCT u.id, u.username, u.password_hash, u.display_name, u.email, \ u.note, u.avatar, u.status, u.is_admin \ FROM users u WHERE u.status = 1 AND ( \ u.id = ? OR \ u.id IN ( \ SELECT m2.user_id FROM device_group_members m1 \ JOIN device_group_members m2 USING(device_group_id) \ WHERE m1.user_id = ? \ ) \ ) \ ORDER BY u.username LIMIT ? OFFSET ?".to_string(), ) }; let total: i64 = if is_admin { sqlx::query(count_sql) .fetch_one(self.pool.get().await?.deref_mut()) .await? .try_get("c")? } else { sqlx::query(count_sql) .bind(viewer_id) .bind(viewer_id) .fetch_one(self.pool.get().await?.deref_mut()) .await? .try_get("c")? }; let rows = if is_admin { sqlx::query(&list_sql) .bind(limit) .bind(offset) .fetch_all(self.pool.get().await?.deref_mut()) .await? } else { sqlx::query(&list_sql) .bind(viewer_id) .bind(viewer_id) .bind(limit) .bind(offset) .fetch_all(self.pool.get().await?.deref_mut()) .await? }; Ok((total, rows.into_iter().map(row_to_user).collect())) } /// Peers visible to a viewer. Admin sees all; non-admin sees peers they /// own plus peers owned by users they share a device-group with. pub async fn peers_list_accessible( &self, viewer_id: i64, is_admin: bool, offset: i64, limit: i64, ) -> ResultType<(i64, Vec)> { // Common select: device_sysinfo joined to its owner. We pick the // alphabetically-first device-group name as the surfaced group. let where_clause = if is_admin { "1 = 1" } else { "(ds.user_id = ? OR ds.user_id IN ( \ SELECT m2.user_id FROM device_group_members m1 \ JOIN device_group_members m2 USING(device_group_id) \ WHERE m1.user_id = ? \ ))" }; let count_sql = format!( "SELECT COUNT(*) AS c FROM device_sysinfo ds WHERE {}", where_clause ); let list_sql = format!( "SELECT ds.id AS pid, \ COALESCE(u.username, '') AS owner_username, \ COALESCE(u.display_name, '') AS owner_display_name, \ COALESCE(u.status, 1) AS owner_status, \ ds.payload AS sysinfo, \ ( SELECT dg.name FROM device_groups dg \ JOIN device_group_members mm ON mm.device_group_id = dg.id \ WHERE mm.user_id = ds.user_id ORDER BY dg.name LIMIT 1 \ ) AS device_group_name \ FROM device_sysinfo ds \ LEFT JOIN users u ON u.id = ds.user_id \ WHERE {} \ ORDER BY ds.id LIMIT ? OFFSET ?", where_clause ); let total: i64 = if is_admin { sqlx::query(&count_sql) .fetch_one(self.pool.get().await?.deref_mut()) .await? .try_get("c")? } else { sqlx::query(&count_sql) .bind(viewer_id) .bind(viewer_id) .fetch_one(self.pool.get().await?.deref_mut()) .await? .try_get("c")? }; let rows = if is_admin { sqlx::query(&list_sql) .bind(limit) .bind(offset) .fetch_all(self.pool.get().await?.deref_mut()) .await? } else { sqlx::query(&list_sql) .bind(viewer_id) .bind(viewer_id) .bind(limit) .bind(offset) .fetch_all(self.pool.get().await?.deref_mut()) .await? }; let data = rows .into_iter() .map(|r| PeerListRow { id: r.try_get("pid").unwrap_or_default(), owner_username: r.try_get("owner_username").unwrap_or_default(), owner_display_name: r.try_get("owner_display_name").unwrap_or_default(), device_group_name: r .try_get::, _>("device_group_name") .ok() .flatten() .unwrap_or_default(), note: String::new(), status: r.try_get("owner_status").unwrap_or(1), sysinfo_payload: r.try_get("sysinfo").unwrap_or_default(), }) .collect(); Ok((total, data)) } // =================================================================== // M3: audit / recordings / strategies / heartbeat commands // =================================================================== /// Insert an `audit_conn` row and return its GUID. The client treats the /// guid as opaque and passes it back later in `PUT /api/audit` to attach /// an end-of-session note. pub async fn audit_conn_insert( &self, peer_id: &str, conn_id: i64, session_id: i64, ip: &str, action: &str, ) -> ResultType { let guid = uuid::Uuid::new_v4().to_string(); sqlx::query( "INSERT INTO audit_conn(guid, peer_id, conn_id, session_id, ip, action, started_at) \ VALUES(?, ?, ?, ?, ?, ?, strftime('%s','now'))", ) .bind(&guid) .bind(peer_id) .bind(conn_id) .bind(session_id) .bind(ip) .bind(action) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(guid) } pub async fn audit_conn_update_note(&self, guid: &str, note: &str) -> ResultType { let res = sqlx::query("UPDATE audit_conn SET note = ? WHERE guid = ?") .bind(note) .bind(guid) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(res.rows_affected() > 0) } pub async fn audit_file_insert( &self, peer_id: &str, remote_peer: &str, direction: i64, path: &str, is_file: bool, info_json: &str, ) -> ResultType<()> { sqlx::query( "INSERT INTO audit_file(peer_id, remote_peer, direction, path, is_file, info_json) \ VALUES(?, ?, ?, ?, ?, ?)", ) .bind(peer_id) .bind(remote_peer) .bind(direction) .bind(path) .bind(if is_file { 1 } else { 0 }) .bind(info_json) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } pub async fn audit_alarm_insert( &self, peer_id: &str, typ: i64, info_json: &str, ) -> ResultType<()> { sqlx::query( "INSERT INTO audit_alarm(peer_id, typ, info_json) VALUES(?, ?, ?)", ) .bind(peer_id) .bind(typ) .bind(info_json) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } /// Bulk delete audit rows older than `days` days. Returns number deleted /// across the three tables. Used by the optional retention sweep. pub async fn audit_purge_older_than(&self, days: i64) -> ResultType { if days <= 0 { return Ok(0); } let cutoff = chrono::Utc::now().timestamp() - days * 86400; let mut total: u64 = 0; for sql in [ "DELETE FROM audit_conn WHERE started_at < ?", "DELETE FROM audit_file WHERE at < ?", "DELETE FROM audit_alarm WHERE at < ?", ] { let res = sqlx::query(sql) .bind(cutoff) .execute(self.pool.get().await?.deref_mut()) .await?; total += res.rows_affected(); } Ok(total) } // ----- Recordings (DB rows; on-disk I/O lives in api::record::storage) ----- pub async fn recording_new(&self, peer_id: &str, filename: &str) -> ResultType<()> { sqlx::query( "INSERT INTO recordings(filename, peer_id, size, state) \ VALUES(?, ?, 0, 'new') \ ON CONFLICT(filename) DO UPDATE SET \ peer_id = excluded.peer_id, \ size = 0, state = 'new', \ started_at = strftime('%s','now'), \ finished_at = NULL", ) .bind(filename) .bind(peer_id) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } pub async fn recording_set_state( &self, filename: &str, state: &str, size: Option, finished: bool, ) -> ResultType<()> { if finished { sqlx::query( "UPDATE recordings SET state = ?, size = COALESCE(?, size), \ finished_at = strftime('%s','now') WHERE filename = ?", ) .bind(state) .bind(size) .bind(filename) .execute(self.pool.get().await?.deref_mut()) .await?; } else { sqlx::query( "UPDATE recordings SET state = ?, size = COALESCE(?, size) WHERE filename = ?", ) .bind(state) .bind(size) .bind(filename) .execute(self.pool.get().await?.deref_mut()) .await?; } Ok(()) } pub async fn recording_get(&self, filename: &str) -> ResultType> { let row = sqlx::query("SELECT size, state FROM recordings WHERE filename = ?") .bind(filename) .fetch_optional(self.pool.get().await?.deref_mut()) .await?; Ok(row.map(|r| RecordingFile { size: r.try_get("size").unwrap_or(0), state: r.try_get("state").unwrap_or_default(), })) } pub async fn recording_delete(&self, filename: &str) -> ResultType<()> { sqlx::query("DELETE FROM recordings WHERE filename = ?") .bind(filename) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } // ----- Strategy resolver ----- /// Resolve the strategy for a peer. Priority order: direct peer /// assignment > device-group assignment (via the peer's owner) > user /// assignment. Returns the strategy with the largest `priority` within /// the highest-priority tier. If nothing matches, returns the row's /// `Default`, which the heartbeat handler treats as "no strategy". pub async fn strategy_resolve_for(&self, peer_id: &str) -> ResultType { // First try a direct peer assignment. if let Some(s) = self .strategy_lookup( "SELECT s.modified_at, s.config_options_json, s.extra_json \ FROM strategies s \ JOIN strategy_assignments sa ON sa.strategy_id = s.id \ WHERE sa.peer_id = ? \ ORDER BY sa.priority DESC LIMIT 1", &[peer_id], ) .await? { return Ok(s); } // Look up the device's owner; without an owner there's nothing to // join on, so we stop here. let owner = sqlx::query( "SELECT user_id FROM device_sysinfo WHERE id = ? AND user_id IS NOT NULL LIMIT 1", ) .bind(peer_id) .fetch_optional(self.pool.get().await?.deref_mut()) .await?; let Some(owner_row) = owner else { return Ok(ResolvedStrategy::default()); }; let owner_id: i64 = owner_row.try_get("user_id")?; let owner_id_str = owner_id.to_string(); // Device-group assignment: any strategy assigned to a group that the // owner is a member of. if let Some(s) = self .strategy_lookup( "SELECT s.modified_at, s.config_options_json, s.extra_json \ FROM strategies s \ JOIN strategy_assignments sa ON sa.strategy_id = s.id \ WHERE sa.device_group_id IN ( \ SELECT device_group_id FROM device_group_members WHERE user_id = ? \ ) \ ORDER BY sa.priority DESC LIMIT 1", &[&owner_id_str], ) .await? { return Ok(s); } // User assignment. if let Some(s) = self .strategy_lookup( "SELECT s.modified_at, s.config_options_json, s.extra_json \ FROM strategies s \ JOIN strategy_assignments sa ON sa.strategy_id = s.id \ WHERE sa.user_id = ? \ ORDER BY sa.priority DESC LIMIT 1", &[&owner_id_str], ) .await? { return Ok(s); } Ok(ResolvedStrategy::default()) } async fn strategy_lookup( &self, sql: &str, params: &[&str], ) -> ResultType> { let mut q = sqlx::query(sql); for p in params { q = q.bind(*p); } let row = q .fetch_optional(self.pool.get().await?.deref_mut()) .await?; Ok(row.map(|r| ResolvedStrategy { modified_at: r.try_get("modified_at").unwrap_or(0), config_options_json: r .try_get::, _>("config_options_json") .unwrap_or_default() .unwrap_or_else(|| "{}".to_string()), extra_json: r .try_get::, _>("extra_json") .unwrap_or_default() .unwrap_or_else(|| "{}".to_string()), })) } // =================================================================== // M4: 2FA (TOTP) + pending challenges // =================================================================== /// Returns the user's TOTP secret if they have enrolled. Used by the /// login handler to decide whether to issue a `tfa_check` challenge. pub async fn totp_get_secret(&self, user_id: i64) -> ResultType> { let row = sqlx::query("SELECT secret_b32 FROM user_totp_secrets WHERE user_id = ?") .bind(user_id) .fetch_optional(self.pool.get().await?.deref_mut()) .await?; Ok(row.map(|r| r.try_get::("secret_b32").unwrap_or_default())) } /// Idempotent — re-enrolling overwrites the existing secret. pub async fn totp_enroll(&self, user_id: i64, secret_b32: &str) -> ResultType<()> { sqlx::query( "INSERT INTO user_totp_secrets(user_id, secret_b32, enrolled_at) \ VALUES(?, ?, strftime('%s','now')) \ ON CONFLICT(user_id) DO UPDATE SET \ secret_b32 = excluded.secret_b32, \ enrolled_at = strftime('%s','now')", ) .bind(user_id) .bind(secret_b32) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } pub async fn totp_unenroll(&self, user_id: i64) -> ResultType { let res = sqlx::query("DELETE FROM user_totp_secrets WHERE user_id = ?") .bind(user_id) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(res.rows_affected() > 0) } /// Issue a short-lived TFA-challenge nonce. The login handler returns /// this in the `secret` field of `tfa_check`; the client echoes it back /// alongside the TOTP code. pub async fn tfa_challenge_create( &self, user_id: i64, ttl_secs: i64, ) -> ResultType { let nonce = base64::encode_config( sodiumoxide::randombytes::randombytes(24), base64::URL_SAFE_NO_PAD, ); let expires_at = chrono::Utc::now().timestamp() + ttl_secs; sqlx::query( "INSERT INTO pending_tfa_challenges(secret, user_id, expires_at) \ VALUES(?, ?, ?)", ) .bind(&nonce) .bind(user_id) .bind(expires_at) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(nonce) } /// Look up a TFA challenge nonce. Returns the user_id if the row exists /// and has not expired; otherwise None. Does NOT delete the row — the /// caller deletes after the TOTP code itself has been verified, so a /// failed code attempt does not invalidate the challenge. pub async fn tfa_challenge_lookup(&self, nonce: &str) -> ResultType> { let row = sqlx::query( "SELECT user_id, expires_at FROM pending_tfa_challenges WHERE secret = ?", ) .bind(nonce) .fetch_optional(self.pool.get().await?.deref_mut()) .await?; let Some(row) = row else { return Ok(None) }; let expires_at: i64 = row.try_get("expires_at")?; if expires_at <= chrono::Utc::now().timestamp() { return Ok(None); } Ok(Some(row.try_get("user_id")?)) } pub async fn tfa_challenge_consume(&self, nonce: &str) -> ResultType<()> { sqlx::query("DELETE FROM pending_tfa_challenges WHERE secret = ?") .bind(nonce) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } /// Replace any prior pending codes for this email with a fresh one. The /// `code_hash` is sha256(code) — the plaintext code is mailed, never /// persisted. pub async fn email_code_create( &self, email: &str, code_sha256: &[u8], ttl_secs: i64, ) -> ResultType<()> { // Drop earlier pending codes for the same email so the latest one // wins and we don't accumulate row clutter. sqlx::query("DELETE FROM pending_email_codes WHERE email = ?") .bind(email) .execute(self.pool.get().await?.deref_mut()) .await?; let expires_at = chrono::Utc::now().timestamp() + ttl_secs; sqlx::query( "INSERT INTO pending_email_codes(email, code_hash, expires_at) \ VALUES(?, ?, ?)", ) .bind(email) .bind(code_sha256) .bind(expires_at) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } /// Verify a code attempt for `email`. Returns: /// - `Ok(true)` on success — the row is consumed. /// - `Ok(false)` on bad code or no pending row — the attempts counter is /// bumped; after 5 attempts the row is purged. /// - `Err(_)` on DB error. pub async fn email_code_verify( &self, email: &str, code_sha256: &[u8], ) -> ResultType { let now = chrono::Utc::now().timestamp(); let row = sqlx::query( "SELECT id, code_hash, expires_at, attempts \ FROM pending_email_codes WHERE email = ? \ ORDER BY id DESC LIMIT 1", ) .bind(email) .fetch_optional(self.pool.get().await?.deref_mut()) .await?; let Some(row) = row else { return Ok(false) }; let id: i64 = row.try_get("id")?; let stored: Vec = row.try_get("code_hash")?; let expires_at: i64 = row.try_get("expires_at")?; let attempts: i64 = row.try_get("attempts")?; if expires_at <= now { sqlx::query("DELETE FROM pending_email_codes WHERE id = ?") .bind(id) .execute(self.pool.get().await?.deref_mut()) .await?; return Ok(false); } if stored.len() == code_sha256.len() && constant_time_eq(&stored, code_sha256) { sqlx::query("DELETE FROM pending_email_codes WHERE id = ?") .bind(id) .execute(self.pool.get().await?.deref_mut()) .await?; return Ok(true); } let new_attempts = attempts + 1; if new_attempts >= 5 { sqlx::query("DELETE FROM pending_email_codes WHERE id = ?") .bind(id) .execute(self.pool.get().await?.deref_mut()) .await?; } else { sqlx::query("UPDATE pending_email_codes SET attempts = ? WHERE id = ?") .bind(new_attempts) .bind(id) .execute(self.pool.get().await?.deref_mut()) .await?; } Ok(false) } pub async fn strategy_find_by_name(&self, name: &str) -> ResultType> { let row = sqlx::query("SELECT id FROM strategies WHERE name = ?") .bind(name) .fetch_optional(self.pool.get().await?.deref_mut()) .await?; Ok(row.map(|r| r.try_get::("id").unwrap_or(0))) } /// Replace any existing peer-scoped assignment for this peer with the /// new strategy. Keeps the resolver's "peer > group > user" priority /// stable per peer. pub async fn strategy_assign_peer( &self, strategy_id: i64, peer_id: &str, ) -> ResultType<()> { sqlx::query("DELETE FROM strategy_assignments WHERE peer_id = ?") .bind(peer_id) .execute(self.pool.get().await?.deref_mut()) .await?; sqlx::query( "INSERT INTO strategy_assignments(strategy_id, peer_id, priority) \ VALUES(?, ?, 100)", ) .bind(strategy_id) .bind(peer_id) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } /// Ensure `group_name` exists, create it if missing, and add `user_id` as /// a member if not already present. pub async fn device_group_ensure_member( &self, group_name: &str, user_id: i64, ) -> ResultType<()> { // Upsert the group itself. sqlx::query("INSERT OR IGNORE INTO device_groups(name) VALUES(?)") .bind(group_name) .execute(self.pool.get().await?.deref_mut()) .await?; let row = sqlx::query("SELECT id FROM device_groups WHERE name = ?") .bind(group_name) .fetch_one(self.pool.get().await?.deref_mut()) .await?; let gid: i64 = row.try_get("id")?; sqlx::query( "INSERT OR IGNORE INTO device_group_members(device_group_id, user_id) \ VALUES(?, ?)", ) .bind(gid) .bind(user_id) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } // =================================================================== // M4: OIDC provider config + session state // =================================================================== pub async fn oidc_provider_upsert(&self, p: &OidcProviderRow) -> ResultType<()> { sqlx::query( "INSERT INTO oidc_providers(name, display_name, icon_url, issuer_url, \ client_id, client_secret, scopes, redirect_url, enabled, \ admin_role, roles_claim) \ VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) \ ON CONFLICT(name) DO UPDATE SET \ display_name = excluded.display_name, \ icon_url = excluded.icon_url, \ issuer_url = excluded.issuer_url, \ client_id = excluded.client_id, \ client_secret = excluded.client_secret, \ scopes = excluded.scopes, \ redirect_url = excluded.redirect_url, \ enabled = excluded.enabled, \ admin_role = excluded.admin_role, \ roles_claim = excluded.roles_claim", ) .bind(&p.name) .bind(p.display_name.as_deref()) .bind(p.icon_url.as_deref()) .bind(&p.issuer_url) .bind(&p.client_id) .bind(&p.client_secret) .bind(&p.scopes) .bind(&p.redirect_url) .bind(if p.enabled { 1 } else { 0 }) .bind(p.admin_role.as_deref()) .bind(p.roles_claim.as_deref()) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } pub async fn oidc_provider_get(&self, name: &str) -> ResultType> { let row = sqlx::query( "SELECT name, display_name, icon_url, issuer_url, client_id, client_secret, \ scopes, redirect_url, enabled, admin_role, roles_claim \ FROM oidc_providers WHERE name = ? AND enabled = 1", ) .bind(name) .fetch_optional(self.pool.get().await?.deref_mut()) .await?; Ok(row.map(row_to_oidc_provider)) } pub async fn oidc_provider_list_enabled(&self) -> ResultType> { let rows = sqlx::query( "SELECT name, display_name, icon_url, issuer_url, client_id, client_secret, \ scopes, redirect_url, enabled, admin_role, roles_claim \ FROM oidc_providers WHERE enabled = 1 ORDER BY name", ) .fetch_all(self.pool.get().await?.deref_mut()) .await?; Ok(rows.into_iter().map(row_to_oidc_provider).collect()) } pub async fn oidc_session_create( &self, s: &OidcSessionInsert<'_>, ) -> ResultType<()> { sqlx::query( "INSERT INTO oidc_sessions(code, provider, state, client_id_str, client_uuid, \ device_info_json, created_at, expires_at) \ VALUES(?, ?, ?, ?, ?, ?, strftime('%s','now'), ?)", ) .bind(s.code) .bind(s.provider) .bind(s.state) .bind(s.client_id_str) .bind(s.client_uuid) .bind(s.device_info_json) .bind(s.expires_at) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } pub async fn oidc_session_get_by_code( &self, code: &str, ) -> ResultType> { let row = sqlx::query( "SELECT code, provider, state, client_id_str, client_uuid, device_info_json, \ expires_at, status, access_token, user_id, error \ FROM oidc_sessions WHERE code = ?", ) .bind(code) .fetch_optional(self.pool.get().await?.deref_mut()) .await?; Ok(row.map(row_to_oidc_session)) } pub async fn oidc_session_get_by_state( &self, state: &str, ) -> ResultType> { let row = sqlx::query( "SELECT code, provider, state, client_id_str, client_uuid, device_info_json, \ expires_at, status, access_token, user_id, error \ FROM oidc_sessions WHERE state = ?", ) .bind(state) .fetch_optional(self.pool.get().await?.deref_mut()) .await?; Ok(row.map(row_to_oidc_session)) } pub async fn oidc_session_complete( &self, code: &str, access_token: &str, user_id: i64, ) -> ResultType<()> { sqlx::query( "UPDATE oidc_sessions SET status = 'success', access_token = ?, user_id = ? \ WHERE code = ?", ) .bind(access_token) .bind(user_id) .bind(code) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } pub async fn oidc_session_fail(&self, code: &str, error: &str) -> ResultType<()> { sqlx::query( "UPDATE oidc_sessions SET status = 'error', error = ? WHERE code = ?", ) .bind(error) .bind(code) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } /// Find an existing user by their OIDC `sub`, falling back to email /// (case-insensitive). Returns `None` if neither matches; the caller /// then decides whether to auto-provision a new user. pub async fn user_find_by_oidc( &self, oidc_subject: &str, email: Option<&str>, ) -> ResultType> { let row = sqlx::query( "SELECT id, username, password_hash, display_name, email, note, avatar, status, is_admin, oidc_subject \ FROM users WHERE oidc_subject = ? LIMIT 1", ) .bind(oidc_subject) .fetch_optional(self.pool.get().await?.deref_mut()) .await?; if let Some(r) = row { return Ok(Some(row_to_user(r))); } if let Some(e) = email.filter(|s| !s.is_empty()) { return self.user_find_by_email(e).await; } Ok(None) } /// Create or update a user from an OIDC identity. The local username is /// either the email (preferred) or the sub if no email. Subsequent /// logins re-use the same row via oidc_subject. /// Find-or-create a local user from an OIDC sign-in. When /// `desired_admin` is `Some(b)`, the user's `is_admin` flag is forced /// to `b` on every login (used when role-based admin sync is configured /// on the provider — both promotion and demotion at the IdP propagate). /// `None` leaves `is_admin` untouched on existing rows and defaults to /// `false` for new ones. pub async fn user_upsert_oidc( &self, oidc_subject: &str, email: Option<&str>, display_name: Option<&str>, desired_admin: Option, ) -> ResultType { let username = email .filter(|s| !s.is_empty()) .map(|s| s.to_string()) .unwrap_or_else(|| format!("oidc:{}", oidc_subject)); if let Some(existing) = self.user_find_by_oidc(oidc_subject, email).await? { // Make sure the oidc_subject is recorded on this row even if we // matched by email — keeps subsequent lookups O(1). sqlx::query( "UPDATE users SET oidc_subject = ?, email = COALESCE(NULLIF(?, ''), email), \ display_name = COALESCE(NULLIF(?, ''), display_name) WHERE id = ?", ) .bind(oidc_subject) .bind(email.unwrap_or("")) .bind(display_name.unwrap_or("")) .bind(existing.id) .execute(self.pool.get().await?.deref_mut()) .await?; if let Some(want) = desired_admin { sqlx::query("UPDATE users SET is_admin = ? WHERE id = ?") .bind(if want { 1i64 } else { 0 }) .bind(existing.id) .execute(self.pool.get().await?.deref_mut()) .await?; } return Ok(self .user_find_by_id(existing.id) .await? .unwrap_or(existing)); } // New user. Empty password_hash blocks password login until the // operator (or the user) sets one. let initial_admin: i64 = if matches!(desired_admin, Some(true)) { 1 } else { 0 }; sqlx::query( "INSERT INTO users(username, password_hash, display_name, email, status, is_admin, oidc_subject) \ VALUES(?, '', ?, ?, 1, ?, ?)", ) .bind(&username) .bind(display_name.unwrap_or("")) .bind(email.unwrap_or("")) .bind(initial_admin) .bind(oidc_subject) .execute(self.pool.get().await?.deref_mut()) .await?; self.user_find_by_username(&username) .await? .ok_or_else(|| hbb_common::anyhow::anyhow!("post-insert lookup failed")) } pub async fn user_find_by_email(&self, email: &str) -> ResultType> { let row = sqlx::query( "SELECT id, username, password_hash, display_name, email, note, avatar, status, is_admin, oidc_subject \ FROM users WHERE email = ? COLLATE NOCASE LIMIT 1", ) .bind(email) .fetch_optional(self.pool.get().await?.deref_mut()) .await?; Ok(row.map(row_to_user)) } /// Read all queued heartbeat commands for `peer_id` and delete them in /// the same transaction. Each command is read at most once. pub async fn heartbeat_pop_commands( &self, peer_id: &str, ) -> ResultType> { let mut guard = self.pool.get().await?; let conn: &mut SqliteConnection = guard.deref_mut(); let mut tx = conn.begin().await?; let rows = sqlx::query( "SELECT kind, payload FROM heartbeat_commands WHERE peer_id = ?", ) .bind(peer_id) .fetch_all(&mut tx) .await?; if rows.is_empty() { return Ok(vec![]); } sqlx::query("DELETE FROM heartbeat_commands WHERE peer_id = ?") .bind(peer_id) .execute(&mut tx) .await?; tx.commit().await?; Ok(rows .into_iter() .map(|r| HeartbeatCommand { kind: r.try_get("kind").unwrap_or_default(), payload: r.try_get::, _>("payload").unwrap_or_default(), }) .collect()) } pub async fn get_peer(&self, id: &str) -> ResultType> { Ok(sqlx::query_as!( Peer, "select guid, id, uuid, pk, user, status, info from peer where id = ?", id ) .fetch_optional(self.pool.get().await?.deref_mut()) .await?) } pub async fn insert_peer( &self, id: &str, uuid: &[u8], pk: &[u8], info: &str, ) -> ResultType> { let guid = uuid::Uuid::new_v4().as_bytes().to_vec(); sqlx::query!( "insert into peer(guid, id, uuid, pk, info) values(?, ?, ?, ?, ?)", guid, id, uuid, pk, info ) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(guid) } pub async fn update_pk( &self, guid: &Vec, id: &str, pk: &[u8], info: &str, ) -> ResultType<()> { sqlx::query!( "update peer set id=?, pk=?, info=? where guid=?", id, pk, info, guid ) .execute(self.pool.get().await?.deref_mut()) .await?; Ok(()) } } /// Timing-safe equality for hash comparisons. Slightly paranoid given the /// codes are short-lived, but cheap. fn constant_time_eq(a: &[u8], b: &[u8]) -> bool { if a.len() != b.len() { return false; } let mut diff: u8 = 0; for (x, y) in a.iter().zip(b.iter()) { diff |= x ^ y; } diff == 0 } fn row_to_user(row: sqlx::sqlite::SqliteRow) -> UserRow { let is_admin: i64 = row.try_get("is_admin").unwrap_or(0); UserRow { id: row.try_get("id").unwrap_or(0), username: row.try_get("username").unwrap_or_default(), password_hash: row.try_get("password_hash").unwrap_or_default(), display_name: row.try_get("display_name").unwrap_or_default(), email: row.try_get("email").unwrap_or_default(), note: row.try_get("note").unwrap_or_default(), avatar: row.try_get("avatar").unwrap_or_default(), status: row.try_get("status").unwrap_or(1), is_admin: is_admin != 0, oidc_subject: row.try_get::, _>("oidc_subject").ok().flatten(), } } fn row_to_oidc_provider(row: sqlx::sqlite::SqliteRow) -> OidcProviderRow { let enabled: i64 = row.try_get("enabled").unwrap_or(0); OidcProviderRow { name: row.try_get("name").unwrap_or_default(), display_name: row .try_get::, _>("display_name") .ok() .flatten(), icon_url: row.try_get::, _>("icon_url").ok().flatten(), issuer_url: row.try_get("issuer_url").unwrap_or_default(), client_id: row.try_get("client_id").unwrap_or_default(), client_secret: row.try_get("client_secret").unwrap_or_default(), scopes: row .try_get("scopes") .unwrap_or_else(|_| "openid email profile".to_string()), redirect_url: row.try_get("redirect_url").unwrap_or_default(), enabled: enabled != 0, admin_role: row .try_get::, _>("admin_role") .ok() .flatten(), roles_claim: row .try_get::, _>("roles_claim") .ok() .flatten(), } } fn row_to_oidc_session(row: sqlx::sqlite::SqliteRow) -> OidcSessionRow { OidcSessionRow { code: row.try_get("code").unwrap_or_default(), provider: row.try_get("provider").unwrap_or_default(), state: row.try_get("state").unwrap_or_default(), client_id_str: row.try_get("client_id_str").unwrap_or_default(), client_uuid: row.try_get("client_uuid").unwrap_or_default(), device_info_json: row .try_get("device_info_json") .unwrap_or_else(|_| "{}".to_string()), expires_at: row.try_get("expires_at").unwrap_or(0), status: row .try_get("status") .unwrap_or_else(|_| "pending".to_string()), access_token: row.try_get::, _>("access_token").ok().flatten(), user_id: row.try_get::, _>("user_id").ok().flatten(), error: row.try_get::, _>("error").ok().flatten(), } } const M1_SCHEMA: &[&str] = &[ "CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL UNIQUE, password_hash TEXT NOT NULL, display_name TEXT NOT NULL DEFAULT '', email TEXT NOT NULL DEFAULT '', note TEXT NOT NULL DEFAULT '', avatar TEXT NOT NULL DEFAULT '', status INTEGER NOT NULL DEFAULT 1, is_admin INTEGER NOT NULL DEFAULT 0, created_at DATETIME NOT NULL DEFAULT(current_timestamp), updated_at DATETIME NOT NULL DEFAULT(current_timestamp) )", "CREATE UNIQUE INDEX IF NOT EXISTS idx_users_username ON users(username)", "CREATE TABLE IF NOT EXISTS tokens ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, token_sha256 BLOB NOT NULL UNIQUE, peer_id TEXT NOT NULL DEFAULT '', peer_uuid TEXT NOT NULL DEFAULT '', device_info TEXT NOT NULL DEFAULT '', created_at DATETIME NOT NULL DEFAULT(current_timestamp), last_used_at DATETIME NOT NULL DEFAULT(current_timestamp), expires_at DATETIME NOT NULL, FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE )", "CREATE INDEX IF NOT EXISTS idx_tokens_user ON tokens(user_id)", "CREATE INDEX IF NOT EXISTS idx_tokens_expires ON tokens(expires_at)", "CREATE TABLE IF NOT EXISTS device_sysinfo ( id TEXT NOT NULL, uuid TEXT NOT NULL, version INTEGER NOT NULL DEFAULT 0, last_seen_at DATETIME NOT NULL DEFAULT(current_timestamp), last_heartbeat_at DATETIME NOT NULL DEFAULT(current_timestamp), conns TEXT NOT NULL DEFAULT '[]', payload TEXT NOT NULL DEFAULT '{}', sysinfo_ver_seen TEXT NOT NULL DEFAULT '', updated_at DATETIME NOT NULL DEFAULT(current_timestamp), PRIMARY KEY (id, uuid) )", "CREATE INDEX IF NOT EXISTS idx_device_sysinfo_lastseen ON device_sysinfo(last_seen_at)", ]; const M2_SCHEMA: &[&str] = &[ "CREATE TABLE IF NOT EXISTS address_books ( guid TEXT PRIMARY KEY, owner_user_id INTEGER NOT NULL, name TEXT NOT NULL, note TEXT, kind INTEGER NOT NULL, info_json TEXT, created_at INTEGER NOT NULL DEFAULT (strftime('%s','now')) )", "CREATE UNIQUE INDEX IF NOT EXISTS idx_ab_owner_kind_name \ ON address_books(owner_user_id, kind, name)", "CREATE INDEX IF NOT EXISTS idx_ab_owner ON address_books(owner_user_id)", "CREATE TABLE IF NOT EXISTS device_groups ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE, note TEXT, created_at INTEGER NOT NULL DEFAULT (strftime('%s','now')) )", "CREATE TABLE IF NOT EXISTS device_group_members ( device_group_id INTEGER NOT NULL, user_id INTEGER, PRIMARY KEY (device_group_id, user_id) )", "CREATE INDEX IF NOT EXISTS idx_dgm_user ON device_group_members(user_id)", // SQLite forbids expressions in PRIMARY KEY constraints, so we use a // unique index over the COALESCEd tuple to enforce one share per // (ab, user) and one per (ab, group). NULLs collapse to 0 so two NULL // user_ids on the same ab + group still conflict. "CREATE TABLE IF NOT EXISTS address_book_shares ( ab_guid TEXT NOT NULL, user_id INTEGER, group_id INTEGER, rule INTEGER NOT NULL )", "CREATE UNIQUE INDEX IF NOT EXISTS uq_abshare \ ON address_book_shares(ab_guid, COALESCE(user_id,0), COALESCE(group_id,0))", "CREATE INDEX IF NOT EXISTS idx_abshare_user ON address_book_shares(user_id)", "CREATE INDEX IF NOT EXISTS idx_abshare_group ON address_book_shares(group_id)", "CREATE TABLE IF NOT EXISTS address_book_peers ( ab_guid TEXT NOT NULL, peer_id TEXT NOT NULL, alias TEXT, note TEXT, password TEXT, hash TEXT, username TEXT, hostname TEXT, platform TEXT, updated_at INTEGER NOT NULL DEFAULT (strftime('%s','now')), PRIMARY KEY (ab_guid, peer_id) )", "CREATE TABLE IF NOT EXISTS address_book_tags ( ab_guid TEXT NOT NULL, name TEXT NOT NULL, color INTEGER NOT NULL, PRIMARY KEY (ab_guid, name) )", "CREATE TABLE IF NOT EXISTS address_book_peer_tags ( ab_guid TEXT NOT NULL, peer_id TEXT NOT NULL, tag_name TEXT NOT NULL, PRIMARY KEY (ab_guid, peer_id, tag_name) )", "CREATE INDEX IF NOT EXISTS idx_abpt_peer ON address_book_peer_tags(ab_guid, peer_id)", "CREATE INDEX IF NOT EXISTS idx_abpt_tag ON address_book_peer_tags(ab_guid, tag_name)", ]; const M2_SOFT_ALTERS: &[&str] = &[ // Bind a device to its enrolled user. Filled by the login handler when // the client passes id+uuid in the body. "ALTER TABLE device_sysinfo ADD COLUMN user_id INTEGER", // OIDC `sub` claim, used to map an IdP identity to a local user across // sessions. Nullable so password-only users keep working. "ALTER TABLE users ADD COLUMN oidc_subject TEXT", // Optional role-based admin sync. When `admin_role` is non-NULL the // OIDC callback evaluates the userinfo claim at `roles_claim` // (defaulting to "roles") and sets is_admin accordingly on every // login — promotion AND demotion at the IdP propagate. "ALTER TABLE oidc_providers ADD COLUMN admin_role TEXT", "ALTER TABLE oidc_providers ADD COLUMN roles_claim TEXT", // Unattended-access password. Some agents (hello-agent) generate a // random "permanent password" on every boot and report it back here // so a supporter can reach the box when no user is logged in to // approve a connection. Stored as plaintext on purpose: the admin UI // displays it for the operator to read, and it rotates each boot. "ALTER TABLE device_sysinfo ADD COLUMN unattended_password TEXT", "ALTER TABLE device_sysinfo ADD COLUMN unattended_password_set_at DATETIME", ]; const M3_SCHEMA: &[&str] = &[ // Audit conn rows are keyed by an opaque guid that we hand back to the // client so the operator's end-of-session note dialog can attach a note // to the right session. "CREATE TABLE IF NOT EXISTS audit_conn ( guid TEXT PRIMARY KEY, peer_id TEXT NOT NULL, remote_id TEXT, conn_id INTEGER NOT NULL DEFAULT 0, session_id INTEGER NOT NULL DEFAULT 0, ip TEXT, action TEXT NOT NULL, note TEXT, started_at INTEGER NOT NULL, ended_at INTEGER )", "CREATE INDEX IF NOT EXISTS idx_audit_conn_peer ON audit_conn(peer_id, started_at)", "CREATE TABLE IF NOT EXISTS audit_file ( id INTEGER PRIMARY KEY AUTOINCREMENT, peer_id TEXT NOT NULL, remote_peer TEXT, direction INTEGER NOT NULL, path TEXT NOT NULL, is_file INTEGER NOT NULL, info_json TEXT NOT NULL, at INTEGER NOT NULL DEFAULT (strftime('%s','now')) )", "CREATE INDEX IF NOT EXISTS idx_audit_file_peer ON audit_file(peer_id, at)", "CREATE TABLE IF NOT EXISTS audit_alarm ( id INTEGER PRIMARY KEY AUTOINCREMENT, peer_id TEXT NOT NULL, typ INTEGER NOT NULL, info_json TEXT NOT NULL, at INTEGER NOT NULL DEFAULT (strftime('%s','now')) )", "CREATE INDEX IF NOT EXISTS idx_audit_alarm_peer ON audit_alarm(peer_id, at)", "CREATE TABLE IF NOT EXISTS recordings ( filename TEXT PRIMARY KEY, peer_id TEXT NOT NULL, size INTEGER NOT NULL DEFAULT 0, state TEXT NOT NULL, started_at INTEGER NOT NULL DEFAULT (strftime('%s','now')), finished_at INTEGER )", "CREATE INDEX IF NOT EXISTS idx_recordings_peer ON recordings(peer_id, started_at)", "CREATE TABLE IF NOT EXISTS strategies ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE, modified_at INTEGER NOT NULL DEFAULT 0, config_options_json TEXT NOT NULL DEFAULT '{}', extra_json TEXT NOT NULL DEFAULT '{}' )", // strategy_assignments: exactly one of (user_id, device_group_id, peer_id) // is non-null per row. Resolution priority is encoded by `priority` // (higher wins on ties within the same scope). "CREATE TABLE IF NOT EXISTS strategy_assignments ( id INTEGER PRIMARY KEY AUTOINCREMENT, strategy_id INTEGER NOT NULL, user_id INTEGER, device_group_id INTEGER, peer_id TEXT, priority INTEGER NOT NULL DEFAULT 0 )", "CREATE INDEX IF NOT EXISTS idx_strategy_assign_peer ON strategy_assignments(peer_id)", "CREATE INDEX IF NOT EXISTS idx_strategy_assign_user ON strategy_assignments(user_id)", "CREATE INDEX IF NOT EXISTS idx_strategy_assign_group ON strategy_assignments(device_group_id)", // heartbeat_commands: one-shot commands the next /api/heartbeat // response delivers, then deletes. `kind` is one of 'disconnect' // (payload = JSON array of conn_ids) or 'sysinfo' (payload null). "CREATE TABLE IF NOT EXISTS heartbeat_commands ( peer_id TEXT NOT NULL, kind TEXT NOT NULL, payload TEXT, created_at INTEGER NOT NULL DEFAULT (strftime('%s','now')), PRIMARY KEY (peer_id, kind) )", ]; const M4_SCHEMA: &[&str] = &[ // TOTP enrollment per user. The shared secret is stored base32 to match // how authenticator apps encode it; the operator scans/enters it. "CREATE TABLE IF NOT EXISTS user_totp_secrets ( user_id INTEGER PRIMARY KEY, secret_b32 TEXT NOT NULL, enrolled_at INTEGER NOT NULL, recovery_codes_json TEXT )", // Short-lived nonce echoed back by the client during the second login // POST. Login flow: password verified -> tfa_check{secret=} -> // client sends tfa_code{secret=, tfaCode=<6 digits>}. "CREATE TABLE IF NOT EXISTS pending_tfa_challenges ( secret TEXT PRIMARY KEY, user_id INTEGER NOT NULL, expires_at INTEGER NOT NULL )", "CREATE INDEX IF NOT EXISTS idx_pending_tfa_user ON pending_tfa_challenges(user_id)", // Pending email-login codes. Hashed at rest so a DB leak doesn't // immediately give an attacker a working code; bcrypt would be overkill // for a 6-digit secret with a 10-minute TTL — sha256 is enough. "CREATE TABLE IF NOT EXISTS pending_email_codes ( id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT NOT NULL, code_hash BLOB NOT NULL, expires_at INTEGER NOT NULL, attempts INTEGER NOT NULL DEFAULT 0, created_at INTEGER NOT NULL DEFAULT (strftime('%s','now')) )", "CREATE INDEX IF NOT EXISTS idx_pending_email_email ON pending_email_codes(email)", "CREATE INDEX IF NOT EXISTS idx_pending_email_expires ON pending_email_codes(expires_at)", // OIDC providers and in-flight device-flow sessions. Providers are // upserted at startup from the operator-supplied --oidc-config TOML // (or hand-inserted via SQL). "CREATE TABLE IF NOT EXISTS oidc_providers ( name TEXT PRIMARY KEY, display_name TEXT, icon_url TEXT, issuer_url TEXT NOT NULL, client_id TEXT NOT NULL, client_secret TEXT NOT NULL, scopes TEXT NOT NULL DEFAULT 'openid email profile', redirect_url TEXT NOT NULL, enabled INTEGER NOT NULL DEFAULT 1 )", // `code` is the opaque handle the client polls with; `state` is the // CSRF token round-tripped through the IdP. Status transitions: // pending -> success | error. "CREATE TABLE IF NOT EXISTS oidc_sessions ( code TEXT PRIMARY KEY, provider TEXT NOT NULL, state TEXT NOT NULL UNIQUE, client_id_str TEXT NOT NULL DEFAULT '', client_uuid TEXT NOT NULL DEFAULT '', device_info_json TEXT NOT NULL DEFAULT '{}', created_at INTEGER NOT NULL, expires_at INTEGER NOT NULL, status TEXT NOT NULL DEFAULT 'pending', access_token TEXT, user_id INTEGER, error TEXT )", "CREATE INDEX IF NOT EXISTS idx_oidc_sessions_status ON oidc_sessions(status, expires_at)", ]; #[cfg(test)] mod tests { use hbb_common::tokio; #[test] fn test_insert() { insert(); } #[tokio::main(flavor = "multi_thread")] async fn insert() { let db = super::Database::new("test.sqlite3").await.unwrap(); let mut jobs = vec![]; for i in 0..10000 { let cloned = db.clone(); let id = i.to_string(); let a = tokio::spawn(async move { let empty_vec = Vec::new(); cloned .insert_peer(&id, &empty_vec, &empty_vec, "") .await .unwrap(); }); jobs.push(a); } for i in 0..10000 { let cloned = db.clone(); let id = i.to_string(); let a = tokio::spawn(async move { cloned.get_peer(&id).await.unwrap(); }); jobs.push(a); } hbb_common::futures::future::join_all(jobs).await; } }