fix(acp): prevent Windows terminal hangs from pipe orphans and .cmd shims

This commit is contained in:
xintaofei
2026-04-24 23:54:11 +08:00
parent 8405aa5dc8
commit 675f97af94
2 changed files with 68 additions and 27 deletions

View File

@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration;
use sacp::schema::{
CreateTerminalRequest, CreateTerminalResponse, KillTerminalRequest, KillTerminalResponse,
@@ -9,9 +10,16 @@ use sacp::schema::{
};
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
type TerminalMap = HashMap<String, Arc<TerminalInstance>>;
const DEFAULT_OUTPUT_BYTE_LIMIT: u64 = 1_000_000;
/// After the child process exits, wait up to this long for the stdout/stderr
/// reader tasks to drain naturally before aborting them. Needed because a
/// grandchild process (e.g. Node spawned from a `.cmd` shim on Windows) can
/// inherit the pipe handle and keep it open long after the direct child
/// exits, turning `wait_for_exit` into a silent hang.
const READER_DRAIN_GRACE: Duration = Duration::from_millis(200);
#[derive(Debug)]
pub enum TerminalRuntimeError {
@@ -41,6 +49,7 @@ struct TerminalInstance {
output_limit: Option<usize>,
child: Mutex<Option<tokio::process::Child>>,
snapshot: Mutex<TerminalSnapshot>,
reader_handles: Mutex<Vec<JoinHandle<()>>>,
}
impl TerminalInstance {
@@ -50,6 +59,24 @@ impl TerminalInstance {
output_limit: output_limit.and_then(|v| usize::try_from(v).ok()),
child: Mutex::new(Some(child)),
snapshot: Mutex::new(TerminalSnapshot::default()),
reader_handles: Mutex::new(Vec::new()),
}
}
/// Wait briefly for stdout/stderr reader tasks to finish; abort any that
/// remain. Must be called after the direct child has already exited —
/// otherwise we would abort readers that are still making progress.
async fn drain_readers(&self) {
let handles: Vec<JoinHandle<()>> =
std::mem::take(&mut *self.reader_handles.lock().await);
for handle in handles {
let abort = handle.abort_handle();
if tokio::time::timeout(READER_DRAIN_GRACE, handle)
.await
.is_err()
{
abort.abort();
}
}
}
@@ -105,11 +132,10 @@ impl TerminalInstance {
async fn wait_for_exit(&self) -> Result<TerminalExitStatus, TerminalRuntimeError> {
self.refresh_exit_status().await?;
{
let snapshot = self.snapshot.lock().await;
if let Some(exit_status) = snapshot.exit_status.clone() {
return Ok(exit_status);
}
let cached_exit = self.snapshot.lock().await.exit_status.clone();
if let Some(exit_status) = cached_exit {
self.drain_readers().await;
return Ok(exit_status);
}
let exit_status = {
@@ -128,6 +154,8 @@ impl TerminalInstance {
map_exit_status(status)
};
self.drain_readers().await;
let mut snapshot = self.snapshot.lock().await;
snapshot.exit_status = Some(exit_status.clone());
Ok(exit_status)
@@ -135,11 +163,10 @@ impl TerminalInstance {
async fn kill_command(&self) -> Result<(), TerminalRuntimeError> {
self.refresh_exit_status().await?;
{
let snapshot = self.snapshot.lock().await;
if snapshot.exit_status.is_some() {
return Ok(());
}
let already_exited = self.snapshot.lock().await.exit_status.is_some();
if already_exited {
self.drain_readers().await;
return Ok(());
}
let exit_status = {
@@ -163,6 +190,8 @@ impl TerminalInstance {
map_exit_status(status)
};
self.drain_readers().await;
let mut snapshot = self.snapshot.lock().await;
snapshot.exit_status = Some(exit_status);
Ok(())
@@ -246,18 +275,23 @@ impl TerminalRuntime {
child,
));
let mut handles: Vec<JoinHandle<()>> = Vec::new();
if let Some(reader) = stdout {
let terminal_ref = terminal.clone();
tokio::spawn(async move {
handles.push(tokio::spawn(async move {
read_stream(reader, terminal_ref).await;
});
}));
}
if let Some(reader) = stderr {
let terminal_ref = terminal.clone();
tokio::spawn(async move {
handles.push(tokio::spawn(async move {
read_stream(reader, terminal_ref).await;
});
}));
}
if !handles.is_empty() {
terminal.reader_handles.lock().await.extend(handles);
}
self.terminals