Initial commit: hello-agent — headless RustDesk-protocol-compatible Windows agent
build-windows / build-hello-agent-x64 (push) Successful in 5m41s

A single-binary, Flutter-free remote-support agent that speaks the stock
RustDesk wire protocol. Designed for one-line MDM deployment against a
self-hosted rustdesk-server: a supporter using the unmodified rustdesk.exe
client connects, the controlled-side user gets a native Win32 approval
prompt, click Yes / No.

CLI surface

    hello-agent.exe --install                # register + start service
    hello-agent.exe --uninstall              # stop, delete, clean up
    hello-agent.exe --config <BLOB>          # admin-UI deploy string
    hello-agent.exe --install --config <BLOB>   # MDM one-liner

--config accepts both forms emitted by the rustdesk-server admin UI: the
reversed-base64 deploy string and the host=,key=,api=,relay= filename
form. Decoded via the upstream custom_server module, persisted via
hbb_common::config::Config::set_option.

Architecture

    --service runs as a Session 0 LocalSystem service. It polls
    WTSGetActiveConsoleSessionId and (re)spawns hello-agent.exe --server
    into the active console session via librustdesk::platform::run_as_user,
    handling the Session 0 → user-session token impersonation.

    --server is the worker. It boots three concurrent components:
      1. cm_popup: an IPC listener on the rustdesk `_cm` named pipe
      2. librustdesk::start_server(true, false): the upstream protocol
         stack — rendezvous mediator, NAT punch, IPC server, screen
         capture, login validation, hbbs_http heartbeat / sysinfo sync
      3. (implicit) ApproveMode::Click is pinned in config, so every
         incoming connection routes through cm_popup

The popup mechanism reuses an existing upstream contract without any
patches to the protocol code: when a peer connects with no password,
Connection::start in the upstream code calls try_start_cm_ipc, which
ipc::connect-s the `_cm` pipe before falling back to spawning a Flutter
CM child. Since cm_popup is up first, step 1 succeeds; we read the
Data::Login{authorized:false} frame, show MessageBoxTimeoutW (Yes/No,
60s, top-most, system-modal), and reply Data::Authorize or Data::Close.

Source tree

    src/main.rs             CLI dispatcher + run_server() composition
    src/cli.rs              hand-rolled argv parser + unit tests
    src/service.rs          windows-service install/uninstall/dispatcher
    src/config_import.rs    --config blob decoding + persistence
    src/cm_popup.rs         _cm IPC listener + Win32 approval dialog

Vendoring

The upstream RustDesk crate is vendored under vendor/rustdesk/ — full
workspace including libs/{hbb_common, scrap, enigo, clipboard,
virtual_display, remote_printer}. This makes the build self-contained
(no submodules, no sibling-repo checkout in CI) and gives us freedom to
fork in a different direction later. Excluded from the vendor: .git,
target/, flutter/, appimage/, flatpak/, fastlane/, docs/, examples/,
ci/, build.py, Dockerfile, upstream README/CLAUDE/AGENTS/GEMINI.

One local divergence vs. upstream: vendor/rustdesk/src/lib.rs flips
`mod custom_server` → `pub mod custom_server` so config_import.rs can
call get_custom_server_from_string without going through the
ui_interface shim. Documented in README.md → "Re-syncing the vendored
copy".

CI

.gitea/workflows/build-windows.yml builds on a self-hosted Windows
runner with Rust 1.75, LLVM 15.0.6 (libclang for bindgen via libvpx-sys),
and a vcpkg cache. The vendored vcpkg.json drives x64-windows-static
deps. The workflow stages the resulting hello-agent.exe into
SignOutput\, reports authenticode signing status (warns on unsigned),
and uploads as artifact. ~15 min full build, faster on incremental.

Out of scope for this commit: Linux/macOS builds, code signing, MSI
packaging, coexistence with stock rustdesk on the same box (currently
shares the RustDesk APP_NAME and config dir).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-07 11:01:30 +02:00
commit f8ead215d8
479 changed files with 188052 additions and 0 deletions
+366
View File
@@ -0,0 +1,366 @@
use super::HbbHttpResponse;
use crate::hbbs_http::create_http_client_with_url;
use hbb_common::{config::LocalConfig, log, ResultType};
use serde_derive::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use std::{
collections::HashMap,
sync::{Arc, RwLock},
time::{Duration, Instant},
};
use url::Url;
lazy_static::lazy_static! {
static ref OIDC_SESSION: Arc<RwLock<OidcSession>> = Arc::new(RwLock::new(OidcSession::new()));
}
const QUERY_INTERVAL_SECS: f32 = 1.0;
const QUERY_TIMEOUT_SECS: u64 = 60 * 3;
const REQUESTING_ACCOUNT_AUTH: &str = "Requesting account auth";
const WAITING_ACCOUNT_AUTH: &str = "Waiting account auth";
const LOGIN_ACCOUNT_AUTH: &str = "Login account auth";
#[derive(Deserialize, Clone, Debug)]
pub struct OidcAuthUrl {
code: String,
url: Url,
}
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct DeviceInfo {
/// Linux , Windows , Android ...
#[serde(default)]
pub os: String,
/// `browser` or `client`
#[serde(default)]
pub r#type: String,
/// device name from rustdesk client,
/// browser info(name + version) from browser
#[serde(default)]
pub name: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WhitelistItem {
data: String, // ip / device uuid
info: DeviceInfo,
exp: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct UserInfo {
#[serde(default, flatten)]
pub settings: UserSettings,
#[serde(default)]
pub login_device_whitelist: Vec<WhitelistItem>,
#[serde(default)]
pub other: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct UserSettings {
#[serde(default)]
pub email_verification: bool,
#[serde(default)]
pub email_alarm_notification: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Serialize_repr, Deserialize_repr)]
#[repr(i64)]
pub enum UserStatus {
Disabled = 0,
Normal = 1,
Unverified = -1,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserPayload {
pub name: String,
#[serde(default)]
pub display_name: Option<String>,
#[serde(default)]
pub avatar: Option<String>,
#[serde(default)]
pub email: Option<String>,
#[serde(default)]
pub note: Option<String>,
#[serde(default)]
pub status: UserStatus,
pub info: UserInfo,
#[serde(default)]
pub is_admin: bool,
#[serde(default)]
pub third_auth_type: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthBody {
pub access_token: String,
pub r#type: String,
#[serde(default)]
pub tfa_type: String,
#[serde(default)]
pub secret: String,
pub user: UserPayload,
}
pub struct OidcSession {
warmed_api_server: Option<String>,
state_msg: &'static str,
failed_msg: String,
code_url: Option<OidcAuthUrl>,
auth_body: Option<AuthBody>,
keep_querying: bool,
running: bool,
query_timeout: Duration,
}
#[derive(Serialize)]
pub struct AuthResult {
pub state_msg: String,
pub failed_msg: String,
pub url: Option<String>,
pub auth_body: Option<AuthBody>,
}
impl Default for UserStatus {
fn default() -> Self {
UserStatus::Normal
}
}
impl OidcSession {
fn new() -> Self {
Self {
warmed_api_server: None,
state_msg: REQUESTING_ACCOUNT_AUTH,
failed_msg: "".to_owned(),
code_url: None,
auth_body: None,
keep_querying: false,
running: false,
query_timeout: Duration::from_secs(QUERY_TIMEOUT_SECS),
}
}
fn ensure_client(api_server: &str) {
let mut write_guard = OIDC_SESSION.write().unwrap();
if write_guard.warmed_api_server.as_deref() == Some(api_server) {
return;
}
// This URL is used to detect the appropriate TLS implementation for the server.
let login_option_url = format!("{}/api/login-options", api_server);
let _ = create_http_client_with_url(&login_option_url);
write_guard.warmed_api_server = Some(api_server.to_owned());
}
fn auth(
api_server: &str,
op: &str,
id: &str,
uuid: &str,
) -> ResultType<HbbHttpResponse<OidcAuthUrl>> {
Self::ensure_client(api_server);
let body = serde_json::json!({
"op": op,
"id": id,
"uuid": uuid,
"deviceInfo": crate::ui_interface::get_login_device_info(),
})
.to_string();
let resp = crate::post_request_sync(format!("{}/api/oidc/auth", api_server), body, "")?;
HbbHttpResponse::parse(&resp)
}
fn query(
api_server: &str,
code: &str,
id: &str,
uuid: &str,
) -> ResultType<HbbHttpResponse<AuthBody>> {
let url = Url::parse_with_params(
&format!("{}/api/oidc/auth-query", api_server),
&[("code", code), ("id", id), ("uuid", uuid)],
)?;
Self::ensure_client(api_server);
#[derive(Deserialize)]
struct HttpResponseBody {
body: String,
}
let resp = crate::http_request_sync(
url.to_string(),
"GET".to_owned(),
None,
"{}".to_owned(),
)?;
let resp = serde_json::from_str::<HttpResponseBody>(&resp)?;
HbbHttpResponse::parse(&resp.body)
}
fn reset(&mut self) {
self.state_msg = REQUESTING_ACCOUNT_AUTH;
self.failed_msg = "".to_owned();
self.keep_querying = true;
self.running = false;
self.code_url = None;
self.auth_body = None;
}
fn before_task(&mut self) {
self.reset();
self.running = true;
}
fn after_task(&mut self) {
self.running = false;
}
fn sleep(secs: f32) {
std::thread::sleep(std::time::Duration::from_secs_f32(secs));
}
fn auth_task(api_server: String, op: String, id: String, uuid: String, remember_me: bool) {
let auth_request_res = Self::auth(&api_server, &op, &id, &uuid);
log::info!("Request oidc auth result: {:?}", &auth_request_res);
let code_url = match auth_request_res {
Ok(HbbHttpResponse::<_>::Data(code_url)) => code_url,
Ok(HbbHttpResponse::<_>::Error(err)) => {
OIDC_SESSION
.write()
.unwrap()
.set_state(REQUESTING_ACCOUNT_AUTH, err);
return;
}
Ok(_) => {
OIDC_SESSION
.write()
.unwrap()
.set_state(REQUESTING_ACCOUNT_AUTH, "Invalid auth response".to_owned());
return;
}
Err(err) => {
OIDC_SESSION
.write()
.unwrap()
.set_state(REQUESTING_ACCOUNT_AUTH, err.to_string());
return;
}
};
OIDC_SESSION
.write()
.unwrap()
.set_state(WAITING_ACCOUNT_AUTH, "".to_owned());
OIDC_SESSION.write().unwrap().code_url = Some(code_url.clone());
let begin = Instant::now();
let query_timeout = OIDC_SESSION.read().unwrap().query_timeout;
while OIDC_SESSION.read().unwrap().keep_querying && begin.elapsed() < query_timeout {
match Self::query(&api_server, &code_url.code, &id, &uuid) {
Ok(HbbHttpResponse::<_>::Data(auth_body)) => {
if auth_body.r#type == "access_token" {
if remember_me {
LocalConfig::set_option(
"access_token".to_owned(),
auth_body.access_token.clone(),
);
LocalConfig::set_option(
"user_info".to_owned(),
serde_json::json!({
"name": auth_body.user.name,
"display_name": auth_body.user.display_name,
"avatar": auth_body.user.avatar,
"status": auth_body.user.status
})
.to_string(),
);
}
}
OIDC_SESSION
.write()
.unwrap()
.set_state(LOGIN_ACCOUNT_AUTH, "".to_owned());
OIDC_SESSION.write().unwrap().auth_body = Some(auth_body);
return;
}
Ok(HbbHttpResponse::<_>::Error(err)) => {
if err.contains("No authed oidc is found") {
// ignore, keep querying
} else {
OIDC_SESSION
.write()
.unwrap()
.set_state(WAITING_ACCOUNT_AUTH, err);
return;
}
}
Ok(_) => {
// ignore
}
Err(err) => {
log::trace!("Failed query oidc {}", err);
// ignore
}
}
Self::sleep(QUERY_INTERVAL_SECS);
}
if begin.elapsed() >= query_timeout {
OIDC_SESSION
.write()
.unwrap()
.set_state(WAITING_ACCOUNT_AUTH, "timeout".to_owned());
}
// no need to handle "keep_querying == false"
}
fn set_state(&mut self, state_msg: &'static str, failed_msg: String) {
self.state_msg = state_msg;
self.failed_msg = failed_msg;
}
fn wait_stop_querying() {
let wait_secs = 0.3;
while OIDC_SESSION.read().unwrap().running {
Self::sleep(wait_secs);
}
}
pub fn account_auth(
api_server: String,
op: String,
id: String,
uuid: String,
remember_me: bool,
) {
Self::auth_cancel();
Self::wait_stop_querying();
OIDC_SESSION.write().unwrap().before_task();
std::thread::spawn(move || {
Self::auth_task(api_server, op, id, uuid, remember_me);
OIDC_SESSION.write().unwrap().after_task();
});
}
fn get_result_(&self) -> AuthResult {
AuthResult {
state_msg: self.state_msg.to_string(),
failed_msg: self.failed_msg.clone(),
url: self.code_url.as_ref().map(|x| x.url.to_string()),
auth_body: self.auth_body.clone(),
}
}
pub fn auth_cancel() {
OIDC_SESSION.write().unwrap().keep_querying = false;
}
pub fn get_result() -> AuthResult {
OIDC_SESSION.read().unwrap().get_result_()
}
}
+309
View File
@@ -0,0 +1,309 @@
use super::create_http_client_async_with_url;
use hbb_common::{
bail,
lazy_static::lazy_static,
log,
tokio::{
self,
fs::File,
io::AsyncWriteExt,
sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
},
ResultType,
};
use serde_derive::Serialize;
use std::{collections::HashMap, path::PathBuf, sync::Mutex, time::Duration};
lazy_static! {
static ref DOWNLOADERS: Mutex<HashMap<String, Downloader>> = Default::default();
}
/// This struct is used to return the download data to the caller.
/// The caller should check if the file is downloaded successfully and remove the job from the map.
/// If the file is not downloaded successfully, the `data` field will be empty.
/// If the file is downloaded successfully, the `data` field will contain the downloaded data if `path` is None.
#[derive(Serialize, Debug)]
pub struct DownloadData {
#[serde(skip_serializing_if = "Vec::is_empty")]
pub data: Vec<u8>,
#[serde(skip_serializing_if = "Option::is_none")]
pub path: Option<PathBuf>,
#[serde(skip_serializing_if = "Option::is_none")]
pub total_size: Option<u64>,
pub downloaded_size: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
struct Downloader {
data: Vec<u8>,
path: Option<PathBuf>,
// Some file may be empty, so we use Option<u64> to indicate if the size is known
total_size: Option<u64>,
downloaded_size: u64,
error: Option<String>,
finished: bool,
tx_cancel: UnboundedSender<()>,
}
// The caller should check if the file is downloaded successfully and remove the job from the map.
pub fn download_file(
url: String,
path: Option<PathBuf>,
auto_del_dur: Option<Duration>,
) -> ResultType<String> {
let id = url.clone();
// First pass: if a non-error downloader exists for this URL, reuse it.
// If an errored downloader exists, remove it so this call can retry.
let mut stale_path = None;
{
let mut downloaders = DOWNLOADERS.lock().unwrap();
if let Some(downloader) = downloaders.get(&id) {
if downloader.error.is_none() {
return Ok(id);
}
stale_path = downloader.path.clone();
downloaders.remove(&id);
}
}
if let Some(p) = stale_path {
if p.exists() {
if let Err(e) = std::fs::remove_file(&p) {
log::warn!("Failed to remove stale download file {}: {}", p.display(), e);
}
}
}
if let Some(path) = path.as_ref() {
if path.exists() {
bail!("File {} already exists", path.display());
}
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
}
let (tx, rx) = unbounded_channel();
let downloader = Downloader {
data: Vec::new(),
path: path.clone(),
total_size: None,
downloaded_size: 0,
error: None,
tx_cancel: tx,
finished: false,
};
// Second pass (atomic with insert) to avoid race with another concurrent caller.
let mut stale_path_after_check = None;
{
let mut downloaders = DOWNLOADERS.lock().unwrap();
if let Some(existing) = downloaders.get(&id) {
if existing.error.is_none() {
return Ok(id);
}
stale_path_after_check = existing.path.clone();
downloaders.remove(&id);
}
downloaders.insert(id.clone(), downloader);
}
if let Some(p) = stale_path_after_check {
if p.exists() {
if let Err(e) = std::fs::remove_file(&p) {
log::warn!("Failed to remove stale download file {}: {}", p.display(), e);
}
}
}
let id2 = id.clone();
std::thread::spawn(
move || match do_download(&id2, url, path, auto_del_dur, rx) {
Ok(is_all_downloaded) => {
let mut downloaded_size = 0;
let mut total_size = 0;
DOWNLOADERS.lock().unwrap().get_mut(&id2).map(|downloader| {
downloaded_size = downloader.downloaded_size;
total_size = downloader.total_size.unwrap_or(0);
});
log::info!(
"Download {} end, {}/{}, {:.2} %",
&id2,
downloaded_size,
total_size,
if total_size == 0 {
0.0
} else {
downloaded_size as f64 / total_size as f64 * 100.0
}
);
let is_canceled = !is_all_downloaded;
if is_canceled {
if let Some(downloader) = DOWNLOADERS.lock().unwrap().remove(&id2) {
if let Some(p) = downloader.path {
if p.exists() {
std::fs::remove_file(p).ok();
}
}
}
}
}
Err(e) => {
let err = e.to_string();
log::error!("Download {}, failed: {}", &id2, &err);
DOWNLOADERS.lock().unwrap().get_mut(&id2).map(|downloader| {
downloader.error = Some(err);
});
}
},
);
Ok(id)
}
#[tokio::main(flavor = "current_thread")]
async fn do_download(
id: &str,
url: String,
path: Option<PathBuf>,
auto_del_dur: Option<Duration>,
mut rx_cancel: UnboundedReceiver<()>,
) -> ResultType<bool> {
let client = create_http_client_async_with_url(&url).await;
let mut is_all_downloaded = false;
tokio::select! {
_ = rx_cancel.recv() => {
return Ok(is_all_downloaded);
}
head_resp = client.head(&url).send() => {
match head_resp {
Ok(resp) => {
if resp.status().is_success() {
let total_size = resp
.headers()
.get(reqwest::header::CONTENT_LENGTH)
.and_then(|ct_len| ct_len.to_str().ok())
.and_then(|ct_len| ct_len.parse::<u64>().ok());
let Some(total_size) = total_size else {
bail!("Failed to get content length");
};
DOWNLOADERS.lock().unwrap().get_mut(id).map(|downloader| {
downloader.total_size = Some(total_size);
});
} else {
bail!("Failed to get content length: {}", resp.status());
}
}
Err(e) => {
return Err(e.into());
}
}
}
}
let mut response;
tokio::select! {
_ = rx_cancel.recv() => {
return Ok(is_all_downloaded);
}
resp = client.get(url).send() => {
response = resp?;
}
}
let mut dest: Option<File> = None;
if let Some(p) = path {
dest = Some(File::create(p).await?);
}
loop {
tokio::select! {
_ = rx_cancel.recv() => {
break;
}
chunk = response.chunk() => {
match chunk {
Ok(Some(chunk)) => {
match dest {
Some(ref mut f) => {
f.write_all(&chunk).await?;
f.flush().await?;
DOWNLOADERS.lock().unwrap().get_mut(id).map(|downloader| {
downloader.downloaded_size += chunk.len() as u64;
});
}
None => {
DOWNLOADERS.lock().unwrap().get_mut(id).map(|downloader| {
downloader.data.extend_from_slice(&chunk);
downloader.downloaded_size += chunk.len() as u64;
});
}
}
}
Ok(None) => {
is_all_downloaded = true;
break;
},
Err(e) => {
log::error!("Download {} failed: {}", id, e);
return Err(e.into());
}
}
}
}
}
if let Some(mut f) = dest.take() {
f.flush().await?;
}
if let Some(ref mut downloader) = DOWNLOADERS.lock().unwrap().get_mut(id) {
downloader.finished = true;
}
if is_all_downloaded {
let id_del = id.to_string();
if let Some(dur) = auto_del_dur {
tokio::spawn(async move {
tokio::time::sleep(dur).await;
DOWNLOADERS.lock().unwrap().remove(&id_del);
});
}
}
Ok(is_all_downloaded)
}
pub fn get_download_data(id: &str) -> ResultType<DownloadData> {
let downloaders = DOWNLOADERS.lock().unwrap();
if let Some(downloader) = downloaders.get(id) {
let downloaded_size = downloader.downloaded_size;
let total_size = downloader.total_size.clone();
let error = downloader.error.clone();
let data = if total_size.unwrap_or(0) == downloaded_size && downloader.path.is_none() {
downloader.data.clone()
} else {
Vec::new()
};
let path = downloader.path.clone();
let download_data = DownloadData {
data,
path,
total_size,
downloaded_size,
error,
};
Ok(download_data)
} else {
bail!("Downloader not found")
}
}
pub fn cancel(id: &str) {
if let Some(downloader) = DOWNLOADERS.lock().unwrap().get(id) {
// downloader.is_canceled.store(true, Ordering::SeqCst);
// The receiver may not be able to receive the cancel signal, so we also set the atomic bool to true
let _ = downloader.tx_cancel.send(());
}
}
pub fn remove(id: &str) {
let _ = DOWNLOADERS.lock().unwrap().remove(id);
}
+336
View File
@@ -0,0 +1,336 @@
use hbb_common::{
async_recursion::async_recursion,
config::{Config, Socks5Server},
log::{self, info},
proxy::{Proxy, ProxyScheme},
tls::{
get_cached_tls_accept_invalid_cert, get_cached_tls_type, is_plain, upsert_tls_cache,
TlsType,
},
};
use reqwest::{blocking::Client as SyncClient, Client as AsyncClient};
macro_rules! configure_http_client {
($builder:expr, $tls_type:expr, $danger_accept_invalid_cert:expr, $Client: ty) => {{
// https://github.com/rustdesk/rustdesk/issues/11569
// https://docs.rs/reqwest/latest/reqwest/struct.ClientBuilder.html#method.no_proxy
let mut builder = $builder.no_proxy();
match $tls_type {
TlsType::Plain => {}
TlsType::NativeTls => {
builder = builder.use_native_tls();
if $danger_accept_invalid_cert {
builder = builder.danger_accept_invalid_certs(true);
}
}
TlsType::Rustls => {
#[cfg(any(target_os = "android", target_os = "ios"))]
match hbb_common::verifier::client_config($danger_accept_invalid_cert) {
Ok(client_config) => {
builder = builder.use_preconfigured_tls(client_config);
}
Err(e) => {
hbb_common::log::error!("Failed to get client config: {}", e);
}
}
#[cfg(not(any(target_os = "android", target_os = "ios")))]
{
builder = builder.use_rustls_tls();
if $danger_accept_invalid_cert {
builder = builder.danger_accept_invalid_certs(true);
}
}
}
}
let client = if let Some(conf) = Config::get_socks() {
let proxy_result = Proxy::from_conf(&conf, None);
match proxy_result {
Ok(proxy) => {
let proxy_setup = match &proxy.intercept {
ProxyScheme::Http { host, .. } => {
reqwest::Proxy::all(format!("http://{}", host))
}
ProxyScheme::Https { host, .. } => {
reqwest::Proxy::all(format!("https://{}", host))
}
ProxyScheme::Socks5 { addr, .. } => {
reqwest::Proxy::all(&format!("socks5://{}", addr))
}
};
match proxy_setup {
Ok(mut p) => {
if let Some(auth) = proxy.intercept.maybe_auth() {
if !auth.username().is_empty() && !auth.password().is_empty() {
p = p.basic_auth(auth.username(), auth.password());
}
}
builder = builder.proxy(p);
builder.build().unwrap_or_else(|e| {
info!("Failed to create a proxied client: {}", e);
<$Client>::new()
})
}
Err(e) => {
info!("Failed to set up proxy: {}", e);
<$Client>::new()
}
}
}
Err(e) => {
info!("Failed to configure proxy: {}", e);
<$Client>::new()
}
}
} else {
builder.build().unwrap_or_else(|e| {
info!("Failed to create a client: {}", e);
<$Client>::new()
})
};
client
}};
}
pub fn create_http_client(tls_type: TlsType, danger_accept_invalid_cert: bool) -> SyncClient {
let builder = SyncClient::builder();
configure_http_client!(builder, tls_type, danger_accept_invalid_cert, SyncClient)
}
pub fn create_http_client_async(
tls_type: TlsType,
danger_accept_invalid_cert: bool,
) -> AsyncClient {
let builder = AsyncClient::builder();
configure_http_client!(builder, tls_type, danger_accept_invalid_cert, AsyncClient)
}
pub fn get_url_for_tls<'a>(url: &'a str, proxy_conf: &'a Option<Socks5Server>) -> &'a str {
if is_plain(url) {
if let Some(conf) = proxy_conf {
if conf.proxy.starts_with("https://") {
return &conf.proxy;
}
}
}
url
}
pub fn create_http_client_with_url(url: &str) -> SyncClient {
let proxy_conf = Config::get_socks();
let tls_url = get_url_for_tls(url, &proxy_conf);
let tls_type = get_cached_tls_type(tls_url);
let is_tls_type_cached = tls_type.is_some();
let tls_type = tls_type.unwrap_or(TlsType::Rustls);
let tls_danger_accept_invalid_cert = get_cached_tls_accept_invalid_cert(tls_url);
create_http_client_with_url_(
url,
tls_url,
tls_type,
is_tls_type_cached,
tls_danger_accept_invalid_cert,
tls_danger_accept_invalid_cert,
)
}
fn create_http_client_with_url_(
url: &str,
tls_url: &str,
tls_type: TlsType,
is_tls_type_cached: bool,
danger_accept_invalid_cert: Option<bool>,
original_danger_accept_invalid_cert: Option<bool>,
) -> SyncClient {
let mut client = create_http_client(tls_type, danger_accept_invalid_cert.unwrap_or(false));
if is_tls_type_cached && original_danger_accept_invalid_cert.is_some() {
return client;
}
if let Err(e) = client.head(url).send() {
if e.is_request() {
match (tls_type, is_tls_type_cached, danger_accept_invalid_cert) {
(TlsType::Rustls, _, None) => {
log::warn!(
"Failed to connect to server {} with rustls-tls: {:?}, trying accept invalid cert",
tls_url,
e
);
client = create_http_client_with_url_(
url,
tls_url,
tls_type,
is_tls_type_cached,
Some(true),
original_danger_accept_invalid_cert,
);
}
(TlsType::Rustls, false, Some(_)) => {
log::warn!(
"Failed to connect to server {} with rustls-tls: {:?}, trying native-tls",
tls_url,
e
);
client = create_http_client_with_url_(
url,
tls_url,
TlsType::NativeTls,
is_tls_type_cached,
original_danger_accept_invalid_cert,
original_danger_accept_invalid_cert,
);
}
(TlsType::NativeTls, _, None) => {
log::warn!(
"Failed to connect to server {} with native-tls: {:?}, trying accept invalid cert",
tls_url,
e
);
client = create_http_client_with_url_(
url,
tls_url,
tls_type,
is_tls_type_cached,
Some(true),
original_danger_accept_invalid_cert,
);
}
_ => {
log::error!(
"Failed to connect to server {} with {:?}, err: {:?}.",
tls_url,
tls_type,
e
);
}
}
} else {
log::warn!(
"Failed to connect to server {} with {:?}, err: {}.",
tls_url,
tls_type,
e
);
}
} else {
log::info!(
"Successfully connected to server {} with {:?}",
tls_url,
tls_type
);
upsert_tls_cache(
tls_url,
tls_type,
danger_accept_invalid_cert.unwrap_or(false),
);
}
client
}
pub async fn create_http_client_async_with_url(url: &str) -> AsyncClient {
let proxy_conf = Config::get_socks();
let tls_url = get_url_for_tls(url, &proxy_conf);
let tls_type = get_cached_tls_type(tls_url);
let is_tls_type_cached = tls_type.is_some();
let tls_type = tls_type.unwrap_or(TlsType::Rustls);
let danger_accept_invalid_cert = get_cached_tls_accept_invalid_cert(tls_url);
create_http_client_async_with_url_(
url,
tls_url,
tls_type,
is_tls_type_cached,
danger_accept_invalid_cert,
danger_accept_invalid_cert,
)
.await
}
#[async_recursion]
async fn create_http_client_async_with_url_(
url: &str,
tls_url: &str,
tls_type: TlsType,
is_tls_type_cached: bool,
danger_accept_invalid_cert: Option<bool>,
original_danger_accept_invalid_cert: Option<bool>,
) -> AsyncClient {
let mut client =
create_http_client_async(tls_type, danger_accept_invalid_cert.unwrap_or(false));
if is_tls_type_cached && original_danger_accept_invalid_cert.is_some() {
return client;
}
if let Err(e) = client.head(url).send().await {
match (tls_type, is_tls_type_cached, danger_accept_invalid_cert) {
(TlsType::Rustls, _, None) => {
log::warn!(
"Failed to connect to server {} with rustls-tls: {:?}, trying accept invalid cert",
tls_url,
e
);
client = create_http_client_async_with_url_(
url,
tls_url,
tls_type,
is_tls_type_cached,
Some(true),
original_danger_accept_invalid_cert,
)
.await;
}
(TlsType::Rustls, false, Some(_)) => {
log::warn!(
"Failed to connect to server {} with rustls-tls: {:?}, trying native-tls",
tls_url,
e
);
client = create_http_client_async_with_url_(
url,
tls_url,
TlsType::NativeTls,
is_tls_type_cached,
original_danger_accept_invalid_cert,
original_danger_accept_invalid_cert,
)
.await;
}
(TlsType::NativeTls, _, None) => {
log::warn!(
"Failed to connect to server {} with native-tls: {:?}, trying accept invalid cert",
tls_url,
e
);
client = create_http_client_async_with_url_(
url,
tls_url,
tls_type,
is_tls_type_cached,
Some(true),
original_danger_accept_invalid_cert,
)
.await;
}
_ => {
log::error!(
"Failed to connect to server {} with {:?}, err: {:?}.",
tls_url,
tls_type,
e
);
}
}
} else {
log::info!(
"Successfully connected to server {} with {:?}",
tls_url,
tls_type
);
upsert_tls_cache(
tls_url,
tls_type,
danger_accept_invalid_cert.unwrap_or(false),
);
}
client
}
+211
View File
@@ -0,0 +1,211 @@
use crate::hbbs_http::create_http_client_with_url;
use bytes::Bytes;
use hbb_common::{bail, config::Config, lazy_static, log, ResultType};
use reqwest::blocking::{Body, Client};
use scrap::record::RecordState;
use serde::Serialize;
use serde_json::Map;
use std::{
fs::File,
io::{prelude::*, SeekFrom},
sync::{mpsc::Receiver, Arc, Mutex},
time::{Duration, Instant},
};
const MAX_HEADER_LEN: usize = 1024;
const SHOULD_SEND_TIME: Duration = Duration::from_secs(1);
const SHOULD_SEND_SIZE: u64 = 1024 * 1024;
lazy_static::lazy_static! {
static ref ENABLE: Arc<Mutex<bool>> = Default::default();
}
pub fn is_enable() -> bool {
ENABLE.lock().unwrap().clone()
}
pub fn run(rx: Receiver<RecordState>) {
std::thread::spawn(move || {
let api_server = crate::get_api_server(
Config::get_option("api-server"),
Config::get_option("custom-rendezvous-server"),
);
// This URL is used for TLS connectivity testing and fallback detection.
let login_option_url = format!("{}/api/login-options", &api_server);
let client = create_http_client_with_url(&login_option_url);
let mut uploader = RecordUploader {
client,
api_server,
filepath: Default::default(),
filename: Default::default(),
upload_size: Default::default(),
running: Default::default(),
last_send: Instant::now(),
};
loop {
if let Err(e) = match rx.recv() {
Ok(state) => match state {
RecordState::NewFile(filepath) => uploader.handle_new_file(filepath),
RecordState::NewFrame => {
if uploader.running {
uploader.handle_frame(false)
} else {
Ok(())
}
}
RecordState::WriteTail => {
if uploader.running {
uploader.handle_tail()
} else {
Ok(())
}
}
RecordState::RemoveFile => {
if uploader.running {
uploader.handle_remove()
} else {
Ok(())
}
}
},
Err(e) => {
log::trace!("upload thread stop: {}", e);
break;
}
} {
uploader.running = false;
log::error!("upload stop: {}", e);
}
}
});
}
struct RecordUploader {
client: Client,
api_server: String,
filepath: String,
filename: String,
upload_size: u64,
running: bool,
last_send: Instant,
}
impl RecordUploader {
fn send<Q, B>(&self, query: &Q, body: B) -> ResultType<()>
where
Q: Serialize + ?Sized,
B: Into<Body>,
{
match self
.client
.post(format!("{}/api/record", self.api_server))
.query(query)
.body(body)
.send()
{
Ok(resp) => {
if let Ok(m) = resp.json::<Map<String, serde_json::Value>>() {
if let Some(e) = m.get("error") {
bail!(e.to_string());
}
}
Ok(())
}
Err(e) => bail!(e.to_string()),
}
}
fn handle_new_file(&mut self, filepath: String) -> ResultType<()> {
match std::path::PathBuf::from(&filepath).file_name() {
Some(filename) => match filename.to_owned().into_string() {
Ok(filename) => {
self.filename = filename.clone();
self.filepath = filepath.clone();
self.upload_size = 0;
self.running = true;
self.last_send = Instant::now();
self.send(&[("type", "new"), ("file", &filename)], Bytes::new())?;
Ok(())
}
Err(_) => bail!("can't parse filename:{:?}", filename),
},
None => bail!("can't parse filepath:{}", filepath),
}
}
fn handle_frame(&mut self, flush: bool) -> ResultType<()> {
if !flush && self.last_send.elapsed() < SHOULD_SEND_TIME {
return Ok(());
}
match File::open(&self.filepath) {
Ok(mut file) => match file.metadata() {
Ok(m) => {
let len = m.len();
if len <= self.upload_size {
return Ok(());
}
if !flush && len - self.upload_size < SHOULD_SEND_SIZE {
return Ok(());
}
let mut buf = Vec::new();
match file.seek(SeekFrom::Start(self.upload_size)) {
Ok(_) => match file.read_to_end(&mut buf) {
Ok(length) => {
self.send(
&[
("type", "part"),
("file", &self.filename),
("offset", &self.upload_size.to_string()),
("length", &length.to_string()),
],
buf,
)?;
self.upload_size = len;
self.last_send = Instant::now();
Ok(())
}
Err(e) => bail!(e.to_string()),
},
Err(e) => bail!(e.to_string()),
}
}
Err(e) => bail!(e.to_string()),
},
Err(e) => bail!(e.to_string()),
}
}
fn handle_tail(&mut self) -> ResultType<()> {
self.handle_frame(true)?;
match File::open(&self.filepath) {
Ok(mut file) => {
let mut buf = vec![0u8; MAX_HEADER_LEN];
match file.read(&mut buf) {
Ok(length) => {
buf.truncate(length);
self.send(
&[
("type", "tail"),
("file", &self.filename),
("offset", "0"),
("length", &length.to_string()),
],
buf,
)?;
log::info!("upload success, file: {}", self.filename);
Ok(())
}
Err(e) => bail!(e.to_string()),
}
}
Err(e) => bail!(e.to_string()),
}
}
fn handle_remove(&mut self) -> ResultType<()> {
self.send(
&[("type", "remove"), ("file", &self.filename)],
Bytes::new(),
)?;
Ok(())
}
}
+325
View File
@@ -0,0 +1,325 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
time::Duration,
};
#[cfg(not(any(target_os = "ios")))]
use crate::{ui_interface::get_builtin_option, Connection};
use hbb_common::{
config::{self, keys, Config, LocalConfig},
log,
tokio::{self, sync::broadcast, time::Instant},
};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
// hello-agent local patch: lowered from 15s to 5s so device-online status in
// the admin UI reacts faster. Re-apply on vendor resync.
const TIME_HEARTBEAT: Duration = Duration::from_secs(1);
const UPLOAD_SYSINFO_TIMEOUT: Duration = Duration::from_secs(120);
const TIME_CONN: Duration = Duration::from_secs(3);
#[cfg(not(any(target_os = "ios")))]
lazy_static::lazy_static! {
static ref SENDER : Mutex<broadcast::Sender<Vec<i32>>> = Mutex::new(start_hbbs_sync());
static ref PRO: Arc<Mutex<bool>> = Default::default();
}
#[cfg(not(any(target_os = "ios")))]
pub fn start() {
let _sender = SENDER.lock().unwrap();
}
#[cfg(not(target_os = "ios"))]
pub fn signal_receiver() -> broadcast::Receiver<Vec<i32>> {
SENDER.lock().unwrap().subscribe()
}
#[cfg(not(any(target_os = "ios")))]
fn start_hbbs_sync() -> broadcast::Sender<Vec<i32>> {
let (tx, _rx) = broadcast::channel::<Vec<i32>>(16);
std::thread::spawn(move || start_hbbs_sync_async());
return tx;
}
#[derive(Debug, Serialize, Deserialize)]
pub struct StrategyOptions {
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub config_options: HashMap<String, String>,
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub extra: HashMap<String, String>,
}
struct InfoUploaded {
uploaded: bool,
url: String,
last_uploaded: Option<Instant>,
id: String,
username: Option<String>,
}
impl Default for InfoUploaded {
fn default() -> Self {
Self {
uploaded: false,
url: "".to_owned(),
last_uploaded: None,
id: "".to_owned(),
username: None,
}
}
}
impl InfoUploaded {
fn uploaded(url: String, id: String, username: String) -> Self {
Self {
uploaded: true,
url,
last_uploaded: None,
id,
username: Some(username),
}
}
}
#[cfg(not(any(target_os = "ios")))]
#[tokio::main(flavor = "current_thread")]
async fn start_hbbs_sync_async() {
let mut interval = crate::rustdesk_interval(tokio::time::interval_at(
Instant::now() + TIME_CONN,
TIME_CONN,
));
let mut last_sent: Option<Instant> = None;
let mut info_uploaded = InfoUploaded::default();
let mut sysinfo_ver = "".to_owned();
loop {
tokio::select! {
_ = interval.tick() => {
let url = heartbeat_url();
let id = Config::get_id();
if url.is_empty() {
*PRO.lock().unwrap() = false;
continue;
}
if config::option2bool("stop-service", &Config::get_option("stop-service")) {
continue;
}
let conns = Connection::alive_conns();
if info_uploaded.uploaded && (url != info_uploaded.url || id != info_uploaded.id) {
info_uploaded.uploaded = false;
*PRO.lock().unwrap() = false;
}
// For Windows:
// We can't skip uploading sysinfo when the username is empty, because the username may
// always be empty before login. We also need to upload the other sysinfo info.
//
// https://github.com/rustdesk/rustdesk/discussions/8031
// We still need to check the username after uploading sysinfo, because
// 1. The username may be empty when logining in, and it can be fetched after a while.
// In this case, we need to upload sysinfo again.
// 2. The username may be changed after uploading sysinfo, and we need to upload sysinfo again.
//
// The Windows session will switch to the last user session before the restart,
// so it may be able to get the username before login.
// But strangely, sometimes we can get the username before login,
// we may not be able to get the username before login after the next restart.
let mut v = crate::get_sysinfo();
let sys_username = v["username"].as_str().unwrap_or_default().to_string();
// Though the username comparison is only necessary on Windows,
// we still keep the comparison on other platforms for consistency.
let need_upload = (!info_uploaded.uploaded || info_uploaded.username.as_ref() != Some(&sys_username)) &&
info_uploaded.last_uploaded.map(|x| x.elapsed() >= UPLOAD_SYSINFO_TIMEOUT).unwrap_or(true);
if need_upload {
v["version"] = json!(crate::VERSION);
v["id"] = json!(id);
v["uuid"] = json!(crate::encode64(hbb_common::get_uuid()));
// Optional rebrand identity: `AGENT_NAME` / `AGENT_VERSION`
// are empty by default (vanilla rustdesk) and populated by
// OEM shells like hello-agent. We only stamp the field
// when set so older servers parsing the payload don't see
// empty strings they have to special-case.
let agent_name = config::AGENT_NAME.read().unwrap().clone();
if !agent_name.is_empty() {
v["agent_name"] = json!(agent_name);
}
let agent_version = config::AGENT_VERSION.read().unwrap().clone();
if !agent_version.is_empty() {
v["agent_version"] = json!(agent_version);
}
let ab_name = Config::get_option(keys::OPTION_PRESET_ADDRESS_BOOK_NAME);
if !ab_name.is_empty() {
v[keys::OPTION_PRESET_ADDRESS_BOOK_NAME] = json!(ab_name);
}
let ab_tag = Config::get_option(keys::OPTION_PRESET_ADDRESS_BOOK_TAG);
if !ab_tag.is_empty() {
v[keys::OPTION_PRESET_ADDRESS_BOOK_TAG] = json!(ab_tag);
}
let ab_alias = Config::get_option(keys::OPTION_PRESET_ADDRESS_BOOK_ALIAS);
if !ab_alias.is_empty() {
v[keys::OPTION_PRESET_ADDRESS_BOOK_ALIAS] = json!(ab_alias);
}
let ab_password = Config::get_option(keys::OPTION_PRESET_ADDRESS_BOOK_PASSWORD);
if !ab_password.is_empty() {
v[keys::OPTION_PRESET_ADDRESS_BOOK_PASSWORD] = json!(ab_password);
}
let ab_note = Config::get_option(keys::OPTION_PRESET_ADDRESS_BOOK_NOTE);
if !ab_note.is_empty() {
v[keys::OPTION_PRESET_ADDRESS_BOOK_NOTE] = json!(ab_note);
}
let username = get_builtin_option(keys::OPTION_PRESET_USERNAME);
if !username.is_empty() {
v[keys::OPTION_PRESET_USERNAME] = json!(username);
}
let strategy_name = get_builtin_option(keys::OPTION_PRESET_STRATEGY_NAME);
if !strategy_name.is_empty() {
v[keys::OPTION_PRESET_STRATEGY_NAME] = json!(strategy_name);
}
let device_group_name = get_builtin_option(keys::OPTION_PRESET_DEVICE_GROUP_NAME);
if !device_group_name.is_empty() {
v[keys::OPTION_PRESET_DEVICE_GROUP_NAME] = json!(device_group_name);
}
let device_username = Config::get_option(keys::OPTION_PRESET_DEVICE_USERNAME);
if !device_username.is_empty() {
v["username"] = json!(device_username);
}
let device_name = Config::get_option(keys::OPTION_PRESET_DEVICE_NAME);
if !device_name.is_empty() {
v["hostname"] = json!(device_name);
}
let note = Config::get_option(keys::OPTION_PRESET_NOTE);
if !note.is_empty() {
v[keys::OPTION_PRESET_NOTE] = json!(note);
}
let v = v.to_string();
let mut hash = "".to_owned();
if crate::is_public(&url) {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(url.as_bytes());
hasher.update(&v.as_bytes());
let res = hasher.finalize();
hash = hbb_common::base64::encode(&res[..]);
let old_hash = config::Status::get("sysinfo_hash");
let ver = config::Status::get("sysinfo_ver"); // sysinfo_ver is the version of sysinfo on server's side
if hash == old_hash {
// When the api doesn't exist, Ok("") will be returned in test.
let samever = match crate::post_request(url.replace("heartbeat", "sysinfo_ver"), "".to_owned(), "").await {
Ok(x) => {
sysinfo_ver = x.clone();
*PRO.lock().unwrap() = true;
x == ver
}
_ => {
false // to make sure Pro can be assigned in below post for old
// hbbs pro not supporting sysinfo_ver, use false for ensuring
}
};
if samever {
info_uploaded = InfoUploaded::uploaded(url.clone(), id.clone(), sys_username);
log::info!("sysinfo not changed, skip upload");
continue;
}
}
}
match crate::post_request(url.replace("heartbeat", "sysinfo"), v, "").await {
Ok(x) => {
if x == "SYSINFO_UPDATED" {
info_uploaded = InfoUploaded::uploaded(url.clone(), id.clone(), sys_username);
log::info!("sysinfo updated");
if !hash.is_empty() {
config::Status::set("sysinfo_hash", hash);
config::Status::set("sysinfo_ver", sysinfo_ver.clone());
}
*PRO.lock().unwrap() = true;
} else if x == "ID_NOT_FOUND" {
info_uploaded.last_uploaded = None; // next heartbeat will upload sysinfo again
} else {
info_uploaded.last_uploaded = Some(Instant::now());
}
}
_ => {
info_uploaded.last_uploaded = Some(Instant::now());
}
}
}
if conns.is_empty() && last_sent.map(|x| x.elapsed() < TIME_HEARTBEAT).unwrap_or(false) {
continue;
}
last_sent = Some(Instant::now());
let mut v = Value::default();
v["id"] = json!(id);
v["uuid"] = json!(crate::encode64(hbb_common::get_uuid()));
v["ver"] = json!(hbb_common::get_version_number(crate::VERSION));
if !conns.is_empty() {
v["conns"] = json!(conns);
}
let modified_at = LocalConfig::get_option("strategy_timestamp").parse::<i64>().unwrap_or(0);
v["modified_at"] = json!(modified_at);
if let Ok(s) = crate::post_request(url.clone(), v.to_string(), "").await {
if let Ok(mut rsp) = serde_json::from_str::<HashMap::<&str, Value>>(&s) {
if rsp.remove("sysinfo").is_some() {
info_uploaded.uploaded = false;
config::Status::set("sysinfo_hash", "".to_owned());
log::info!("sysinfo required to forcely update");
}
if let Some(conns) = rsp.remove("disconnect") {
if let Ok(conns) = serde_json::from_value::<Vec<i32>>(conns) {
SENDER.lock().unwrap().send(conns).ok();
}
}
if let Some(rsp_modified_at) = rsp.remove("modified_at") {
if let Ok(rsp_modified_at) = serde_json::from_value::<i64>(rsp_modified_at) {
if rsp_modified_at != modified_at {
LocalConfig::set_option("strategy_timestamp".to_string(), rsp_modified_at.to_string());
}
}
}
if let Some(strategy) = rsp.remove("strategy") {
if let Ok(strategy) = serde_json::from_value::<StrategyOptions>(strategy) {
log::info!("strategy updated");
handle_config_options(strategy.config_options);
}
}
}
}
}
}
}
}
fn heartbeat_url() -> String {
let url = crate::common::get_api_server(
Config::get_option("api-server"),
Config::get_option("custom-rendezvous-server"),
);
if url.is_empty() || crate::is_public(&url) {
return "".to_owned();
}
format!("{}/api/heartbeat", url)
}
fn handle_config_options(config_options: HashMap<String, String>) {
let mut options = Config::get_options();
let default_settings = config::DEFAULT_SETTINGS.read().unwrap().clone();
config_options
.iter()
.map(|(k, v)| {
// Priority: user config > default advanced options.
// Only when default advanced options are also empty, remove user option (fallback to built-in default);
// otherwise insert an empty value so user config remains present.
if v.is_empty() && default_settings.get(k).map_or("", |v| v).is_empty() {
options.remove(k);
} else {
options.insert(k.to_string(), v.to_string());
}
})
.count();
Config::set_options(options);
}
#[allow(unused)]
#[cfg(not(any(target_os = "ios")))]
pub fn is_pro() -> bool {
PRO.lock().unwrap().clone()
}