Implement remote execution
This commit is contained in:
+259
@@ -0,0 +1,259 @@
|
||||
//! PowerShell remote-exec worker.
|
||||
//!
|
||||
//! Subscribes to `librustdesk::hbbs_http::sync::exec_signal_receiver()` — a
|
||||
//! broadcast channel that the vendored sync loop populates whenever the
|
||||
//! server returns an `exec` field in a heartbeat reply (see
|
||||
//! rustdesk-server/docs/AGENT-API-AUTH.md). For each `ExecRequest` we:
|
||||
//!
|
||||
//! 1. Spawn `powershell.exe -NoProfile -NonInteractive -ExecutionPolicy
|
||||
//! Bypass -Command -` and write the script to stdin.
|
||||
//! 2. Concurrently drain stdout and stderr into 1 MiB-capped buffers.
|
||||
//! 3. Apply a wall-clock timeout (default 5 min); kill on expiry.
|
||||
//! 4. POST the result to `/api/agent/exec-result` with the same Ed25519
|
||||
//! signature the heartbeat / sysinfo posts use.
|
||||
//!
|
||||
//! The whole thing only makes sense on Windows (the agent's target OS),
|
||||
//! so the module body is `#[cfg(windows)]` and other platforms get a
|
||||
//! no-op `start()` to keep the call site in `service.rs` portable.
|
||||
|
||||
#[cfg(windows)]
|
||||
mod windows_impl {
|
||||
use anyhow::{anyhow, Result};
|
||||
use hbb_common::config::Config;
|
||||
use librustdesk::hbbs_http::sync::ExecRequest;
|
||||
use std::process::Stdio;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
pub fn start() {
|
||||
std::thread::spawn(|| {
|
||||
let rt = match tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
{
|
||||
Ok(rt) => rt,
|
||||
Err(e) => {
|
||||
log::warn!("exec worker: build runtime: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
rt.block_on(run_loop());
|
||||
});
|
||||
}
|
||||
|
||||
async fn run_loop() {
|
||||
// The vendored sync layer creates the broadcast channel lazily on
|
||||
// first `subscribe()`. Calling here also primes it for the parser.
|
||||
let mut rx = librustdesk::hbbs_http::sync::exec_signal_receiver();
|
||||
log::info!("exec worker: subscribed to heartbeat exec channel");
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(req) => {
|
||||
log::info!(
|
||||
"exec worker: received cmd_id={} script_len={} max_secs={} max_bytes={}",
|
||||
req.cmd_id,
|
||||
req.script.len(),
|
||||
req.max_secs,
|
||||
req.max_bytes
|
||||
);
|
||||
let outcome = run_one(&req).await;
|
||||
if let Err(e) = report(&req, &outcome).await {
|
||||
log::warn!(
|
||||
"exec worker: report failed for cmd_id={}: {e:#}",
|
||||
req.cmd_id
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||
log::warn!("exec worker: lagged, dropped {n} exec requests");
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
||||
log::warn!("exec worker: channel closed, exiting");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct Outcome {
|
||||
exit_code: i64,
|
||||
stdout: String,
|
||||
stderr: String,
|
||||
timed_out: bool,
|
||||
truncated: bool,
|
||||
}
|
||||
|
||||
async fn run_one(req: &ExecRequest) -> Outcome {
|
||||
// Defensive lower bound — a misconfigured server shouldn't be able to
|
||||
// send max_secs=0 and have us skip the wait.
|
||||
let timeout = Duration::from_secs(req.max_secs.max(1));
|
||||
let max_bytes = req.max_bytes.max(1024) as usize;
|
||||
|
||||
// `-Command -` makes PowerShell read the script body from stdin,
|
||||
// which avoids quoting / length issues that plague `-Command "…"`
|
||||
// for multi-line scripts. `-NoProfile` skips both the
|
||||
// machine-wide and user-wide profile loads — those would change
|
||||
// behaviour depending on which AD-managed PowerShell profile the
|
||||
// service account inherited. `-NonInteractive` makes prompts fail
|
||||
// instead of hanging the run.
|
||||
let mut child = match tokio::process::Command::new("powershell.exe")
|
||||
.args([
|
||||
"-NoProfile",
|
||||
"-NonInteractive",
|
||||
"-ExecutionPolicy",
|
||||
"Bypass",
|
||||
"-Command",
|
||||
"-",
|
||||
])
|
||||
.stdin(Stdio::piped())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
{
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
return Outcome {
|
||||
exit_code: -1,
|
||||
stdout: String::new(),
|
||||
stderr: format!("spawn failed: {e}"),
|
||||
timed_out: false,
|
||||
truncated: false,
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(mut stdin) = child.stdin.take() {
|
||||
let _ = stdin.write_all(req.script.as_bytes()).await;
|
||||
let _ = stdin.shutdown().await;
|
||||
}
|
||||
|
||||
let stdout = child.stdout.take().expect("piped stdout was requested");
|
||||
let stderr = child.stderr.take().expect("piped stderr was requested");
|
||||
|
||||
// Concurrent capped readers. Each task accumulates up to
|
||||
// `max_bytes` bytes, then drains and discards the rest so the
|
||||
// pipe doesn't block the child writer.
|
||||
let stdout_buf: Arc<Mutex<(Vec<u8>, bool)>> = Arc::new(Mutex::new((Vec::new(), false)));
|
||||
let stderr_buf: Arc<Mutex<(Vec<u8>, bool)>> = Arc::new(Mutex::new((Vec::new(), false)));
|
||||
let so = tokio::spawn(read_capped(stdout, stdout_buf.clone(), max_bytes));
|
||||
let se = tokio::spawn(read_capped(stderr, stderr_buf.clone(), max_bytes));
|
||||
|
||||
let wait_result = tokio::time::timeout(timeout, child.wait()).await;
|
||||
let (exit_code, timed_out) = match wait_result {
|
||||
Ok(Ok(s)) => (s.code().unwrap_or(-1) as i64, false),
|
||||
Ok(Err(_)) => (-1, false),
|
||||
Err(_) => {
|
||||
// Timed out: kill, then wait the killed child so it
|
||||
// reaps cleanly (and so the read tasks finish via EOF).
|
||||
let _ = child.kill().await;
|
||||
let _ = child.wait().await;
|
||||
(-1, true)
|
||||
}
|
||||
};
|
||||
let _ = so.await;
|
||||
let _ = se.await;
|
||||
|
||||
let (out_bytes, out_trunc) = {
|
||||
let g = stdout_buf.lock().unwrap();
|
||||
(g.0.clone(), g.1)
|
||||
};
|
||||
let (err_bytes, err_trunc) = {
|
||||
let g = stderr_buf.lock().unwrap();
|
||||
(g.0.clone(), g.1)
|
||||
};
|
||||
Outcome {
|
||||
exit_code,
|
||||
// PowerShell on a current Windows defaults to UTF-8 when
|
||||
// OutputEncoding is set, but the agent service inherits the
|
||||
// legacy code page on older boxes. `from_utf8_lossy`
|
||||
// guarantees we always have a UTF-8 string to ship; the
|
||||
// operator sees a U+FFFD when raw bytes weren't UTF-8.
|
||||
stdout: String::from_utf8_lossy(&out_bytes).into_owned(),
|
||||
stderr: String::from_utf8_lossy(&err_bytes).into_owned(),
|
||||
timed_out,
|
||||
truncated: out_trunc || err_trunc,
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_capped<R: AsyncReadExt + Unpin>(
|
||||
mut reader: R,
|
||||
buf: Arc<Mutex<(Vec<u8>, bool)>>,
|
||||
cap: usize,
|
||||
) {
|
||||
let mut chunk = [0u8; 8192];
|
||||
loop {
|
||||
match reader.read(&mut chunk).await {
|
||||
Ok(0) => return,
|
||||
Ok(n) => {
|
||||
let mut g = buf.lock().unwrap();
|
||||
if g.0.len() < cap {
|
||||
let room = cap - g.0.len();
|
||||
if n <= room {
|
||||
g.0.extend_from_slice(&chunk[..n]);
|
||||
} else {
|
||||
g.0.extend_from_slice(&chunk[..room]);
|
||||
g.1 = true; // truncated; keep draining
|
||||
}
|
||||
}
|
||||
// else: already truncated, drop this chunk on the floor.
|
||||
}
|
||||
Err(_) => return,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn report(req: &ExecRequest, out: &Outcome) -> Result<()> {
|
||||
let api = librustdesk::common::get_api_server(
|
||||
Config::get_option("api-server"),
|
||||
Config::get_option("custom-rendezvous-server"),
|
||||
);
|
||||
if api.is_empty() {
|
||||
return Err(anyhow!("no api-server configured"));
|
||||
}
|
||||
let url = format!("{api}/api/agent/exec-result");
|
||||
let id = Config::get_id();
|
||||
let uuid = librustdesk::common::encode64(hbb_common::get_uuid());
|
||||
let body = hbb_common::serde_json::json!({
|
||||
"id": id,
|
||||
"uuid": uuid,
|
||||
"cmd_id": req.cmd_id,
|
||||
"exit_code": out.exit_code,
|
||||
"stdout": out.stdout,
|
||||
"stderr": out.stderr,
|
||||
"timed_out": out.timed_out,
|
||||
"truncated": out.truncated,
|
||||
})
|
||||
.to_string();
|
||||
|
||||
let headers = librustdesk::hbbs_http::sign::build_signed_headers(
|
||||
"POST",
|
||||
"/api/agent/exec-result",
|
||||
body.as_bytes(),
|
||||
)
|
||||
.unwrap_or_default();
|
||||
if headers.is_empty() {
|
||||
// Server rejects unsigned exec-result posts unconditionally
|
||||
// (see api/agent_exec.rs); bail loudly so the operator can
|
||||
// see the agent isn't ready to sign yet.
|
||||
return Err(anyhow!("no signing keypair available"));
|
||||
}
|
||||
|
||||
let resp = librustdesk::common::post_request(url, body, &headers)
|
||||
.await
|
||||
.map_err(|e| anyhow!("post: {e}"))?;
|
||||
if resp.trim() == "OK" {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(anyhow!("unexpected response: {}", resp.trim()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
pub use windows_impl::start;
|
||||
|
||||
#[cfg(not(windows))]
|
||||
pub fn start() {
|
||||
log::info!("exec worker: skipped (non-Windows build)");
|
||||
}
|
||||
+11
@@ -28,6 +28,8 @@ mod inventory;
|
||||
#[cfg(target_os = "windows")]
|
||||
mod cm_popup;
|
||||
#[cfg(target_os = "windows")]
|
||||
mod exec;
|
||||
#[cfg(target_os = "windows")]
|
||||
mod service;
|
||||
#[cfg(target_os = "windows")]
|
||||
mod unattended_password;
|
||||
@@ -278,6 +280,15 @@ fn run_server() {
|
||||
}
|
||||
});
|
||||
|
||||
// Start the PowerShell remote-exec worker. Subscribes to the
|
||||
// broadcast channel in the vendored sync layer; the channel is
|
||||
// shared in-process so the worker MUST run in this --server process
|
||||
// (where sync.rs lives), not the --service supervisor. The worker
|
||||
// is idle until an admin dispatches an exec from the dashboard.
|
||||
// Gated server-side on peer.managed=1 + strategy.enable-remote-exec.
|
||||
#[cfg(target_os = "windows")]
|
||||
exec::start();
|
||||
|
||||
// `start_server` is `#[tokio::main]` and runs forever. (is_server=true,
|
||||
// no_server=false). It boots the default IPC server, input service,
|
||||
// rendezvous mediator, and heartbeat sync.
|
||||
|
||||
Reference in New Issue
Block a user