feat: stream real-time progress for agent SDK install/upgrade/uninstall

Replace the spinner-only UX with live log output during agent SDK
operations, matching the existing OpenCode plugin install experience.

Backend: emit structured events (started/log/completed/failed) via
EventEmitter during npm install and binary download. npm commands now
run with piped stdio for line-by-line streaming; binary downloads
report chunked progress every 1 MB.

Frontend: subscribe to `app://agent-install` events through a new
`useAgentInstallStream` hook and render a theme-aware log terminal
below the preflight checks panel.

Also fixes the install log container in both agent settings and the
OpenCode plugins modal: auto-scroll no longer shifts the outer page,
and colours now follow the active light/dark theme.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
xintaofei
2026-04-12 21:43:54 +08:00
parent 6c69f432b9
commit a763adaf36
10 changed files with 541 additions and 118 deletions

View File

@@ -188,29 +188,30 @@ fn parse_version_parts(input: &str) -> Vec<u32> {
.collect()
}
/// Ensure a binary agent is available locally.
/// Returns the absolute path to the executable.
pub async fn ensure_binary_for_agent(
/// Same as `ensure_binary_for_agent` but calls `on_progress` with human-readable
/// status messages during download / extraction.
pub async fn ensure_binary_for_agent_with_progress(
agent_type: AgentType,
version: &str,
archive_url: &str,
cmd_name: &str,
on_progress: impl Fn(&str),
) -> Result<PathBuf, AcpError> {
if let Some(path) = find_cached_binary_for_agent(agent_type, version, cmd_name)? {
on_progress("Binary already cached, skipping download");
return Ok(path);
}
let agent_id = agent_cache_key(agent_type);
ensure_binary(&agent_id, version, archive_url, cmd_name).await
ensure_binary_with_progress(&agent_id, version, archive_url, cmd_name, on_progress).await
}
/// Ensure a binary is available for a specific cache key.
/// Returns the absolute path to the executable.
pub async fn ensure_binary(
async fn ensure_binary_with_progress(
agent_id: &str,
version: &str,
archive_url: &str,
cmd_name: &str,
on_progress: impl Fn(&str),
) -> Result<PathBuf, AcpError> {
if let Some(path) = find_cached_binary(agent_id, version, cmd_name)? {
return Ok(path);
@@ -236,12 +237,14 @@ pub async fn ensure_binary(
let result: Result<PathBuf, AcpError> = async {
let archive_path = tmp_dir.join("archive");
download_file(archive_url, &archive_path).await?;
on_progress(&format!("Downloading {archive_url}"));
download_file_with_progress(archive_url, &archive_path, &on_progress).await?;
let extract_dir = tmp_dir.join("extracted");
std::fs::create_dir_all(&extract_dir)
.map_err(|e| AcpError::DownloadFailed(format!("failed to create extract dir: {e}")))?;
on_progress("Extracting archive...");
if archive_url.ends_with(".tar.gz") || archive_url.ends_with(".tgz") {
extract_tar_gz(&archive_path, &extract_dir)?;
} else if archive_url.ends_with(".tar.bz2") || archive_url.ends_with(".tbz2") {
@@ -255,6 +258,7 @@ pub async fn ensure_binary(
}
// Find the binary in extracted files and move to final location.
on_progress("Locating binary...");
let extracted_bin = find_binary_recursive(&extract_dir, &bin_name).ok_or_else(|| {
AcpError::DownloadFailed(format!("binary '{bin_name}' not found in archive"))
})?;
@@ -270,6 +274,7 @@ pub async fn ensure_binary(
));
}
set_executable_permissions(&final_path)?;
on_progress("Binary installed successfully");
Ok(final_path)
}
.await;
@@ -313,7 +318,13 @@ pub(crate) fn find_binary_recursive(dir: &PathBuf, name: &str) -> Option<PathBuf
None
}
async fn download_file(url: &str, dest: &PathBuf) -> Result<(), AcpError> {
async fn download_file_with_progress(
url: &str,
dest: &PathBuf,
on_progress: &impl Fn(&str),
) -> Result<(), AcpError> {
use futures_util::StreamExt;
let response = reqwest::Client::new()
.get(url)
.send()
@@ -327,13 +338,43 @@ async fn download_file(url: &str, dest: &PathBuf) -> Result<(), AcpError> {
)));
}
let bytes = response
.bytes()
.await
.map_err(|e| AcpError::DownloadFailed(format!("failed to read response: {e}")))?;
let total_size = response.content_length();
let mut downloaded: u64 = 0;
let mut last_reported_mb: u64 = 0;
let mut stream = response.bytes_stream();
let mut file = std::fs::File::create(dest)
.map_err(|e| AcpError::DownloadFailed(format!("failed to create archive file: {e}")))?;
std::fs::write(dest, &bytes)
.map_err(|e| AcpError::DownloadFailed(format!("failed to write archive: {e}")))?;
use std::io::Write;
while let Some(chunk) = stream.next().await {
let chunk =
chunk.map_err(|e| AcpError::DownloadFailed(format!("failed to read chunk: {e}")))?;
file.write_all(&chunk)
.map_err(|e| AcpError::DownloadFailed(format!("failed to write archive: {e}")))?;
downloaded += chunk.len() as u64;
// Report progress every 1MB
let current_mb = downloaded / (1024 * 1024);
if current_mb > last_reported_mb {
last_reported_mb = current_mb;
if let Some(total) = total_size {
let total_mb = total as f64 / (1024.0 * 1024.0);
on_progress(&format!(
"Downloading... {current_mb:.0} MB / {total_mb:.1} MB"
));
} else {
on_progress(&format!("Downloading... {current_mb:.0} MB"));
}
}
}
if let Some(total) = total_size {
let total_mb = total as f64 / (1024.0 * 1024.0);
on_progress(&format!("Download complete ({total_mb:.1} MB)"));
} else {
let final_mb = downloaded as f64 / (1024.0 * 1024.0);
on_progress(&format!("Download complete ({final_mb:.1} MB)"));
}
Ok(())
}

View File

@@ -46,6 +46,41 @@ fn emit_acp_agents_updated(
);
}
const AGENT_INSTALL_EVENT: &str = "app://agent-install";
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum AgentInstallEventKind {
Started,
Log,
Completed,
Failed,
}
#[derive(Debug, Clone, Serialize)]
pub(crate) struct AgentInstallEvent {
pub task_id: String,
pub kind: AgentInstallEventKind,
pub payload: String,
}
fn emit_agent_install_event(
emitter: &EventEmitter,
task_id: &str,
kind: AgentInstallEventKind,
payload: impl Into<String>,
) {
crate::web::event_bridge::emit_event(
emitter,
AGENT_INSTALL_EVENT,
AgentInstallEvent {
task_id: task_id.to_string(),
kind,
payload: payload.into(),
},
);
}
fn is_version_like(value: &str) -> bool {
value.chars().any(|c| c.is_ascii_digit()) && value.contains('.')
}
@@ -215,46 +250,130 @@ async fn detect_local_version(agent_type: AgentType) -> Option<String> {
/// may not have synced niche packages like `@agentclientprotocol/*`.
const NPM_OFFICIAL_REGISTRY: &str = "https://registry.npmjs.org";
async fn install_npm_global_package(package: &str) -> Result<(), AcpError> {
/// Run an npm command with piped stdout/stderr, streaming each line as a log event.
/// Returns (success: bool, collected_stderr: String) so callers can inspect errors.
async fn run_npm_streaming(
args: &[&str],
task_id: &str,
emitter: &EventEmitter,
) -> Result<(bool, String), AcpError> {
use tokio::io::{AsyncBufReadExt, BufReader};
let mut cmd = crate::process::tokio_command("npm");
for arg in args {
cmd.arg(arg);
}
cmd.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped());
let mut child = cmd.spawn().map_err(|e| {
AcpError::protocol(format!("failed to spawn npm: {e}"))
})?;
let stdout = child.stdout.take();
let stderr = child.stderr.take();
let emitter_clone = emitter.clone();
let task_id_owned = task_id.to_string();
let stdout_handle = tokio::spawn({
let emitter = emitter_clone.clone();
let task_id = task_id_owned.clone();
async move {
if let Some(out) = stdout {
let reader = BufReader::new(out);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
emit_agent_install_event(
&emitter, &task_id, AgentInstallEventKind::Log, &line,
);
}
}
}
});
let stderr_handle = tokio::spawn({
let emitter = emitter_clone;
let task_id = task_id_owned;
async move {
let mut collected = String::new();
if let Some(err) = stderr {
let reader = BufReader::new(err);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
emit_agent_install_event(
&emitter, &task_id, AgentInstallEventKind::Log, &line,
);
if !collected.is_empty() {
collected.push('\n');
}
collected.push_str(&line);
}
}
collected
}
});
let (_, stderr_result) = tokio::join!(stdout_handle, stderr_handle);
let collected_stderr = stderr_result.unwrap_or_default();
let status = child.wait().await.map_err(|e| {
AcpError::protocol(format!("failed to wait for npm process: {e}"))
})?;
Ok((status.success(), collected_stderr))
}
async fn install_npm_global_package_streaming(
package: &str,
task_id: &str,
emitter: &EventEmitter,
) -> Result<(), AcpError> {
let registry_arg = format!("--registry={NPM_OFFICIAL_REGISTRY}");
let output = crate::process::tokio_command("npm")
.arg("install")
.arg("-g")
.arg(&registry_arg)
.arg(package)
.output()
.await
.map_err(|e| AcpError::protocol(format!("failed to run npm install -g: {e}")))?;
emit_agent_install_event(
emitter, task_id, AgentInstallEventKind::Log,
format!("$ npm install -g {package}"),
);
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
let (success, stderr) = run_npm_streaming(
&["install", "-g", &registry_arg, package],
task_id,
emitter,
).await?;
if !success {
// EACCES: permission denied — retry with a user-local --prefix so
// we don't require root/sudo on macOS / Linux.
// Check EACCES first: an EEXIST error message may also contain EACCES
// context, and the --force retry would fail again without the prefix
// fallback.
if stderr.contains("EACCES") {
return install_npm_to_user_prefix(package, &registry_arg).await;
emit_agent_install_event(
emitter, task_id, AgentInstallEventKind::Log,
"Permission denied, retrying with user prefix...",
);
return install_npm_to_user_prefix_streaming(
package, &registry_arg, task_id, emitter,
).await;
}
// EEXIST: file conflict — retry with --force to overwrite
if stderr.contains("EEXIST") {
let retry = crate::process::tokio_command("npm")
.arg("install")
.arg("-g")
.arg("--force")
.arg(&registry_arg)
.arg(package)
.output()
.await
.map_err(|e| AcpError::protocol(format!("failed to run npm install -g --force: {e}")))?;
if !retry.status.success() {
let retry_stderr = String::from_utf8_lossy(&retry.stderr);
// The --force retry itself may fail with EACCES on systems
// where the global prefix is not writable.
emit_agent_install_event(
emitter, task_id, AgentInstallEventKind::Log,
"File conflict, retrying with --force...",
);
let (retry_success, retry_stderr) = run_npm_streaming(
&["install", "-g", "--force", &registry_arg, package],
task_id,
emitter,
).await?;
if !retry_success {
if retry_stderr.contains("EACCES") {
return install_npm_to_user_prefix(package, &registry_arg).await;
emit_agent_install_event(
emitter, task_id, AgentInstallEventKind::Log,
"Permission denied on --force retry, falling back to user prefix...",
);
return install_npm_to_user_prefix_streaming(
package, &registry_arg, task_id, emitter,
).await;
}
let err = retry_stderr.trim().to_string();
let msg = if err.is_empty() {
@@ -281,7 +400,12 @@ async fn install_npm_global_package(package: &str) -> Result<(), AcpError> {
/// Fallback: install an npm package into a user-local prefix (`~/.codeg/npm-global/`)
/// when the system global prefix is not writable (EACCES).
async fn install_npm_to_user_prefix(package: &str, registry_arg: &str) -> Result<(), AcpError> {
async fn install_npm_to_user_prefix_streaming(
package: &str,
registry_arg: &str,
task_id: &str,
emitter: &EventEmitter,
) -> Result<(), AcpError> {
let prefix = crate::process::user_npm_prefix().ok_or_else(|| {
AcpError::protocol(
"npm install -g failed with EACCES and could not determine home directory for fallback"
@@ -298,41 +422,33 @@ async fn install_npm_to_user_prefix(package: &str, registry_arg: &str) -> Result
})?;
let prefix_arg = format!("--prefix={}", prefix.display());
let output = crate::process::tokio_command("npm")
.arg("install")
.arg("-g")
.arg(&prefix_arg)
.arg(registry_arg)
.arg(package)
.output()
.await
.map_err(|e| {
AcpError::protocol(format!("failed to run npm install -g with user prefix: {e}"))
})?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
emit_agent_install_event(
emitter, task_id, AgentInstallEventKind::Log,
format!("$ npm install -g --prefix={} {package}", prefix.display()),
);
let (success, stderr) = run_npm_streaming(
&["install", "-g", &prefix_arg, registry_arg, package],
task_id,
emitter,
).await?;
if !success {
// EEXIST in the user prefix: retry with --force to overwrite stale files
// from a previous installation.
if stderr.contains("EEXIST") {
let force_retry = crate::process::tokio_command("npm")
.arg("install")
.arg("-g")
.arg("--force")
.arg(&prefix_arg)
.arg(registry_arg)
.arg(package)
.output()
.await
.map_err(|e| {
AcpError::protocol(format!(
"failed to run npm install -g --force with user prefix: {e}"
))
})?;
if !force_retry.status.success() {
let err = String::from_utf8_lossy(&force_retry.stderr)
.trim()
.to_string();
emit_agent_install_event(
emitter, task_id, AgentInstallEventKind::Log,
"File conflict in user prefix, retrying with --force...",
);
let (force_success, force_stderr) = run_npm_streaming(
&["install", "-g", "--force", &prefix_arg, registry_arg, package],
task_id,
emitter,
).await?;
if !force_success {
let err = force_stderr.trim().to_string();
let msg = if err.is_empty() {
format!(
"failed to install npm package (user prefix {}, --force)",
@@ -2448,10 +2564,13 @@ pub async fn acp_update_agent_config(
pub(crate) async fn acp_download_agent_binary_core(
agent_type: AgentType,
task_id: String,
emitter: &EventEmitter,
) -> Result<(), AcpError> {
emit_agent_install_event(emitter, &task_id, AgentInstallEventKind::Started, "");
let meta = registry::get_agent_meta(agent_type);
match meta.distribution {
let result = match meta.distribution {
registry::AgentDistribution::Binary {
version,
cmd,
@@ -2469,25 +2588,55 @@ pub(crate) async fn acp_download_agent_binary_core(
))
})?;
let _ = binary_cache::ensure_binary_for_agent(agent_type, version, fallback.url, cmd)
.await?;
emit_agent_install_event(
emitter, &task_id, AgentInstallEventKind::Log,
format!("Downloading {} v{version} for {platform}", meta.name),
);
let emitter_clone = emitter.clone();
let task_id_clone = task_id.clone();
let _ = binary_cache::ensure_binary_for_agent_with_progress(
agent_type, version, fallback.url, cmd,
move |msg| {
emit_agent_install_event(
&emitter_clone, &task_id_clone, AgentInstallEventKind::Log, msg,
);
},
)
.await?;
emit_acp_agents_updated(emitter, "binary_downloaded", Some(agent_type));
Ok(())
}
registry::AgentDistribution::Npx { .. } => Err(
AcpError::protocol("download is only supported for binary agents"),
),
};
match &result {
Ok(()) => {
emit_agent_install_event(
emitter, &task_id, AgentInstallEventKind::Completed,
format!("{} installed successfully", meta.name),
);
}
Err(e) => {
emit_agent_install_event(
emitter, &task_id, AgentInstallEventKind::Failed, e.to_string(),
);
}
}
result
}
#[cfg(feature = "tauri-runtime")]
#[cfg_attr(feature = "tauri-runtime", tauri::command)]
pub async fn acp_download_agent_binary(
agent_type: AgentType,
task_id: String,
app: tauri::AppHandle,
) -> Result<(), AcpError> {
let emitter = EventEmitter::Tauri(app);
acp_download_agent_binary_core(agent_type, &emitter).await
acp_download_agent_binary_core(agent_type, task_id, &emitter).await
}
pub(crate) async fn acp_detect_agent_local_version_core(
@@ -2525,11 +2674,14 @@ pub async fn acp_detect_agent_local_version(
pub(crate) async fn acp_prepare_npx_agent_core(
agent_type: AgentType,
registry_version: Option<String>,
task_id: String,
db: &AppDatabase,
emitter: &EventEmitter,
) -> Result<String, AcpError> {
emit_agent_install_event(emitter, &task_id, AgentInstallEventKind::Started, "");
let meta = registry::get_agent_meta(agent_type);
match meta.distribution {
let result = match meta.distribution {
registry::AgentDistribution::Npx { package, .. } => {
let default = agent_setting_service::AgentDefaultInput {
agent_type,
@@ -2546,7 +2698,16 @@ pub(crate) async fn acp_prepare_npx_agent_core(
.flatten()
.and_then(|m| m.installed_version);
install_npm_global_package(package).await?;
emit_agent_install_event(
emitter, &task_id, AgentInstallEventKind::Log,
format!("Installing {} ({package})", meta.name),
);
install_npm_global_package_streaming(package, &task_id, emitter).await?;
emit_agent_install_event(
emitter, &task_id, AgentInstallEventKind::Log,
"Detecting installed version...",
);
let resolved = detect_local_version(agent_type)
.await
.or_else(|| version_from_package_spec(package))
@@ -2575,7 +2736,22 @@ pub(crate) async fn acp_prepare_npx_agent_core(
registry::AgentDistribution::Binary { .. } => Err(AcpError::protocol(
"prepare is only supported for npx agents",
)),
};
match &result {
Ok(version) => {
emit_agent_install_event(
emitter, &task_id, AgentInstallEventKind::Completed,
format!("{} v{version} installed successfully", meta.name),
);
}
Err(e) => {
emit_agent_install_event(
emitter, &task_id, AgentInstallEventKind::Failed, e.to_string(),
);
}
}
result
}
#[cfg(feature = "tauri-runtime")]
@@ -2583,44 +2759,72 @@ pub(crate) async fn acp_prepare_npx_agent_core(
pub async fn acp_prepare_npx_agent(
agent_type: AgentType,
registry_version: Option<String>,
task_id: String,
db: State<'_, AppDatabase>,
app: tauri::AppHandle,
) -> Result<String, AcpError> {
let emitter = EventEmitter::Tauri(app);
acp_prepare_npx_agent_core(agent_type, registry_version, &db, &emitter).await
acp_prepare_npx_agent_core(agent_type, registry_version, task_id, &db, &emitter).await
}
pub(crate) async fn acp_uninstall_agent_core(
agent_type: AgentType,
task_id: String,
db: &AppDatabase,
emitter: &EventEmitter,
) -> Result<(), AcpError> {
emit_agent_install_event(emitter, &task_id, AgentInstallEventKind::Started, "");
let meta = registry::get_agent_meta(agent_type);
match meta.distribution {
registry::AgentDistribution::Binary { .. } => {
binary_cache::clear_agent_cache(agent_type)?;
emit_agent_install_event(
emitter, &task_id, AgentInstallEventKind::Log,
format!("Uninstalling {}...", meta.name),
);
let result: Result<(), AcpError> = async {
match meta.distribution {
registry::AgentDistribution::Binary { .. } => {
binary_cache::clear_agent_cache(agent_type)?;
}
registry::AgentDistribution::Npx { package, .. } => {
uninstall_npm_global_package(package).await?;
}
}
registry::AgentDistribution::Npx { package, .. } => {
uninstall_npm_global_package(package).await?;
agent_setting_service::set_installed_version(&db.conn, agent_type, None)
.await
.map_err(|e| AcpError::protocol(e.to_string()))?;
emit_acp_agents_updated(emitter, "agent_uninstalled", Some(agent_type));
Ok(())
}
.await;
match &result {
Ok(()) => {
emit_agent_install_event(
emitter, &task_id, AgentInstallEventKind::Completed,
format!("{} uninstalled successfully", meta.name),
);
}
Err(e) => {
emit_agent_install_event(
emitter, &task_id, AgentInstallEventKind::Failed, e.to_string(),
);
}
}
agent_setting_service::set_installed_version(&db.conn, agent_type, None)
.await
.map_err(|e| AcpError::protocol(e.to_string()))?;
emit_acp_agents_updated(emitter, "agent_uninstalled", Some(agent_type));
Ok(())
result
}
#[cfg(feature = "tauri-runtime")]
#[cfg_attr(feature = "tauri-runtime", tauri::command)]
pub async fn acp_uninstall_agent(
agent_type: AgentType,
task_id: String,
db: State<'_, AppDatabase>,
app: tauri::AppHandle,
) -> Result<(), AcpError> {
let emitter = EventEmitter::Tauri(app);
acp_uninstall_agent_core(agent_type, &db, &emitter).await
acp_uninstall_agent_core(agent_type, task_id, &db, &emitter).await
}
pub(crate) async fn acp_reorder_agents_core(

View File

@@ -50,6 +50,9 @@ pub enum EventEmitter {
#[cfg(feature = "tauri-runtime")]
Tauri(tauri::AppHandle),
WebOnly(Arc<WebEventBroadcaster>),
/// Silent no-op emitter — drops all events. Used when streaming progress
/// is not needed (e.g. legacy non-streaming call paths).
Noop,
}
/// Unified event emission: sends to both Tauri webview and Web clients (if applicable).
@@ -70,5 +73,6 @@ pub fn emit_event(
EventEmitter::WebOnly(broadcaster) => {
broadcaster.send(event, &payload);
}
EventEmitter::Noop => {}
}
}

View File

@@ -459,12 +459,19 @@ pub async fn acp_update_agent_config(
Ok(Json(()))
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AcpDownloadAgentBinaryParams {
pub agent_type: AgentType,
pub task_id: String,
}
pub async fn acp_download_agent_binary(
Extension(state): Extension<Arc<AppState>>,
Json(params): Json<AgentTypeParams>,
Json(params): Json<AcpDownloadAgentBinaryParams>,
) -> Result<Json<()>, AppCommandError> {
let emitter = state.emitter.clone();
acp_commands::acp_download_agent_binary_core(params.agent_type, &emitter)
acp_commands::acp_download_agent_binary_core(params.agent_type, params.task_id, &emitter)
.await
.map_err(|e| AppCommandError::task_execution_failed(e.to_string()))?;
Ok(Json(()))
@@ -487,6 +494,7 @@ pub async fn acp_detect_agent_local_version(
pub struct AcpPrepareNpxAgentParams {
pub agent_type: AgentType,
pub registry_version: Option<String>,
pub task_id: String,
}
pub async fn acp_prepare_npx_agent(
@@ -498,6 +506,7 @@ pub async fn acp_prepare_npx_agent(
let result = acp_commands::acp_prepare_npx_agent_core(
params.agent_type,
params.registry_version,
params.task_id,
db,
&emitter,
)
@@ -506,13 +515,20 @@ pub async fn acp_prepare_npx_agent(
Ok(Json(result))
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AcpUninstallAgentParams {
pub agent_type: AgentType,
pub task_id: String,
}
pub async fn acp_uninstall_agent(
Extension(state): Extension<Arc<AppState>>,
Json(params): Json<AgentTypeParams>,
Json(params): Json<AcpUninstallAgentParams>,
) -> Result<Json<()>, AppCommandError> {
let db = &state.db;
let emitter = state.emitter.clone();
acp_commands::acp_uninstall_agent_core(params.agent_type, db, &emitter)
acp_commands::acp_uninstall_agent_core(params.agent_type, params.task_id, db, &emitter)
.await
.map_err(|e| AppCommandError::task_execution_failed(e.to_string()))?;
Ok(Json(()))