fix(acp): harden session-page connection and localize backend errors
- Session-page connect never triggers download/install; returns SdkNotInstalled immediately and prompts the user to install from Agent Settings instead - Binary agents now accept any cached version via find_best_cached_binary_for_agent so stale caches still connect - Bound Initialize handshake with a 60s timeout and convert it to AcpError::InitializeTimeout via a sentinel in run_connection - spawn_agent_connection now owns ConnectionManager map insertion (done before the background task is spawned), and the background task removes the entry on exit through an RAII guard that survives panics, preventing leaked stale entries - AcpError gains SdkNotInstalled and InitializeTimeout variants plus a stable code() identifier; AcpEvent::Error carries code so the frontend can render localized messages by key - Frontend preflight now runs for all connect sources; error event handler switches on code to show translated text for initialize_timeout, sdk_not_installed, platform_not_supported, process_exited, spawn_failed and download_failed - Remove ConnectionStatus::Downloading enum variant, all frontend branches, and i18n strings; drop obsolete autoLinkFailedTitle, autoLinkPreflightFailed, preflightCheckFailedDefault and preflightFailedTitle keys across 10 locales - Add backendErrors.* translations in 10 languages - Diagnostic logging: always log agent stderr plus binary path/size/args/env keys and Initialize timing; gate stdin/stdout JSON-RPC tracing behind CODEG_ACP_DEBUG to avoid persisting user content into OS log files Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -93,6 +93,41 @@ pub enum ConnectionCommand {
|
||||
Disconnect,
|
||||
}
|
||||
|
||||
/// Sentinel string embedded in a `sacp::Error` when the Initialize
|
||||
/// handshake times out. Converted back to `AcpError::InitializeTimeout`
|
||||
/// by the outer `.map_err(...)` in `run_connection`.
|
||||
///
/// The sentinel is passed as the message of `sacp::util::internal_error`
/// and later detected with a substring match (`contains`) on the
/// stringified error, so it must stay distinctive enough that it cannot
/// plausibly appear in a genuine agent error message.
const INIT_TIMEOUT_SENTINEL: &str = "__codeg_init_timeout__";
|
||||
|
||||
/// RAII guard that removes the `AgentConnection` entry from the manager
|
||||
/// map when dropped. Runs on both normal task exit AND task panic, so a
|
||||
/// panic inside `run_connection` can't leak a stale map entry.
|
||||
///
|
||||
/// The `Mutex` is async, so we take two paths:
|
||||
/// - If the lock is immediately available (`try_lock` succeeds), remove
|
||||
/// the entry synchronously in the current context.
|
||||
/// - Otherwise, spawn a short-lived cleanup task to acquire the lock
|
||||
/// and remove the entry asynchronously. The guard must hold owned
|
||||
/// `Arc<Mutex<_>>` and `String` so the spawned task has `'static`
|
||||
/// captures.
|
||||
struct ConnectionCleanupGuard {
|
||||
connections: Arc<tokio::sync::Mutex<HashMap<String, AgentConnection>>>,
|
||||
connection_id: String,
|
||||
}
|
||||
|
||||
impl Drop for ConnectionCleanupGuard {
|
||||
fn drop(&mut self) {
|
||||
if let Ok(mut guard) = self.connections.try_lock() {
|
||||
guard.remove(&self.connection_id);
|
||||
return;
|
||||
}
|
||||
let connections = self.connections.clone();
|
||||
let connection_id = std::mem::take(&mut self.connection_id);
|
||||
tokio::spawn(async move {
|
||||
connections.lock().await.remove(&connection_id);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents a single active ACP agent connection.
|
||||
pub struct AgentConnection {
|
||||
pub id: String,
|
||||
@@ -116,8 +151,6 @@ impl AgentConnection {
|
||||
async fn build_agent(
|
||||
agent_type: AgentType,
|
||||
runtime_env: &BTreeMap<String, String>,
|
||||
connection_id: &str,
|
||||
emitter: &EventEmitter,
|
||||
) -> Result<AcpAgent, AcpError> {
|
||||
let meta = registry::get_agent_meta(agent_type);
|
||||
debug_assert_eq!(meta.agent_type, agent_type);
|
||||
@@ -182,14 +215,14 @@ async fn build_agent(
|
||||
.map_err(|e| AcpError::SpawnFailed(e.to_string()))
|
||||
}
|
||||
AgentDistribution::Binary {
|
||||
version,
|
||||
version: registry_version,
|
||||
cmd,
|
||||
args,
|
||||
env,
|
||||
platforms,
|
||||
} => {
|
||||
let platform = registry::current_platform();
|
||||
let info = platforms
|
||||
let _ = platforms
|
||||
.iter()
|
||||
.find(|p| p.platform == platform)
|
||||
.ok_or_else(|| {
|
||||
@@ -199,33 +232,49 @@ async fn build_agent(
|
||||
))
|
||||
})?;
|
||||
|
||||
let has_cached_binary =
|
||||
crate::acp::binary_cache::find_cached_binary_for_agent(agent_type, version, cmd)
|
||||
.ok()
|
||||
.flatten()
|
||||
.is_some();
|
||||
if !has_cached_binary {
|
||||
crate::web::event_bridge::emit_event(
|
||||
emitter,
|
||||
"acp://event",
|
||||
AcpEvent::StatusChanged {
|
||||
connection_id: connection_id.into(),
|
||||
status: ConnectionStatus::Downloading,
|
||||
},
|
||||
// Session-page connect must never trigger a download. Use
|
||||
// the best cached version available (tolerates users on
|
||||
// older-but-still-working binaries); return SdkNotInstalled
|
||||
// only when nothing is cached, so the frontend can prompt
|
||||
// the user to install it from the Agent Settings page.
|
||||
//
|
||||
// INVARIANT: the substring "is not installed" is matched
|
||||
// verbatim by the frontend catch block in
|
||||
// `src/contexts/acp-connections-context.tsx` to surface a
|
||||
// localized install prompt. Do not change the wording.
|
||||
let (binary_path, cached_version) =
|
||||
crate::acp::binary_cache::find_best_cached_binary_for_agent(agent_type, cmd)?
|
||||
.ok_or_else(|| {
|
||||
AcpError::SdkNotInstalled(format!(
|
||||
"{} is not installed. Please install it in Agent Settings.",
|
||||
meta.name
|
||||
))
|
||||
})?;
|
||||
if cached_version == registry_version {
|
||||
eprintln!(
|
||||
"[ACP][{}] Using cached binary {cached_version}",
|
||||
meta.name
|
||||
);
|
||||
} else {
|
||||
eprintln!(
|
||||
"[ACP][{}] Using cached binary {cached_version} (registry recommends {registry_version})",
|
||||
meta.name
|
||||
);
|
||||
}
|
||||
let binary_path = crate::acp::binary_cache::ensure_binary_for_agent(
|
||||
agent_type, version, info.url, cmd,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let binary_str = binary_path.to_string_lossy().to_string();
|
||||
let binary_size = std::fs::metadata(&binary_path)
|
||||
.map(|m| m.len())
|
||||
.unwrap_or(0);
|
||||
let mut server = McpServerStdio::new(meta.name, &binary_str);
|
||||
let cmd_args: Vec<String> = args.iter().map(|a| (*a).to_string()).collect();
|
||||
let cmd_args_for_log = cmd_args.clone();
|
||||
if !cmd_args.is_empty() {
|
||||
server = server.args(cmd_args);
|
||||
}
|
||||
let merged_env = merge_agent_env(env, runtime_env);
|
||||
let env_key_list: Vec<&str> =
|
||||
merged_env.iter().map(|(k, _)| k.as_str()).collect();
|
||||
if !merged_env.is_empty() {
|
||||
let env_vars: Vec<sacp::schema::EnvVariable> = merged_env
|
||||
.iter()
|
||||
@@ -233,11 +282,63 @@ async fn build_agent(
|
||||
.collect();
|
||||
server = server.env(env_vars);
|
||||
}
|
||||
// Spawn-time diagnostic dump: binary identity, args, and env
|
||||
// key list (values omitted — they may contain API keys). If
|
||||
// the connection hangs later, these lines pin down exactly
|
||||
// which binary was invoked and how.
|
||||
eprintln!(
|
||||
"[ACP][{}] binary_path={} size={} platform={} args={:?} env_keys={:?}",
|
||||
meta.name,
|
||||
binary_str,
|
||||
binary_size,
|
||||
registry::current_platform(),
|
||||
cmd_args_for_log,
|
||||
env_key_list
|
||||
);
|
||||
|
||||
// Stdio logging policy:
|
||||
// - stderr is always on: it's the agent's own diagnostic
|
||||
// output (ANSI log lines) and does not contain user data.
|
||||
// - stdin / stdout carry JSON-RPC traffic that includes
|
||||
// prompt text, tool-call arguments, file read/write
|
||||
// contents, and permission-response payloads — all of
|
||||
// which may contain API keys pasted by users or file
|
||||
// contents the agent is editing. They are gated behind
|
||||
// the `CODEG_ACP_DEBUG=1` env var so production builds
|
||||
// don't persist user content into OS-level log files
|
||||
// (Console.app on macOS, journald on Linux).
|
||||
// - Max line length is kept short so what does get logged
|
||||
// captures the JSON-RPC envelope (method, id) rather
|
||||
// than large payload bodies.
|
||||
let stdio_debug_enabled = std::env::var("CODEG_ACP_DEBUG")
|
||||
.map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
|
||||
.unwrap_or(false);
|
||||
let agent_name = meta.name.to_string();
|
||||
Ok(AcpAgent::new(sacp::schema::McpServer::Stdio(server)).with_debug(
|
||||
move |line, dir| {
|
||||
if dir == sacp_tokio::LineDirection::Stderr {
|
||||
eprintln!("[ACP][{agent_name}][stderr] {line}");
|
||||
let (tag, enabled) = match dir {
|
||||
sacp_tokio::LineDirection::Stderr => ("stderr", true),
|
||||
sacp_tokio::LineDirection::Stdout => ("stdout", stdio_debug_enabled),
|
||||
sacp_tokio::LineDirection::Stdin => ("stdin", stdio_debug_enabled),
|
||||
};
|
||||
if !enabled {
|
||||
return;
|
||||
}
|
||||
const MAX: usize = 256;
|
||||
if line.len() > MAX {
|
||||
let head = line
|
||||
.char_indices()
|
||||
.take_while(|(i, _)| *i < MAX)
|
||||
.last()
|
||||
.map(|(i, c)| i + c.len_utf8())
|
||||
.unwrap_or(MAX);
|
||||
eprintln!(
|
||||
"[ACP][{agent_name}][{tag}] {}... <truncated {} bytes>",
|
||||
&line[..head],
|
||||
line.len() - head
|
||||
);
|
||||
} else {
|
||||
eprintln!("[ACP][{agent_name}][{tag}] {line}");
|
||||
}
|
||||
},
|
||||
))
|
||||
@@ -246,6 +347,13 @@ async fn build_agent(
|
||||
}
|
||||
|
||||
/// Spawn an ACP agent process and run the connection loop in a background task.
|
||||
///
|
||||
/// On success, the newly created `AgentConnection` is inserted into
|
||||
/// `connections` before this function returns. The background task
|
||||
/// automatically removes the entry from `connections` once `run_connection`
|
||||
/// exits (timeout, error, or clean disconnect), so the manager never
|
||||
/// leaks stale entries after a connection tears down.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn spawn_agent_connection(
|
||||
connection_id: String,
|
||||
agent_type: AgentType,
|
||||
@@ -254,7 +362,8 @@ pub async fn spawn_agent_connection(
|
||||
runtime_env: BTreeMap<String, String>,
|
||||
owner_window_label: String,
|
||||
emitter: EventEmitter,
|
||||
) -> Result<AgentConnection, AcpError> {
|
||||
connections: Arc<tokio::sync::Mutex<HashMap<String, AgentConnection>>>,
|
||||
) -> Result<(), AcpError> {
|
||||
crate::web::event_bridge::emit_event(
|
||||
&emitter,
|
||||
"acp://event",
|
||||
@@ -264,13 +373,36 @@ pub async fn spawn_agent_connection(
|
||||
},
|
||||
);
|
||||
|
||||
let agent = build_agent(agent_type, &runtime_env, &connection_id, &emitter).await?;
|
||||
let agent = build_agent(agent_type, &runtime_env).await?;
|
||||
|
||||
let (cmd_tx, cmd_rx) = mpsc::channel::<ConnectionCommand>(32);
|
||||
let conn_id = connection_id.clone();
|
||||
let emitter_clone = emitter.clone();
|
||||
let cleanup_connections = connections.clone();
|
||||
let cleanup_connection_id = connection_id.clone();
|
||||
|
||||
// Insert the entry BEFORE spawning the background task so that a
|
||||
// fast-failing `run_connection` can never remove it before it was
|
||||
// inserted (would otherwise leak the entry).
|
||||
connections.lock().await.insert(
|
||||
connection_id.clone(),
|
||||
AgentConnection {
|
||||
id: connection_id,
|
||||
agent_type,
|
||||
status: ConnectionStatus::Connecting,
|
||||
owner_window_label,
|
||||
cmd_tx,
|
||||
},
|
||||
);
|
||||
|
||||
tokio::spawn(async move {
|
||||
// RAII guard: runs on normal exit AND on panic unwinding, so a
|
||||
// panic inside `run_connection` can't leak a stale map entry.
|
||||
let _cleanup = ConnectionCleanupGuard {
|
||||
connections: cleanup_connections,
|
||||
connection_id: cleanup_connection_id,
|
||||
};
|
||||
|
||||
let result = run_connection(
|
||||
agent,
|
||||
conn_id.clone(),
|
||||
@@ -283,6 +415,7 @@ pub async fn spawn_agent_connection(
|
||||
.await;
|
||||
|
||||
if let Err(e) = result {
|
||||
let code = e.code().map(String::from);
|
||||
crate::web::event_bridge::emit_event(
|
||||
&emitter_clone,
|
||||
"acp://event",
|
||||
@@ -290,6 +423,7 @@ pub async fn spawn_agent_connection(
|
||||
connection_id: conn_id.clone(),
|
||||
message: e.to_string(),
|
||||
agent_type: agent_type.to_string(),
|
||||
code,
|
||||
},
|
||||
);
|
||||
}
|
||||
@@ -302,15 +436,11 @@ pub async fn spawn_agent_connection(
|
||||
status: ConnectionStatus::Disconnected,
|
||||
},
|
||||
);
|
||||
// `_cleanup` is dropped here — removes the connection entry from
|
||||
// the manager map. Same drop semantics apply on panic unwinding.
|
||||
});
|
||||
|
||||
Ok(AgentConnection {
|
||||
id: connection_id,
|
||||
agent_type,
|
||||
status: ConnectionStatus::Connecting,
|
||||
owner_window_label,
|
||||
cmd_tx,
|
||||
})
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Shared state for pending permission responders.
|
||||
@@ -699,6 +829,8 @@ async fn run_connection(
|
||||
on_receive_request!(),
|
||||
)
|
||||
.connect_with(agent, async move |cx| -> Result<(), sacp::Error> {
|
||||
let agent_name_for_log = registry::get_agent_meta(agent_type).name;
|
||||
|
||||
// Advertise filesystem + terminal capabilities for ACP tool execution.
|
||||
let init_request = InitializeRequest::new(ProtocolVersion::LATEST).client_capabilities(
|
||||
ClientCapabilities::new()
|
||||
@@ -707,7 +839,52 @@ async fn run_connection(
|
||||
.read_text_file(true)
|
||||
.write_text_file(true)),
|
||||
);
|
||||
let init_resp = cx.send_request_to(Agent, init_request).block_task().await?;
|
||||
// Bound the Initialize handshake so an outdated / incompatible
|
||||
// cached binary that never responds can't leave the frontend
|
||||
// stuck on "Connecting...". A healthy agent answers in <1s; we
|
||||
// give 60s headroom for cold process startup on slow machines.
|
||||
//
|
||||
// We cannot carry a structured error code through sacp's Error
|
||||
// type, so we tag the timeout with `INIT_TIMEOUT_SENTINEL` and
|
||||
// convert it back to `AcpError::InitializeTimeout` in the
|
||||
// outer `.map_err(...)` below. The outer layer attaches a
|
||||
// stable `code` to the frontend event so it can be localized.
|
||||
eprintln!(
|
||||
"[ACP][{agent_name_for_log}] Sending Initialize (protocol={}, timeout=60s)",
|
||||
ProtocolVersion::LATEST
|
||||
);
|
||||
let init_started = std::time::Instant::now();
|
||||
let init_resp = match tokio::time::timeout(
|
||||
std::time::Duration::from_secs(60),
|
||||
cx.send_request_to(Agent, init_request).block_task(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Ok(resp)) => {
|
||||
eprintln!(
|
||||
"[ACP][{agent_name_for_log}] Initialize responded in {:?}",
|
||||
init_started.elapsed()
|
||||
);
|
||||
resp
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
eprintln!(
|
||||
"[ACP][{agent_name_for_log}] Initialize failed in {:?}: {e}",
|
||||
init_started.elapsed()
|
||||
);
|
||||
return Err(e);
|
||||
}
|
||||
Err(_) => {
|
||||
eprintln!(
|
||||
"[ACP][{agent_name_for_log}] Initialize TIMED OUT after {:?} \
|
||||
— the agent never answered the handshake. Check the \
|
||||
[stderr] lines above for agent-side errors. For a full \
|
||||
JSON-RPC trace, re-launch with CODEG_ACP_DEBUG=1.",
|
||||
init_started.elapsed()
|
||||
);
|
||||
return Err(sacp::util::internal_error(INIT_TIMEOUT_SENTINEL));
|
||||
}
|
||||
};
|
||||
emit_prompt_capabilities(
|
||||
&conn_id,
|
||||
&emitter_clone,
|
||||
@@ -860,6 +1037,7 @@ async fn run_connection(
|
||||
"Failed to load session, starting new: {e}"
|
||||
),
|
||||
agent_type: agent_type.to_string(),
|
||||
code: None,
|
||||
},
|
||||
);
|
||||
}
|
||||
@@ -968,7 +1146,14 @@ async fn run_connection(
|
||||
}
|
||||
})
|
||||
.await
|
||||
.map_err(|e| AcpError::protocol(e.to_string()))
|
||||
.map_err(|e| {
|
||||
let raw = e.to_string();
|
||||
if raw.contains(INIT_TIMEOUT_SENTINEL) {
|
||||
AcpError::InitializeTimeout
|
||||
} else {
|
||||
AcpError::protocol(raw)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Store the permission responder and emit event to frontend.
|
||||
@@ -1614,6 +1799,7 @@ async fn run_conversation_loop<'a>(
|
||||
connection_id: conn_id.into(),
|
||||
message: "Prompt must contain at least one content block".into(),
|
||||
agent_type: agent_type.to_string(),
|
||||
code: None,
|
||||
},
|
||||
);
|
||||
continue;
|
||||
@@ -1802,6 +1988,7 @@ async fn run_conversation_loop<'a>(
|
||||
connection_id: conn_id.into(),
|
||||
message: format!("Failed to set mode: {e}"),
|
||||
agent_type: agent_type.to_string(),
|
||||
code: None,
|
||||
},
|
||||
);
|
||||
}
|
||||
@@ -1829,6 +2016,7 @@ async fn run_conversation_loop<'a>(
|
||||
connection_id: conn_id.into(),
|
||||
message: format!("Failed to set config option: {e}"),
|
||||
agent_type: agent_type.to_string(),
|
||||
code: None,
|
||||
},
|
||||
);
|
||||
}
|
||||
@@ -1938,6 +2126,7 @@ async fn run_conversation_loop<'a>(
|
||||
connection_id: conn_id.into(),
|
||||
message: format!("Failed to set mode: {e}"),
|
||||
agent_type: agent_type.to_string(),
|
||||
code: None,
|
||||
},
|
||||
);
|
||||
}
|
||||
@@ -1958,6 +2147,7 @@ async fn run_conversation_loop<'a>(
|
||||
connection_id: conn_id.into(),
|
||||
message: format!("Failed to set config option: {e}"),
|
||||
agent_type: agent_type.to_string(),
|
||||
code: None,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user