use std::collections::{BTreeMap, HashMap, HashSet}; use std::path::{Path, PathBuf}; use std::sync::Arc; use sacp::schema::McpServerStdio; use sacp::schema::{ BlobResourceContents, CancelNotification, ClientCapabilities, ContentBlock, ContentChunk, CreateTerminalRequest, CreateTerminalResponse, EmbeddedResource, EmbeddedResourceResource, FileSystemCapabilities, ImageContent, InitializeRequest, KillTerminalRequest, KillTerminalResponse, LoadSessionRequest, NewSessionRequest, NewSessionResponse, PermissionOptionKind, Plan, PlanEntryPriority, PlanEntryStatus, PromptRequest, ProtocolVersion, ReadTextFileRequest, ReadTextFileResponse, ReleaseTerminalRequest, ReleaseTerminalResponse, RequestPermissionOutcome, RequestPermissionRequest, RequestPermissionResponse, ResourceLink, SelectedPermissionOutcome, SessionConfigKind, SessionConfigOption, SessionConfigOptionCategory, SessionConfigSelectGroup, SessionConfigSelectOption, SessionConfigSelectOptions, SessionId, SessionModeState, SessionNotification, SessionUpdate, SetSessionConfigOptionRequest, SetSessionConfigOptionResponse, SetSessionModeRequest, StopReason, TerminalExitStatus, TerminalOutputRequest, TerminalOutputResponse, TextContent, TextResourceContents, ToolCallContent, WaitForTerminalExitRequest, WaitForTerminalExitResponse, WriteTextFileRequest, WriteTextFileResponse, }; use sacp::util::MatchDispatch; use sacp::{ on_receive_request, Agent, Client, ConnectionTo, Dispatch, Responder, SessionMessage, UntypedMessage, }; use sacp_tokio::AcpAgent; use tokio::sync::mpsc; use crate::acp::error::AcpError; use crate::acp::file_system_runtime::{FileSystemRuntime, FileSystemRuntimeError}; use crate::acp::registry::{self, AgentDistribution}; use crate::acp::terminal_runtime::{TerminalRuntime, TerminalRuntimeError}; use crate::acp::types::{ AcpEvent, AvailableCommandInfo, ConnectionInfo, ConnectionStatus, PermissionOptionInfo, PlanEntryInfo, PromptCapabilitiesInfo, PromptInputBlock, SessionConfigKindInfo, SessionConfigOptionInfo, SessionConfigSelectGroupInfo, SessionConfigSelectInfo, SessionConfigSelectOptionInfo, SessionModeInfo, SessionModeStateInfo, }; use crate::models::agent::AgentType; use crate::network::proxy; use crate::web::event_bridge::EventEmitter; const DEFAULT_COMMAND_COLOR_ENV: [(&str, &str); 1] = [("CLICOLOR_FORCE", "1")]; fn merge_agent_env( env: &[(&'static str, &'static str)], runtime_env: &BTreeMap, ) -> Vec<(String, String)> { // Env var order is not semantically meaningful; use map overwrite semantics // to keep precedence while avoiding repeated O(n) scans. let mut merged = BTreeMap::::new(); for (key, value) in DEFAULT_COMMAND_COLOR_ENV { merged.insert(key.to_string(), value.to_string()); } for (key, value) in env { merged.insert((*key).to_string(), (*value).to_string()); } for (key, value) in runtime_env { merged.insert(key.clone(), value.clone()); } for (key, value) in proxy::current_proxy_env_vars() { merged.insert(key, value); } merged.into_iter().collect() } /// Commands sent from Tauri command handlers to the ACP connection loop. pub enum ConnectionCommand { Prompt { blocks: Vec, }, SetMode { mode_id: String, }, SetConfigOption { config_id: String, value_id: String, }, Cancel, RespondPermission { request_id: String, option_id: String, }, Fork { reply: tokio::sync::oneshot::Sender>, }, Disconnect, } /// Sentinel string embedded in a `sacp::Error` when the Initialize /// handshake times out. Converted back to `AcpError::InitializeTimeout` /// by the outer `.map_err(...)` in `run_connection`. const INIT_TIMEOUT_SENTINEL: &str = "__codeg_init_timeout__"; /// RAII guard that removes the `AgentConnection` entry from the manager /// map when dropped. Runs on both normal task exit AND task panic, so a /// panic inside `run_connection` can't leak a stale map entry. /// /// The `Mutex` is async, so we take two paths: /// - If the lock is immediately available (`try_lock` succeeds), remove /// the entry synchronously in the current context. /// - Otherwise, spawn a short-lived cleanup task to acquire the lock /// and remove the entry asynchronously. The guard must hold owned /// `Arc>` and `String` so the spawned task has `'static` /// captures. struct ConnectionCleanupGuard { connections: Arc>>, connection_id: String, } impl Drop for ConnectionCleanupGuard { fn drop(&mut self) { if let Ok(mut guard) = self.connections.try_lock() { guard.remove(&self.connection_id); return; } let connections = self.connections.clone(); let connection_id = std::mem::take(&mut self.connection_id); tokio::spawn(async move { connections.lock().await.remove(&connection_id); }); } } /// Represents a single active ACP agent connection. pub struct AgentConnection { pub id: String, pub agent_type: AgentType, pub status: ConnectionStatus, pub owner_window_label: String, pub cmd_tx: mpsc::Sender, } impl AgentConnection { pub fn info(&self) -> ConnectionInfo { ConnectionInfo { id: self.id.clone(), agent_type: self.agent_type, status: self.status.clone(), } } } /// Build an AcpAgent from registry metadata. async fn build_agent( agent_type: AgentType, runtime_env: &BTreeMap, ) -> Result { let meta = registry::get_agent_meta(agent_type); debug_assert_eq!(meta.agent_type, agent_type); match meta.distribution { AgentDistribution::Npx { cmd, args, env, .. } => { let merged_env = merge_agent_env(env, runtime_env); let mut parts: Vec = Vec::new(); for (k, v) in &merged_env { parts.push(format!("{k}={v}")); } parts.push( which::which(cmd) .map(|p| p.to_string_lossy().to_string()) .unwrap_or_else(|_| { crate::process::normalized_program(cmd) .to_string_lossy() .to_string() }), ); for a in args { parts.push((*a).into()); } // Translate OpenClaw-specific env vars to CLI flags if agent_type == AgentType::OpenClaw { if let Some(url) = runtime_env .get("OPENCLAW_GATEWAY_URL") .filter(|v| !v.is_empty()) { parts.push("--url".into()); parts.push(url.clone()); } if let Some(key) = runtime_env .get("OPENCLAW_SESSION_KEY") .filter(|v| !v.is_empty()) { parts.push("--session".into()); parts.push(key.clone()); } // When creating a new conversation (no session_id to resume), // pass --reset-session so OpenClaw mints a fresh transcript // instead of appending to the previous one. if runtime_env .get("OPENCLAW_RESET_SESSION") .is_some_and(|v| v == "1") { parts.push("--reset-session".into()); } } let refs: Vec<&str> = parts.iter().map(|s| s.as_str()).collect(); let agent_name = meta.name.to_string(); AcpAgent::from_args(&refs) .map(|a| { a.with_debug(move |line, dir| { if dir == sacp_tokio::LineDirection::Stderr { eprintln!("[ACP][{agent_name}][stderr] {line}"); } }) }) .map_err(|e| AcpError::SpawnFailed(e.to_string())) } AgentDistribution::Binary { version: registry_version, cmd, args, env, platforms, } => { let platform = registry::current_platform(); let _ = platforms .iter() .find(|p| p.platform == platform) .ok_or_else(|| { AcpError::PlatformNotSupported(format!( "{} is not available on {platform}", meta.name )) })?; // Session-page connect must never trigger a download. Use // the best cached version available (tolerates users on // older-but-still-working binaries); return SdkNotInstalled // only when nothing is cached, so the frontend can prompt // the user to install it from the Agent Settings page. // // INVARIANT: the substring "is not installed" is matched // verbatim by the frontend catch block in // `src/contexts/acp-connections-context.tsx` to surface a // localized install prompt. Do not change the wording. let (binary_path, cached_version) = crate::acp::binary_cache::find_best_cached_binary_for_agent(agent_type, cmd)? .ok_or_else(|| { AcpError::SdkNotInstalled(format!( "{} is not installed. Please install it in Agent Settings.", meta.name )) })?; if cached_version == registry_version { eprintln!("[ACP][{}] Using cached binary {cached_version}", meta.name); } else { eprintln!( "[ACP][{}] Using cached binary {cached_version} (registry recommends {registry_version})", meta.name ); } let binary_str = binary_path.to_string_lossy().to_string(); let binary_size = std::fs::metadata(&binary_path) .map(|m| m.len()) .unwrap_or(0); let mut server = McpServerStdio::new(meta.name, &binary_str); let cmd_args: Vec = args.iter().map(|a| (*a).to_string()).collect(); let cmd_args_for_log = cmd_args.clone(); if !cmd_args.is_empty() { server = server.args(cmd_args); } let merged_env = merge_agent_env(env, runtime_env); let env_key_list: Vec<&str> = merged_env.iter().map(|(k, _)| k.as_str()).collect(); if !merged_env.is_empty() { let env_vars: Vec = merged_env .iter() .map(|(k, v)| sacp::schema::EnvVariable::new(k, v)) .collect(); server = server.env(env_vars); } // Spawn-time diagnostic dump: binary identity, args, and env // key list (values omitted — they may contain API keys). If // the connection hangs later, these lines pin down exactly // which binary was invoked and how. eprintln!( "[ACP][{}] binary_path={} size={} platform={} args={:?} env_keys={:?}", meta.name, binary_str, binary_size, registry::current_platform(), cmd_args_for_log, env_key_list ); // Stdio logging policy: // - stderr is always on: it's the agent's own diagnostic // output (ANSI log lines) and does not contain user data. // - stdin / stdout carry JSON-RPC traffic that includes // prompt text, tool-call arguments, file read/write // contents, and permission-response payloads — all of // which may contain API keys pasted by users or file // contents the agent is editing. They are gated behind // the `CODEG_ACP_DEBUG=1` env var so production builds // don't persist user content into OS-level log files // (Console.app on macOS, journald on Linux). // - Max line length is kept short so what does get logged // captures the JSON-RPC envelope (method, id) rather // than large payload bodies. let stdio_debug_enabled = std::env::var("CODEG_ACP_DEBUG") .map(|v| v == "1" || v.eq_ignore_ascii_case("true")) .unwrap_or(false); let agent_name = meta.name.to_string(); Ok( AcpAgent::new(sacp::schema::McpServer::Stdio(server)).with_debug( move |line, dir| { let (tag, enabled) = match dir { sacp_tokio::LineDirection::Stderr => ("stderr", true), sacp_tokio::LineDirection::Stdout => ("stdout", stdio_debug_enabled), sacp_tokio::LineDirection::Stdin => ("stdin", stdio_debug_enabled), }; if !enabled { return; } const MAX: usize = 256; if line.len() > MAX { let head = line .char_indices() .take_while(|(i, _)| *i < MAX) .last() .map(|(i, c)| i + c.len_utf8()) .unwrap_or(MAX); eprintln!( "[ACP][{agent_name}][{tag}] {}... ", &line[..head], line.len() - head ); } else { eprintln!("[ACP][{agent_name}][{tag}] {line}"); } }, ), ) } } } /// Spawn an ACP agent process and run the connection loop in a background task. /// /// On success, the newly created `AgentConnection` is inserted into /// `connections` before this function returns. The background task /// automatically removes the entry from `connections` once `run_connection` /// exits (timeout, error, or clean disconnect), so the manager never /// leaks stale entries after a connection tears down. #[allow(clippy::too_many_arguments)] pub async fn spawn_agent_connection( connection_id: String, agent_type: AgentType, working_dir: Option, session_id: Option, runtime_env: BTreeMap, owner_window_label: String, emitter: EventEmitter, connections: Arc>>, ) -> Result<(), AcpError> { crate::web::event_bridge::emit_event( &emitter, "acp://event", AcpEvent::StatusChanged { connection_id: connection_id.clone(), status: ConnectionStatus::Connecting, }, ); let agent = build_agent(agent_type, &runtime_env).await?; let (cmd_tx, cmd_rx) = mpsc::channel::(32); let conn_id = connection_id.clone(); let emitter_clone = emitter.clone(); let cleanup_connections = connections.clone(); let cleanup_connection_id = connection_id.clone(); // Insert the entry BEFORE spawning the background task so that a // fast-failing `run_connection` can never remove it before it was // inserted (would otherwise leak the entry). connections.lock().await.insert( connection_id.clone(), AgentConnection { id: connection_id, agent_type, status: ConnectionStatus::Connecting, owner_window_label, cmd_tx, }, ); tokio::spawn(async move { // RAII guard: runs on normal exit AND on panic unwinding, so a // panic inside `run_connection` can't leak a stale map entry. let _cleanup = ConnectionCleanupGuard { connections: cleanup_connections, connection_id: cleanup_connection_id, }; let result = run_connection( agent, conn_id.clone(), agent_type, working_dir, session_id, cmd_rx, emitter_clone.clone(), ) .await; if let Err(e) = result { let code = e.code().map(String::from); crate::web::event_bridge::emit_event( &emitter_clone, "acp://event", AcpEvent::Error { connection_id: conn_id.clone(), message: e.to_string(), agent_type: agent_type.to_string(), code, }, ); } crate::web::event_bridge::emit_event( &emitter_clone, "acp://event", AcpEvent::StatusChanged { connection_id: conn_id, status: ConnectionStatus::Disconnected, }, ); // `_cleanup` is dropped here — removes the connection entry from // the manager map. Same drop semantics apply on panic unwinding. }); Ok(()) } /// Shared state for pending permission responders. type PendingPermissions = Arc>>>; fn map_session_modes(mode_state: &SessionModeState) -> SessionModeStateInfo { SessionModeStateInfo { current_mode_id: mode_state.current_mode_id.to_string(), available_modes: mode_state .available_modes .iter() .map(|mode| SessionModeInfo { id: mode.id.to_string(), name: mode.name.clone(), description: mode.description.clone(), }) .collect(), } } fn emit_session_modes( connection_id: &str, emitter: &EventEmitter, modes: &Option, ) { if let Some(mode_state) = modes { crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::SessionModes { connection_id: connection_id.into(), modes: map_session_modes(mode_state), }, ); } } fn map_session_config_category(category: &SessionConfigOptionCategory) -> String { match category { SessionConfigOptionCategory::Mode => "mode".to_string(), SessionConfigOptionCategory::Model => "model".to_string(), SessionConfigOptionCategory::ThoughtLevel => "thought_level".to_string(), SessionConfigOptionCategory::Other(value) => value.clone(), _ => "unknown".to_string(), } } fn map_session_config_select_option( option: &SessionConfigSelectOption, ) -> SessionConfigSelectOptionInfo { SessionConfigSelectOptionInfo { value: option.value.to_string(), name: option.name.clone(), description: option.description.clone(), } } fn map_session_config_select_group( group: &SessionConfigSelectGroup, ) -> SessionConfigSelectGroupInfo { SessionConfigSelectGroupInfo { group: group.group.to_string(), name: group.name.clone(), options: group .options .iter() .map(map_session_config_select_option) .collect(), } } fn map_session_config_option(option: &SessionConfigOption) -> Option { match &option.kind { SessionConfigKind::Select(select) => { let (flat_options, groups) = match &select.options { SessionConfigSelectOptions::Ungrouped(options) => ( options .iter() .map(map_session_config_select_option) .collect::>(), Vec::new(), ), SessionConfigSelectOptions::Grouped(grouped) => ( grouped .iter() .flat_map(|group| { group.options.iter().map(map_session_config_select_option) }) .collect::>(), grouped .iter() .map(map_session_config_select_group) .collect::>(), ), _ => (Vec::new(), Vec::new()), }; Some(SessionConfigOptionInfo { id: option.id.to_string(), name: option.name.clone(), description: option.description.clone(), category: option.category.as_ref().map(map_session_config_category), kind: SessionConfigKindInfo::Select(SessionConfigSelectInfo { current_value: select.current_value.to_string(), options: flat_options, groups, }), }) } _ => None, } } fn map_session_config_options( config_options: &[SessionConfigOption], ) -> Vec { config_options .iter() .filter_map(map_session_config_option) .collect() } /// Codex-acp sometimes omits the "mode" (approval preset) config option when /// the loaded sandbox policy does not exactly match one of the three built-in /// presets (commonly because `writable_roots` was injected during config /// loading). When that happens, synthesize the option so the user can still /// pick a preset. codex-acp's `set_config_option` handler always accepts /// `config_id = "mode"` regardless of whether it was advertised. fn ensure_codex_mode_option(options: &mut Vec) { if options.iter().any(|o| o.id == "mode") { return; } options.insert( 0, SessionConfigOptionInfo { id: "mode".to_string(), name: "Approval Preset".to_string(), description: Some( "Choose an approval and sandboxing preset for your session".to_string(), ), category: Some("mode".to_string()), kind: SessionConfigKindInfo::Select(SessionConfigSelectInfo { current_value: "auto".to_string(), options: vec![ SessionConfigSelectOptionInfo { value: "read-only".to_string(), name: "Read Only".to_string(), description: Some("Codex can only read files".to_string()), }, SessionConfigSelectOptionInfo { value: "auto".to_string(), name: "Default".to_string(), description: Some( "Codex can edit files, but asks before running commands".to_string(), ), }, SessionConfigSelectOptionInfo { value: "full-access".to_string(), name: "Full Access".to_string(), description: Some("Codex runs without asking for approval".to_string()), }, ], groups: vec![], }), }, ); } fn emit_session_config_options_values( connection_id: &str, emitter: &EventEmitter, agent_type: AgentType, config_options: Vec, ) { let mut mapped = map_session_config_options(&config_options); if agent_type == AgentType::Codex { ensure_codex_mode_option(&mut mapped); } crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::SessionConfigOptions { connection_id: connection_id.into(), config_options: mapped, }, ); } fn emit_session_config_options( connection_id: &str, emitter: &EventEmitter, agent_type: AgentType, config_options: &Option>, ) { // Always emit one config-options snapshot after session attach. // Some agents (e.g. Gemini CLI) may not expose session config options // and return `None`; emitting an empty list lets the frontend settle // loading state instead of waiting forever. let options = config_options.clone().unwrap_or_default(); emit_session_config_options_values(connection_id, emitter, agent_type, options); } fn emit_selectors_ready(connection_id: &str, emitter: &EventEmitter) { crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::SelectorsReady { connection_id: connection_id.into(), }, ); } fn emit_prompt_capabilities( connection_id: &str, emitter: &EventEmitter, capabilities: &sacp::schema::PromptCapabilities, ) { crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::PromptCapabilities { connection_id: connection_id.into(), prompt_capabilities: PromptCapabilitiesInfo { image: capabilities.image, audio: capabilities.audio, embedded_context: capabilities.embedded_context, }, }, ); } fn resolve_working_dir(working_dir: Option<&str>) -> PathBuf { match working_dir { Some(dir) => { let path = PathBuf::from(dir); if path.is_absolute() { path } else { std::env::current_dir().unwrap_or_default().join(path) } } None => std::env::current_dir() .unwrap_or_else(|_| dirs::home_dir().unwrap_or_else(|| PathBuf::from("/"))), } } fn claude_raw_sdk_session_meta( agent_type: AgentType, ) -> Option> { if agent_type != AgentType::ClaudeCode { return None; } let mut claude_code = serde_json::Map::new(); claude_code.insert( "emitRawSDKMessages".to_string(), serde_json::Value::Bool(true), ); let mut meta = serde_json::Map::new(); meta.insert( "claudeCode".to_string(), serde_json::Value::Object(claude_code), ); Some(meta) } fn build_new_session_request(agent_type: AgentType, cwd: &Path) -> NewSessionRequest { let mut req = NewSessionRequest::new(cwd.to_path_buf()); if let Some(meta) = claude_raw_sdk_session_meta(agent_type) { req = req.meta(meta); } req } fn build_load_session_request( agent_type: AgentType, session_id: SessionId, cwd: &Path, ) -> LoadSessionRequest { let mut req = LoadSessionRequest::new(session_id, cwd.to_path_buf()); if let Some(meta) = claude_raw_sdk_session_meta(agent_type) { req = req.meta(meta); } req } /// The main ACP connection loop. async fn run_connection( agent: AcpAgent, connection_id: String, agent_type: AgentType, working_dir: Option, session_id: Option, mut cmd_rx: mpsc::Receiver, emitter: EventEmitter, ) -> Result<(), AcpError> { let pending_perms: PendingPermissions = Arc::new(tokio::sync::Mutex::new(HashMap::new())); let terminal_runtime = Arc::new(TerminalRuntime::new()); let cwd = resolve_working_dir(working_dir.as_deref()); let cwd_string = cwd.to_string_lossy().to_string(); let file_system_runtime = Arc::new(FileSystemRuntime::new(cwd.clone())); let conn_id = connection_id.clone(); let emitter_clone = emitter.clone(); let perms = pending_perms.clone(); Client .builder() .name("codeg") .on_receive_request( { let conn_id = conn_id.clone(); let emitter_inner = emitter_clone.clone(); let perms = perms.clone(); let perm_cwd = cwd_string.clone(); async move |req: RequestPermissionRequest, responder: Responder, _cx: ConnectionTo| { handle_permission_request( &conn_id, &emitter_inner, &perms, &perm_cwd, req, responder, ) .await; Ok(()) } }, on_receive_request!(), ) .on_receive_request( { let runtime = file_system_runtime.clone(); async move |req: ReadTextFileRequest, responder: Responder, _cx: ConnectionTo| { respond_file_system_request(responder, runtime.read_text_file(req).await)?; Ok(()) } }, on_receive_request!(), ) .on_receive_request( { let runtime = file_system_runtime.clone(); async move |req: WriteTextFileRequest, responder: Responder, _cx: ConnectionTo| { respond_file_system_request(responder, runtime.write_text_file(req).await)?; Ok(()) } }, on_receive_request!(), ) .on_receive_request( { let runtime = terminal_runtime.clone(); async move |req: CreateTerminalRequest, responder: Responder, _cx: ConnectionTo| { respond_terminal_request(responder, runtime.create_terminal(req).await)?; Ok(()) } }, on_receive_request!(), ) .on_receive_request( { let runtime = terminal_runtime.clone(); async move |req: TerminalOutputRequest, responder: Responder, _cx: ConnectionTo| { respond_terminal_request(responder, runtime.terminal_output(req).await)?; Ok(()) } }, on_receive_request!(), ) .on_receive_request( { let runtime = terminal_runtime.clone(); async move |req: WaitForTerminalExitRequest, responder: Responder, _cx: ConnectionTo| { respond_terminal_request(responder, runtime.wait_for_terminal_exit(req).await)?; Ok(()) } }, on_receive_request!(), ) .on_receive_request( { let runtime = terminal_runtime.clone(); async move |req: KillTerminalRequest, responder: Responder, _cx: ConnectionTo| { respond_terminal_request(responder, runtime.kill_terminal(req).await)?; Ok(()) } }, on_receive_request!(), ) .on_receive_request( { let runtime = terminal_runtime.clone(); async move |req: ReleaseTerminalRequest, responder: Responder, _cx: ConnectionTo| { respond_terminal_request(responder, runtime.release_terminal(req).await)?; Ok(()) } }, on_receive_request!(), ) .connect_with(agent, async move |cx| -> Result<(), sacp::Error> { let agent_name_for_log = registry::get_agent_meta(agent_type).name; // Advertise filesystem + terminal capabilities for ACP tool execution. let init_request = InitializeRequest::new(ProtocolVersion::LATEST).client_capabilities( ClientCapabilities::new() .terminal(true) .fs(FileSystemCapabilities::new() .read_text_file(true) .write_text_file(true)), ); // Bound the Initialize handshake so an outdated / incompatible // cached binary that never responds can't leave the frontend // stuck on "Connecting...". A healthy agent answers in <1s; we // give 60s headroom for cold process startup on slow machines. // // We cannot carry a structured error code through sacp's Error // type, so we tag the timeout with `INIT_TIMEOUT_SENTINEL` and // convert it back to `AcpError::InitializeTimeout` in the // outer `.map_err(...)` below. The outer layer attaches a // stable `code` to the frontend event so it can be localized. eprintln!( "[ACP][{agent_name_for_log}] Sending Initialize (protocol={}, timeout=60s)", ProtocolVersion::LATEST ); let init_started = std::time::Instant::now(); let init_resp = match tokio::time::timeout( std::time::Duration::from_secs(60), cx.send_request_to(Agent, init_request).block_task(), ) .await { Ok(Ok(resp)) => { eprintln!( "[ACP][{agent_name_for_log}] Initialize responded in {:?}", init_started.elapsed() ); resp } Ok(Err(e)) => { eprintln!( "[ACP][{agent_name_for_log}] Initialize failed in {:?}: {e}", init_started.elapsed() ); return Err(e); } Err(_) => { eprintln!( "[ACP][{agent_name_for_log}] Initialize TIMED OUT after {:?} \ — the agent never answered the handshake. Check the \ [stderr] lines above for agent-side errors. For a full \ JSON-RPC trace, re-launch with CODEG_ACP_DEBUG=1.", init_started.elapsed() ); return Err(sacp::util::internal_error(INIT_TIMEOUT_SENTINEL)); } }; emit_prompt_capabilities( &conn_id, &emitter_clone, &init_resp.agent_capabilities.prompt_capabilities, ); let supports_fork = init_resp .agent_capabilities .session_capabilities .fork .is_some(); eprintln!( "[ACP] Agent capabilities: load_session={}, fork={}", init_resp.agent_capabilities.load_session, supports_fork ); // Emit fork support capability crate::web::event_bridge::emit_event( &emitter_clone, "acp://event", AcpEvent::ForkSupported { connection_id: conn_id.clone(), supported: supports_fork, }, ); // Emit connected status early so the frontend can show cached // selectors and enable sending while the session initialises. // Prompts sent before run_conversation_loop are buffered in // the cmd_rx channel and processed as soon as the loop starts. crate::web::event_bridge::emit_event( &emitter_clone, "acp://event", AcpEvent::StatusChanged { connection_id: conn_id.clone(), status: ConnectionStatus::Connected, }, ); if let Some(sid) = session_id { // Load existing session via session/load let load_req = build_load_session_request(agent_type, SessionId::new(sid.clone()), &cwd); let load_result = cx.send_request_to(Agent, load_req).block_task().await; match load_result { Ok(load_resp) => { let initial_config_options = load_resp.config_options.clone(); let new_resp = NewSessionResponse::new(SessionId::new(sid.clone())) .modes(load_resp.modes) .config_options(load_resp.config_options) .meta(load_resp.meta); let mut session = cx.attach_session(new_resp, Default::default())?; // Drain historical replay notifications from session/load, // but forward AvailableCommandsUpdate to the frontend let mut drained = 0u32; while let Ok(Ok(msg)) = tokio::time::timeout( std::time::Duration::from_millis(100), session.read_update(), ) .await { drained += 1; if let SessionMessage::SessionMessage(dispatch) = msg { let cid = conn_id.clone(); let h = emitter_clone.clone(); let dispatch = fix_usage_update_nulls(dispatch); let _ = MatchDispatch::new(dispatch) .if_notification(async |notif: SessionNotification| { if matches!( notif.update, SessionUpdate::AvailableCommandsUpdate(_) ) { // Historical-replay path only // forwards AvailableCommandsUpdate, // which never carries tool output // — a throwaway cache is fine. let mut replay_cache = ToolCallOutputCache::default(); emit_conversation_update( &cid, &h, agent_type, notif.update, None, &mut replay_cache, ); } Ok(()) }) .await .otherwise(async |dispatch| { maybe_emit_claude_sdk_ext_notification(&cid, &h, dispatch); Ok(()) }) .await; } } if drained > 0 { eprintln!("[ACP] Drained {drained} historical replay notifications"); } crate::web::event_bridge::emit_event( &emitter_clone, "acp://event", AcpEvent::SessionStarted { connection_id: conn_id.clone(), session_id: sid.clone(), }, ); emit_session_modes(&conn_id, &emitter_clone, session.modes()); emit_session_config_options( &conn_id, &emitter_clone, agent_type, &initial_config_options, ); emit_selectors_ready(&conn_id, &emitter_clone); let loop_result = run_conversation_loop( &mut session, &conn_id, &emitter_clone, agent_type, &perms, &mut cmd_rx, terminal_runtime.clone(), &cwd_string, supports_fork, ) .await; terminal_runtime.release_all_for_session(&sid).await; drop(session); handle_fork_or_exit( loop_result, &conn_id, &emitter_clone, agent_type, &perms, &mut cmd_rx, terminal_runtime.clone(), &cwd, &cwd_string, ) .await } Err(e) => { // session/load failed (e.g. agent doesn't support resume, // or ephemeral forked session). // Fall back to session/new so the tab still works. let err_str = e.to_string(); eprintln!( "[ACP] session/load failed ({}), falling back to session/new", err_str ); // Only emit a visible error for unexpected failures; // "Method not found" is expected for agents that don't // support session resume (e.g. Cline). // "Authentication required" is expected for agents whose // credentials have expired (e.g. Gemini CLI) — skip // session/new too since it will also fail. if err_str.contains("Authentication required") { return Ok(()); } if !err_str.contains("Method not found") { crate::web::event_bridge::emit_event( &emitter_clone, "acp://event", AcpEvent::Error { connection_id: conn_id.clone(), message: format!("Failed to load session, starting new: {e}"), agent_type: agent_type.to_string(), code: None, }, ); } let new_resp = cx .send_request_to(Agent, build_new_session_request(agent_type, &cwd)) .block_task() .await?; let fallback_sid = new_resp.session_id.0.to_string(); let initial_config_options = new_resp.config_options.clone(); let mut session = cx.attach_session(new_resp, Default::default())?; crate::web::event_bridge::emit_event( &emitter_clone, "acp://event", AcpEvent::SessionStarted { connection_id: conn_id.clone(), session_id: fallback_sid.clone(), }, ); emit_session_modes(&conn_id, &emitter_clone, session.modes()); emit_session_config_options( &conn_id, &emitter_clone, agent_type, &initial_config_options, ); emit_selectors_ready(&conn_id, &emitter_clone); let loop_result = run_conversation_loop( &mut session, &conn_id, &emitter_clone, agent_type, &perms, &mut cmd_rx, terminal_runtime.clone(), &cwd_string, supports_fork, ) .await; terminal_runtime .release_all_for_session(&fallback_sid) .await; drop(session); handle_fork_or_exit( loop_result, &conn_id, &emitter_clone, agent_type, &perms, &mut cmd_rx, terminal_runtime.clone(), &cwd, &cwd_string, ) .await } } } else { // Create new session let new_resp = cx .send_request_to(Agent, build_new_session_request(agent_type, &cwd)) .block_task() .await?; let sid = new_resp.session_id.0.to_string(); let initial_config_options = new_resp.config_options.clone(); let mut session = cx.attach_session(new_resp, Default::default())?; crate::web::event_bridge::emit_event( &emitter_clone, "acp://event", AcpEvent::SessionStarted { connection_id: conn_id.clone(), session_id: sid.clone(), }, ); emit_session_modes(&conn_id, &emitter_clone, session.modes()); emit_session_config_options( &conn_id, &emitter_clone, agent_type, &initial_config_options, ); emit_selectors_ready(&conn_id, &emitter_clone); let loop_result = run_conversation_loop( &mut session, &conn_id, &emitter_clone, agent_type, &perms, &mut cmd_rx, terminal_runtime.clone(), &cwd_string, supports_fork, ) .await; terminal_runtime.release_all_for_session(&sid).await; drop(session); handle_fork_or_exit( loop_result, &conn_id, &emitter_clone, agent_type, &perms, &mut cmd_rx, terminal_runtime.clone(), &cwd, &cwd_string, ) .await } }) .await .map_err(|e| { let raw = e.to_string(); if raw.contains(INIT_TIMEOUT_SENTINEL) { AcpError::InitializeTimeout } else { AcpError::protocol(raw) } }) } /// Store the permission responder and emit event to frontend. async fn handle_permission_request( conn_id: &str, emitter: &EventEmitter, perms: &PendingPermissions, cwd: &str, req: RequestPermissionRequest, responder: Responder, ) { let request_id = uuid::Uuid::new_v4().to_string(); let options: Vec = req .options .iter() .map(|opt| PermissionOptionInfo { option_id: opt.option_id.to_string(), name: opt.name.clone(), kind: match opt.kind { PermissionOptionKind::AllowOnce => "allow_once".into(), PermissionOptionKind::AllowAlways => "allow_always".into(), PermissionOptionKind::RejectOnce => "reject_once".into(), PermissionOptionKind::RejectAlways => "reject_always".into(), _ => "unknown".into(), }, }) .collect(); let mut tool_call_value = serde_json::to_value(&req.tool_call).unwrap_or_default(); // Resolve line numbers in rawInput for edit tool permission requests if let Some(obj) = tool_call_value.as_object_mut() { let key = ["rawInput", "raw_input"] .into_iter() .find(|k| obj.contains_key(*k)); if let Some(key) = key { match obj.get_mut(key) { // rawInput is a JSON object: inject _start_line in place Some(v) if v.is_object() => { inject_start_line(v, Some(cwd)); } // rawInput is a JSON string: parse, inject, write back as object Some(serde_json::Value::String(text)) => { let text = text.clone(); if let Ok(mut parsed) = serde_json::from_str::(&text) { if inject_start_line(&mut parsed, Some(cwd)) { obj.insert(key.to_string(), parsed); } } else if text.contains("@@\n") || text.contains("@@\r\n") { if let Some(resolved) = crate::parsers::resolve_patch_text(&text, Some(cwd)) { obj.insert(key.to_string(), serde_json::Value::String(resolved)); } } } _ => {} } } } perms.lock().await.insert(request_id.clone(), responder); crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::PermissionRequest { connection_id: conn_id.into(), request_id, tool_call: tool_call_value, options, }, ); } fn respond_terminal_request( responder: Responder, result: Result, ) -> Result<(), sacp::Error> { match result { Ok(response) => responder.respond(response), Err(error) => responder.respond_with_error(error.into_rpc_error()), } } fn respond_file_system_request( responder: Responder, result: Result, ) -> Result<(), sacp::Error> { match result { Ok(response) => responder.respond(response), Err(error) => responder.respond_with_error(error.into_rpc_error()), } } async fn set_session_mode( session: &mut sacp::ActiveSession<'_, Agent>, conn_id: &str, emitter: &EventEmitter, mode_id: String, ) -> Result<(), sacp::Error> { let req = SetSessionModeRequest::new(session.session_id().clone(), mode_id.clone()); session .connection() .send_request_to(Agent, req) .block_task() .await?; crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::ModeChanged { connection_id: conn_id.into(), mode_id, }, ); Ok(()) } async fn set_session_config_option( cx: &ConnectionTo, session_id: &SessionId, conn_id: &str, emitter: &EventEmitter, agent_type: AgentType, config_id: String, value_id: String, ) -> Result<(), sacp::Error> { let req = SetSessionConfigOptionRequest::new(session_id.clone(), config_id, value_id); let untyped_req = UntypedMessage::new("session/set_config_option", req).map_err(|e| { sacp::util::internal_error(format!("Failed to build config option request: {e}")) })?; let raw_response = cx.send_request_to(Agent, untyped_req).block_task().await?; let response: SetSessionConfigOptionResponse = serde_json::from_value(raw_response).map_err(|e| { sacp::util::internal_error(format!("Failed to parse config option response: {e}")) })?; emit_session_config_options_values(conn_id, emitter, agent_type, response.config_options); Ok(()) } const TERMINAL_POLL_INTERVAL_MS: u64 = 200; const TERMINAL_POLL_MISSING_LIMIT: u8 = 10; /// Hard cap on the size of a single ACP event's `raw_output` payload. /// /// Agents (e.g. Claude Code, Codex) frequently send `tool_call_update` /// notifications where `raw_output` is the **full accumulated** tool output /// rather than an incremental delta. For long-running terminal tools this /// leads to O(N²) bytes flowing through the event pipeline and multi-GB /// transient allocations (serde_json Value trees, IPC buffers, broadcast /// channel backlog). This constant caps any single emitted chunk so the /// pipeline never sees a multi-MB event. const MAX_SINGLE_EMIT_BYTES: usize = 64 * 1024; /// Byte length of the tail we retain per tool-call to verify that the next /// incoming snapshot is a cumulative extension of the previous one. Small /// enough to keep the cache bounded even in pathological sessions, large /// enough that a matching tail is an extremely unlikely coincidence. const MAX_CACHED_TAIL_BYTES: usize = 8 * 1024; /// Hard cap on the number of tool-call entries the cache retains. Prevents /// unbounded growth in long sessions where agents forget to mark tool calls /// as completed. Entries are evicted FIFO by generation counter. const MAX_CACHE_ENTRIES: usize = 256; /// Prefix used when an emitted chunk had to be truncated. const TRUNCATION_MARKER: &str = "[...truncated...]\n"; #[derive(Debug)] struct CachedOutput { /// Total byte length of the last observed `raw_output`. total_len: usize, /// Tail of the last observed `raw_output`, up to `MAX_CACHED_TAIL_BYTES` /// bytes. Always aligned to a UTF-8 character boundary at the start. tail: String, /// Monotonic insertion/update tick used for FIFO eviction. generation: u64, } /// Per-session cache of the last `raw_output` fingerprint emitted for each /// tool call. Enables delta detection: when an agent sends cumulative /// snapshots, we forward only the suffix (with `raw_output_append=true`) /// and keep the fingerprint bounded so it works even when the full output /// grows into the multi-MB range. #[derive(Debug, Default)] struct ToolCallOutputCache { entries: HashMap, next_generation: u64, } impl ToolCallOutputCache { /// Diff an incoming full `raw_output` snapshot for `tool_call_id` against /// the cache and return what should be emitted downstream. /// /// Returns `None` when the incoming snapshot is identical to the /// previously emitted one (nothing to send). Otherwise returns /// `(payload, append)` where: /// - `append=true` — `payload` is a (possibly truncated) suffix delta; /// the frontend should append it to the existing chunks. /// - `append=false` — `payload` is a (possibly truncated) replacement /// for the full tool output; the frontend should reset chunks. fn consume(&mut self, tool_call_id: &str, curr: &str) -> Option<(String, bool)> { let curr_len = curr.len(); let decision: Option<(String, bool)> = match self.entries.get(tool_call_id) { Some(prev) if curr_len >= prev.total_len && self.is_extension_of(prev, curr) => { if curr_len == prev.total_len { // Identical output — nothing to emit. Cache stays fresh. return None; } let suffix = &curr[prev.total_len..]; Some(build_emit_payload(suffix, true)) } _ => Some(build_emit_payload(curr, false)), }; // Update cache snapshot to current state so the next update can // still detect a prefix extension. let tail = trim_partial_ansi_tail(truncate_tail_at_char_boundary( curr, MAX_CACHED_TAIL_BYTES, )) .to_string(); let generation = self.next_generation; self.next_generation = self.next_generation.wrapping_add(1); self.entries.insert( tool_call_id.to_string(), CachedOutput { total_len: curr_len, tail, generation, }, ); self.enforce_entry_cap(); decision } /// Seed the cache with an initial snapshot for `tool_call_id`, WITHOUT /// attempting to diff against any prior state. Used for the initial /// `SessionUpdate::ToolCall` notification, whose frontend reducer /// treats `raw_output` as a full replacement. fn seed(&mut self, tool_call_id: &str, curr: &str) -> Option { let (payload, _append) = build_emit_payload(curr, false); let tail = trim_partial_ansi_tail(truncate_tail_at_char_boundary( curr, MAX_CACHED_TAIL_BYTES, )) .to_string(); let generation = self.next_generation; self.next_generation = self.next_generation.wrapping_add(1); self.entries.insert( tool_call_id.to_string(), CachedOutput { total_len: curr.len(), tail, generation, }, ); self.enforce_entry_cap(); if payload.is_empty() { None } else { Some(payload) } } /// Drop cached state for a tool call that has finished. Keeps the /// session-scoped cache bounded in long-running sessions. fn remove_if_final(&mut self, tool_call_id: &str, status: Option<&str>) { if matches!( status, Some("completed" | "failed" | "cancelled" | "error") ) { self.entries.remove(tool_call_id); } } /// Returns true when the cached fingerprint matches `curr` at the /// expected offset — i.e. `curr` is a prefix extension (or identity) /// of the previously observed snapshot. fn is_extension_of(&self, prev: &CachedOutput, curr: &str) -> bool { let tail_start = prev.total_len.saturating_sub(prev.tail.len()); curr.get(tail_start..prev.total_len) .is_some_and(|slice| slice == prev.tail.as_str()) } /// Evict oldest entries (by `generation`) once the cache exceeds the /// entry cap. Linear scan over a bounded map, so O(MAX_CACHE_ENTRIES) /// per eviction — acceptable at this size. fn enforce_entry_cap(&mut self) { while self.entries.len() > MAX_CACHE_ENTRIES { let Some(oldest_id) = self .entries .iter() .min_by_key(|(_, v)| v.generation) .map(|(k, _)| k.clone()) else { break; }; self.entries.remove(&oldest_id); } } } /// Apply the per-event size cap + truncation marker. Returns `(payload, /// append)`. An empty `text` yields an empty `payload`; callers should /// decide whether to suppress the emission in that case. fn build_emit_payload(text: &str, append: bool) -> (String, bool) { let truncated = trim_partial_ansi_tail(truncate_tail_at_char_boundary( text, MAX_SINGLE_EMIT_BYTES, )); let out = if truncated.len() < text.len() { format!("{TRUNCATION_MARKER}{truncated}") } else { truncated.to_string() }; (out, append) } /// Return a substring of `s` whose byte length is `<= max_bytes`, aligned to /// a UTF-8 character boundary and taken from the TAIL of `s` (so the most /// recent output is preserved when truncation is required). fn truncate_tail_at_char_boundary(s: &str, max_bytes: usize) -> &str { if s.len() <= max_bytes { return s; } let mut start = s.len() - max_bytes; while start < s.len() && !s.is_char_boundary(start) { start += 1; } &s[start..] } /// If the very end of `s` contains a partial ANSI escape sequence, trim it /// so downstream ANSI parsers (e.g. the frontend `ansi-to-react` renderer) /// don't see a half-emitted escape. /// /// Handles the three common ACP-stream cases: /// - CSI (`ESC [ ... final`): terminator is a byte in 0x40..=0x7E after /// the `[` introducer. /// - OSC (`ESC ] ... ST|BEL`): terminator is BEL (0x07) or `ESC \`. /// - Simple two-byte escape (`ESC `): complete as soon as the byte /// following ESC is present. /// /// ESC is ASCII (1 byte), always a valid UTF-8 char boundary, so slicing /// at `esc_pos` cannot produce an invalid UTF-8 string. fn trim_partial_ansi_tail(s: &str) -> &str { let bytes = s.as_bytes(); let Some(esc_pos) = bytes.iter().rposition(|&b| b == 0x1B) else { return s; }; let after = &bytes[esc_pos + 1..]; if after.is_empty() { return &s[..esc_pos]; } let terminated = match after[0] { b'[' => after[1..] .iter() .any(|&b| (0x40..=0x7E).contains(&b)), b']' => after[1..].contains(&0x07) || after[1..].windows(2).any(|w| w[0] == 0x1B && w[1] == b'\\'), // Two-byte escape sequences (ESC M, ESC D, …) are complete as // soon as the second byte is present. _ => true, }; if terminated { s } else { &s[..esc_pos] } } #[derive(Debug, Default)] struct TrackedTerminalToolCall { terminal_ids: Vec, status: Option, terminal_offsets: HashMap, terminal_exit_reported: HashSet, has_emitted_output: bool, missing_polls: u8, } #[derive(Debug, Default)] struct TerminalPollResult { output: Option, append: bool, any_found: bool, all_exited: bool, } fn is_final_tool_call_status(status: Option<&str>) -> bool { matches!(status, Some("completed" | "failed")) } fn merge_terminal_ids(existing: &mut Vec, incoming: Vec) -> bool { let mut changed = false; for terminal_id in incoming { if !existing.iter().any(|id| id == &terminal_id) { existing.push(terminal_id); changed = true; } } changed } fn extract_terminal_ids(content: &[ToolCallContent]) -> Vec { let mut seen = HashSet::new(); let mut terminal_ids = Vec::new(); for item in content { if let ToolCallContent::Terminal(terminal) = item { let terminal_id = terminal.terminal_id.to_string(); if seen.insert(terminal_id.clone()) { terminal_ids.push(terminal_id); } } } terminal_ids } fn track_terminal_tool_calls( update: &SessionUpdate, tracked: &mut HashMap, ) -> bool { match update { SessionUpdate::ToolCall(tc) => { let terminal_ids = extract_terminal_ids(&tc.content); if terminal_ids.is_empty() { return false; } let status = format!("{:?}", tc.status).to_lowercase(); let entry = tracked.entry(tc.tool_call_id.to_string()).or_default(); let changed = merge_terminal_ids(&mut entry.terminal_ids, terminal_ids); entry.status = Some(status); changed } SessionUpdate::ToolCallUpdate(tcu) => { let mut changed = false; let mut should_track = false; let terminal_ids = tcu .fields .content .as_ref() .map(|content| extract_terminal_ids(content)) .unwrap_or_default(); if !terminal_ids.is_empty() { should_track = true; } if tracked.contains_key(&tcu.tool_call_id.to_string()) { should_track = true; } if !should_track { return false; } let entry = tracked.entry(tcu.tool_call_id.to_string()).or_default(); if !terminal_ids.is_empty() { changed = merge_terminal_ids(&mut entry.terminal_ids, terminal_ids); } if let Some(status) = tcu.fields.status { let status_str = format!("{:?}", status).to_lowercase(); if entry.status.as_deref() != Some(status_str.as_str()) { changed = true; } entry.status = Some(status_str); } changed } _ => false, } } fn format_terminal_exit_status(exit_status: &TerminalExitStatus) -> String { let mut parts = Vec::new(); if let Some(code) = exit_status.exit_code { parts.push(format!("exit code: {code}")); } if let Some(signal) = &exit_status.signal { parts.push(format!("signal: {signal}")); } if parts.is_empty() { "finished".to_string() } else { parts.join(", ") } } async fn poll_terminal_tool_call_output( terminal_runtime: &TerminalRuntime, session_id: &SessionId, tracked: &mut TrackedTerminalToolCall, ) -> Result { let mut chunks: Vec = Vec::new(); let mut any_found = false; let mut all_exited = true; let include_headers = tracked.terminal_ids.len() > 1; for terminal_id in &tracked.terminal_ids { let from_offset = tracked.terminal_offsets.get(terminal_id).copied(); let response = match terminal_runtime .terminal_output_delta(session_id.0.as_ref(), terminal_id, from_offset) .await { Ok(response) => response, Err(TerminalRuntimeError::InvalidParams(_)) => continue, Err(err) => return Err(err), }; any_found = true; tracked .terminal_offsets .insert(terminal_id.clone(), response.next_offset); if response.exit_status.is_none() { all_exited = false; } let mut chunk = String::new(); if include_headers { chunk.push_str(&format!("[Terminal: {terminal_id}]\n")); } if response.had_gap { chunk.push_str("[output truncated]\n"); } if !response.output.is_empty() { chunk.push_str(&response.output); if !chunk.ends_with('\n') { chunk.push('\n'); } } if response.truncated && from_offset.is_none() { chunk.push_str("[output truncated]\n"); } if let Some(exit_status) = response.exit_status { if tracked.terminal_exit_reported.insert(terminal_id.clone()) { chunk.push_str(&format!( "[terminal exited: {}]\n", format_terminal_exit_status(&exit_status) )); } } if chunk.ends_with('\n') { chunk.pop(); } if !chunk.is_empty() { chunks.push(chunk); } } if !any_found { all_exited = false; } let append = tracked.has_emitted_output; if !chunks.is_empty() { tracked.has_emitted_output = true; } Ok(TerminalPollResult { output: if chunks.is_empty() { None } else { Some(chunks.join("\n\n")) }, append, any_found, all_exited, }) } fn emit_terminal_output_update( connection_id: &str, emitter: &EventEmitter, tool_call_id: &str, output: String, append: bool, ) { // Safety cap: when a subprocess writes very fast between poll ticks, // the delta produced by `poll_terminal_tool_call_output` can still be // up to ~1 MB (the terminal buffer limit). Enforce the pipeline-wide // single-event cap (with ANSI-safe truncation) before emission so the // WS/IPC fanout never carries a multi-MB payload. let (payload, _append) = build_emit_payload(&output, append); crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::ToolCallUpdate { connection_id: connection_id.into(), tool_call_id: tool_call_id.to_string(), title: None, status: None, content: None, raw_input: None, raw_output: Some(payload), raw_output_append: Some(append), locations: None, meta: None, }, ); } async fn poll_tracked_terminal_tool_calls( terminal_runtime: &TerminalRuntime, session_id: &SessionId, connection_id: &str, emitter: &EventEmitter, tracked: &mut HashMap, ) { if tracked.is_empty() { return; } let tool_call_ids: Vec = tracked.keys().cloned().collect(); let mut remove_ids: Vec = Vec::new(); for tool_call_id in tool_call_ids { let Some(entry) = tracked.get_mut(&tool_call_id) else { continue; }; if entry.terminal_ids.is_empty() { remove_ids.push(tool_call_id.clone()); continue; } let poll_result = match poll_terminal_tool_call_output(terminal_runtime, session_id, entry).await { Ok(result) => result, Err(err) => { eprintln!( "[ACP] Failed to poll terminal output for tool call {}: {:?}", tool_call_id, err ); continue; } }; if poll_result.any_found { entry.missing_polls = 0; } else { entry.missing_polls = entry.missing_polls.saturating_add(1); } if let Some(output) = poll_result.output { emit_terminal_output_update( connection_id, emitter, &tool_call_id, output, poll_result.append, ); } if (is_final_tool_call_status(entry.status.as_deref()) && (!poll_result.any_found || poll_result.all_exited)) || entry.missing_polls >= TERMINAL_POLL_MISSING_LIMIT { remove_ids.push(tool_call_id.clone()); } } for tool_call_id in remove_ids { tracked.remove(&tool_call_id); } } fn map_prompt_blocks(blocks: Vec) -> Vec { blocks .into_iter() .map(|block| match block { PromptInputBlock::Text { text } => ContentBlock::Text(TextContent::new(text)), PromptInputBlock::Image { data, mime_type, uri, } => ContentBlock::Image(ImageContent::new(data, mime_type).uri(uri)), PromptInputBlock::Resource { uri, mime_type, text, blob, } => { let resource = match (text, blob) { (Some(text_value), _) => { let content = TextResourceContents::new(text_value, uri.clone()).mime_type(mime_type); EmbeddedResourceResource::TextResourceContents(content) } (None, Some(blob_value)) => { let content = BlobResourceContents::new(blob_value, uri.clone()).mime_type(mime_type); EmbeddedResourceResource::BlobResourceContents(content) } (None, None) => { let content = TextResourceContents::new("", uri.clone()).mime_type(mime_type); EmbeddedResourceResource::TextResourceContents(content) } }; ContentBlock::Resource(EmbeddedResource::new(resource)) } PromptInputBlock::ResourceLink { uri, name, mime_type, description, } => { let mut link = ResourceLink::new(name, uri); link.mime_type = mime_type; link.description = description; ContentBlock::ResourceLink(link) } }) .collect() } /// Result when the conversation loop exits due to a fork request. struct ForkExitInfo { fork_response: sacp::schema::ForkSessionResponse, original_session_id: String, reply: tokio::sync::oneshot::Sender>, connection: ConnectionTo, } /// After `run_conversation_loop` returns, handle normal exit or fork transition. /// /// When fork is requested, the original session has already been dropped by the /// caller. We attach to the forked session (S2) directly using the /// `ForkSessionResponse` — no separate `session/load` is needed because S2 was /// just created in-memory by the agent on this connection. #[allow(clippy::too_many_arguments)] async fn handle_fork_or_exit( loop_result: Result, sacp::Error>, conn_id: &str, emitter: &EventEmitter, agent_type: AgentType, perms: &PendingPermissions, cmd_rx: &mut mpsc::Receiver, terminal_runtime: Arc, _cwd: &std::path::Path, cwd_string: &str, ) -> Result<(), sacp::Error> { let fork_info = match loop_result { Ok(Some(info)) => info, Ok(None) => return Ok(()), Err(e) => return Err(e), }; let cx = fork_info.connection; let fork_resp = fork_info.fork_response; let new_sid = fork_resp.session_id.0.to_string(); eprintln!( "[ACP] Fork transition: attaching to forked session {} (original: {})", new_sid, fork_info.original_session_id ); // Reply success to the frontend let _ = fork_info.reply.send(Ok(crate::acp::types::ForkResultInfo { forked_session_id: new_sid.clone(), original_session_id: fork_info.original_session_id, })); // Build a NewSessionResponse from the ForkSessionResponse so we can // attach directly — the forked session is already live on this process. let initial_config_options = fork_resp.config_options.clone(); let new_resp = NewSessionResponse::new(fork_resp.session_id) .modes(fork_resp.modes) .config_options(fork_resp.config_options) .meta(fork_resp.meta); let mut session = cx.attach_session(new_resp, Default::default())?; crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::SessionStarted { connection_id: conn_id.to_string(), session_id: new_sid.clone(), }, ); emit_session_modes(conn_id, emitter, session.modes()); emit_session_config_options(conn_id, emitter, agent_type, &initial_config_options); emit_selectors_ready(conn_id, emitter); let loop_result = run_conversation_loop( &mut session, conn_id, emitter, agent_type, perms, cmd_rx, terminal_runtime.clone(), cwd_string, true, // fork already succeeded on this process ) .await; terminal_runtime.release_all_for_session(&new_sid).await; drop(session); // Recursively handle nested forks Box::pin(handle_fork_or_exit( loop_result, conn_id, emitter, agent_type, perms, cmd_rx, terminal_runtime, _cwd, cwd_string, )) .await } /// Main conversation command loop: wait for frontend commands and process them. /// /// Returns `Ok(None)` on normal exit (disconnect / channel closed) or /// `Ok(Some(ForkExitInfo))` when the loop should be restarted on a forked session. #[allow(clippy::too_many_arguments)] async fn run_conversation_loop<'a>( session: &mut sacp::ActiveSession<'a, Agent>, conn_id: &str, emitter: &EventEmitter, agent_type: AgentType, perms: &PendingPermissions, cmd_rx: &mut mpsc::Receiver, terminal_runtime: Arc, cwd: &str, supports_fork: bool, ) -> Result, sacp::Error> { // Session-scoped cache for diffing cumulative `raw_output` snapshots // into incremental deltas. Shared across the idle loop and the active // turn loop so tool calls that span turns stay consistent. let mut raw_output_cache = ToolCallOutputCache::default(); loop { // Wait for either a user command or a session update (e.g. available_commands_update) let cmd = loop { tokio::select! { biased; cmd = cmd_rx.recv() => break cmd, update = session.read_update() => { match update { Ok(SessionMessage::SessionMessage(dispatch)) => { let cid = conn_id.to_string(); let h = emitter.clone(); let cwd_opt = Some(cwd); let dispatch = fix_usage_update_nulls(dispatch); let _ = MatchDispatch::new(dispatch) .if_notification( async |notif: SessionNotification| { emit_conversation_update(&cid, &h, agent_type, notif.update, cwd_opt, &mut raw_output_cache); Ok(()) }, ) .await .otherwise(async |dispatch| { maybe_emit_claude_sdk_ext_notification(&cid, &h, dispatch); Ok(()) }) .await; } Ok(_) => {} Err(e) => { eprintln!("[ACP] Ignoring unrecognized session update in idle loop: {e}"); } } } } }; match cmd { Some(ConnectionCommand::Prompt { blocks }) => { let prompt_blocks = map_prompt_blocks(blocks); if prompt_blocks.is_empty() { crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::Error { connection_id: conn_id.into(), message: "Prompt must contain at least one content block".into(), agent_type: agent_type.to_string(), code: None, }, ); continue; } crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::StatusChanged { connection_id: conn_id.into(), status: ConnectionStatus::Prompting, }, ); // Clone connection and session ID before entering the // select loop so we can send CancelNotification without // conflicting with session.read_update()'s mutable borrow. let cx = session.connection(); let sid = session.session_id().clone(); let prompt_request = PromptRequest::new(sid.clone(), prompt_blocks); // Use Box::pin (heap) instead of tokio::pin! (stack) so the // future can be moved into a background task on cancel. let mut prompt_response = Box::pin( cx.clone() .send_request_to(Agent, prompt_request) .block_task(), ); let mut tracked_terminal_tool_calls: HashMap = HashMap::new(); let mut terminal_poll_interval = tokio::time::interval( std::time::Duration::from_millis(TERMINAL_POLL_INTERVAL_MS), ); terminal_poll_interval .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let mut disconnect_requested = false; // Read updates until turn completes. // We must also listen for commands (e.g. RespondPermission) // to avoid deadlocking when the agent awaits a permission response. loop { tokio::select! { update = session.read_update() => { let update = match update { Ok(u) => u, Err(e) => { eprintln!("[ACP] Ignoring unrecognized session update: {e}"); continue; } }; match update { SessionMessage::SessionMessage(dispatch) => { let cid = conn_id.to_string(); let h = emitter.clone(); let runtime = terminal_runtime.clone(); let session_id = sid.clone(); let cwd_opt = Some(cwd); let dispatch = fix_usage_update_nulls(dispatch); if let Err(e) = MatchDispatch::new(dispatch) .if_notification( async |notif: SessionNotification| { let should_poll_now = track_terminal_tool_calls( ¬if.update, &mut tracked_terminal_tool_calls, ); emit_conversation_update(&cid, &h, agent_type, notif.update, cwd_opt, &mut raw_output_cache); if should_poll_now { poll_tracked_terminal_tool_calls( runtime.as_ref(), &session_id, &cid, &h, &mut tracked_terminal_tool_calls, ) .await; } Ok(()) }, ) .await .otherwise(async |dispatch| { maybe_emit_claude_sdk_ext_notification(&cid, &h, dispatch); Ok(()) }) .await { eprintln!("[ACP] Ignoring dispatch parse error: {e}"); } } SessionMessage::StopReason(reason) => { if !tracked_terminal_tool_calls.is_empty() { poll_tracked_terminal_tool_calls( terminal_runtime.as_ref(), &sid, conn_id, emitter, &mut tracked_terminal_tool_calls, ) .await; } let reason_str = match reason { StopReason::EndTurn => "end_turn", StopReason::Cancelled => "cancelled", _ => "unknown", }; crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::TurnComplete { connection_id: conn_id.into(), session_id: sid.0.to_string(), stop_reason: reason_str.into(), agent_type: agent_type.to_string(), }, ); break; } _ => {} } } prompt_result = &mut prompt_response => { let reason = prompt_result?.stop_reason; if !tracked_terminal_tool_calls.is_empty() { poll_tracked_terminal_tool_calls( terminal_runtime.as_ref(), &sid, conn_id, emitter, &mut tracked_terminal_tool_calls, ) .await; } let reason_str = match reason { StopReason::EndTurn => "end_turn", StopReason::Cancelled => "cancelled", _ => "unknown", }; crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::TurnComplete { connection_id: conn_id.into(), session_id: sid.0.to_string(), stop_reason: reason_str.into(), agent_type: agent_type.to_string(), }, ); break; } _ = terminal_poll_interval.tick(), if !tracked_terminal_tool_calls.is_empty() => { poll_tracked_terminal_tool_calls( terminal_runtime.as_ref(), &sid, conn_id, emitter, &mut tracked_terminal_tool_calls, ) .await; } cmd = cmd_rx.recv() => { match cmd { Some(ConnectionCommand::RespondPermission { request_id, option_id, }) => { if let Some(responder) = perms.lock().await.remove(&request_id) { let outcome = RequestPermissionOutcome::Selected( SelectedPermissionOutcome::new(option_id), ); let _ = responder.respond(RequestPermissionResponse::new(outcome)); } } Some(ConnectionCommand::SetMode { mode_id }) => { let req = SetSessionModeRequest::new(sid.clone(), mode_id.clone()); match cx.send_request_to(Agent, req).block_task().await { Ok(_) => { crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::ModeChanged { connection_id: conn_id.into(), mode_id, }, ); } Err(e) => { crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::Error { connection_id: conn_id.into(), message: format!("Failed to set mode: {e}"), agent_type: agent_type.to_string(), code: None, }, ); } } } Some(ConnectionCommand::SetConfigOption { config_id, value_id, }) => { if let Err(e) = set_session_config_option( &cx, &sid, conn_id, emitter, agent_type, config_id, value_id, ) .await { crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::Error { connection_id: conn_id.into(), message: format!("Failed to set config option: {e}"), agent_type: agent_type.to_string(), code: None, }, ); } } Some(ConnectionCommand::Cancel) => { // Send CancelNotification to agent to stop the current turn let _ = cx.send_notification_to( Agent, CancelNotification::new(sid.clone()), ); // Also terminate any command runtimes created for this // session so cancellation does not hang on long-running // terminal tools. terminal_runtime .release_all_for_session(sid.0.as_ref()) .await; tracked_terminal_tool_calls.clear(); // Also cancel any pending permission requests let mut locked = perms.lock().await; for (_, responder) in locked.drain() { let _ = responder.respond(RequestPermissionResponse::new( RequestPermissionOutcome::Cancelled, )); } // Immediately emit TurnComplete so the frontend // transitions out of "prompting" and the user can // send new messages. Don't wait for the agent -- // it may be slow to respond or not respond at all. crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::TurnComplete { connection_id: conn_id.into(), session_id: sid.0.to_string(), stop_reason: "cancelled".into(), agent_type: agent_type.to_string(), }, ); // Drain the prompt response in the background so // the SACP library doesn't log "receiver dropped" // errors when the agent eventually responds. tokio::spawn(async move { let _ = prompt_response.await; }); break; } Some(ConnectionCommand::Disconnect) | None => { eprintln!( "[ACP] disconnect requested during prompting; connection_id={conn_id}" ); let _ = cx.send_notification_to( Agent, CancelNotification::new(sid.clone()), ); terminal_runtime .release_all_for_session(sid.0.as_ref()) .await; tracked_terminal_tool_calls.clear(); let mut locked = perms.lock().await; for (_, responder) in locked.drain() { let _ = responder.respond(RequestPermissionResponse::new( RequestPermissionOutcome::Cancelled, )); } disconnect_requested = true; break; } _ => {} } } } } if disconnect_requested { eprintln!( "[ACP] closing connection loop after disconnect; connection_id={conn_id}" ); break; } crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::StatusChanged { connection_id: conn_id.into(), status: ConnectionStatus::Connected, }, ); } Some(ConnectionCommand::RespondPermission { request_id, option_id, }) => { if let Some(responder) = perms.lock().await.remove(&request_id) { let outcome = RequestPermissionOutcome::Selected( SelectedPermissionOutcome::new(option_id), ); let _ = responder.respond(RequestPermissionResponse::new(outcome)); } } Some(ConnectionCommand::SetMode { mode_id }) => { if let Err(e) = set_session_mode(session, conn_id, emitter, mode_id).await { crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::Error { connection_id: conn_id.into(), message: format!("Failed to set mode: {e}"), agent_type: agent_type.to_string(), code: None, }, ); } } Some(ConnectionCommand::SetConfigOption { config_id, value_id, }) => { let cx = session.connection(); let sid = session.session_id().clone(); if let Err(e) = set_session_config_option( &cx, &sid, conn_id, emitter, agent_type, config_id, value_id, ) .await { crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::Error { connection_id: conn_id.into(), message: format!("Failed to set config option: {e}"), agent_type: agent_type.to_string(), code: None, }, ); } } Some(ConnectionCommand::Cancel) => { let cx = session.connection(); let sid = session.session_id().clone(); let _ = cx.send_notification_to(Agent, CancelNotification::new(sid.clone())); terminal_runtime .release_all_for_session(sid.0.as_ref()) .await; let mut locked = perms.lock().await; for (_, responder) in locked.drain() { let _ = responder.respond(RequestPermissionResponse::new( RequestPermissionOutcome::Cancelled, )); } } Some(ConnectionCommand::Fork { reply }) => { if !supports_fork { let _ = reply.send(Err(AcpError::protocol( "This agent does not support session/fork".to_string(), ))); continue; } let cx = session.connection(); let sid = session.session_id().clone(); eprintln!( "[ACP] Sending session/fork for session_id={} cwd={}", sid.0, cwd ); let result = crate::acp::fork::fork_session(&cx, &sid, cwd).await; match result { Ok(fork_response) => { eprintln!( "[ACP] Fork succeeded: new_session_id={}", fork_response.session_id.0 ); return Ok(Some(ForkExitInfo { fork_response, original_session_id: sid.0.to_string(), reply, connection: cx, })); } Err(e) => { eprintln!("[ACP] Fork failed: {e}"); let _ = reply.send(Err(e)); } } } Some(ConnectionCommand::Disconnect) | None => { break; } } } Ok(None) } /// Serialize a Vec into a human-readable text string. fn serialize_tool_call_content(content: &[ToolCallContent]) -> Option { let mut parts: Vec = Vec::new(); for item in content { match item { ToolCallContent::Content(c) => { if let ContentBlock::Text(text) = &c.content { parts.push(text.text.clone()); } } ToolCallContent::Diff(diff) => { let path = diff.path.display(); let mut diff_text = format!("--- {path}\n+++ {path}\n"); if let Some(old) = &diff.old_text { for line in old.lines() { diff_text.push_str(&format!("-{line}\n")); } } for line in diff.new_text.lines() { diff_text.push_str(&format!("+{line}\n")); } parts.push(diff_text); } ToolCallContent::Terminal(t) => { parts.push(format!("[Terminal: {}]", t.terminal_id)); } _ => {} } } if parts.is_empty() { None } else { Some(parts.join("\n")) } } /// If the output looks like numbered lines (` 115→content`), strip them /// and return `{"start_line":N,"content":"..."}` — same as the historical path. fn structurize_live_output(text: &str) -> String { if let Some(json) = crate::parsers::strip_numbered_lines(text) { return json; } text.to_string() } /// Resolve line numbers for live tool call input. /// /// Resolve line numbers for live tool call input (string form). /// /// - For apply_patch with bare `@@`: resolve line numbers in place. /// - For canonical edit JSON: inject `_start_line`. fn resolve_live_tool_input(text: &str, cwd: Option<&str>) -> String { if text.contains("@@\n") || text.contains("@@\r\n") { if let Some(resolved) = crate::parsers::resolve_patch_text(text, cwd) { return resolved; } } if let Ok(mut parsed) = serde_json::from_str::(text) { if inject_start_line(&mut parsed, cwd) { return parsed.to_string(); } } text.to_string() } /// Try to inject `_start_line` into a JSON object with `file_path` + `old_string`. /// Returns true if injected. fn inject_start_line(value: &mut serde_json::Value, cwd: Option<&str>) -> bool { let obj = match value.as_object_mut() { Some(o) => o, None => return false, }; let fp = obj .get("file_path") .or_else(|| obj.get("path")) .and_then(|v| v.as_str()) .map(|s| s.to_string()); let old_str = obj .get("old_string") .and_then(|v| v.as_str()) .map(|s| s.to_string()); if let (Some(fp), Some(old_str)) = (fp, old_str) { if let Some(sl) = find_string_start_line(&fp, &old_str, cwd) { obj.insert("_start_line".to_string(), serde_json::json!(sl)); return true; } } false } /// Find the 1-based start line of `needle` in the file at `path`. fn find_string_start_line(path: &str, needle: &str, cwd: Option<&str>) -> Option { if needle.is_empty() { return None; } let file_lines = crate::parsers::load_file_lines(path, cwd)?; let file_content = file_lines.join("\n"); let byte_offset = file_content.find(needle)?; Some(file_content[..byte_offset].matches('\n').count() as u64 + 1) } fn json_value_to_text(val: &Option) -> Option { match val { Some(serde_json::Value::String(text)) => Some(text.clone()), Some(v) if !v.is_null() => Some(v.to_string()), _ => None, } } fn map_plan_priority(priority: &PlanEntryPriority) -> String { match priority { PlanEntryPriority::High => "high", PlanEntryPriority::Medium => "medium", PlanEntryPriority::Low => "low", _ => "unknown", } .to_string() } fn map_plan_status(status: &PlanEntryStatus) -> String { match status { PlanEntryStatus::Pending => "pending", PlanEntryStatus::InProgress => "in_progress", PlanEntryStatus::Completed => "completed", _ => "unknown", } .to_string() } fn map_plan_entries(plan: &Plan) -> Vec { plan.entries .iter() .map(|entry| PlanEntryInfo { content: entry.content.clone(), priority: map_plan_priority(&entry.priority), status: map_plan_status(&entry.status), }) .collect() } fn parse_claude_sdk_message_params( params: &serde_json::Value, ) -> Option<(String, serde_json::Value)> { let obj = params.as_object()?; let session_id = obj.get("sessionId")?.as_str()?.to_string(); let message = obj.get("message")?.clone(); Some((session_id, message)) } fn is_claude_api_retry_message(message: &serde_json::Value) -> bool { let obj = match message.as_object() { Some(obj) => obj, None => return false, }; let message_type = obj.get("type").and_then(|v| v.as_str()); let message_subtype = obj.get("subtype").and_then(|v| v.as_str()); matches!(message_type, Some("system")) && matches!(message_subtype, Some("api_retry")) } fn map_claude_sdk_ext_notification( connection_id: &str, notification: &UntypedMessage, ) -> Option { if notification.method() != "_claude/sdkMessage" { return None; } let (session_id, message) = parse_claude_sdk_message_params(notification.params())?; if !is_claude_api_retry_message(&message) { return None; } Some(AcpEvent::ClaudeSdkMessage { connection_id: connection_id.to_string(), session_id, message, }) } fn maybe_emit_claude_sdk_ext_notification( connection_id: &str, emitter: &EventEmitter, dispatch: Dispatch, ) { let Dispatch::Notification(notification) = dispatch else { return; }; if let Some(event) = map_claude_sdk_ext_notification(connection_id, ¬ification) { crate::web::event_bridge::emit_event(emitter, "acp://event", event); } } /// Fix null fields in `usage_update` notifications that would otherwise fail deserialization. /// /// Some ACP agents send `"used": null` in usage_update notifications, but the /// upstream schema expects `u64`. This function patches the raw JSON params /// so that `null` numeric fields default to `0`. fn fix_usage_update_nulls(mut dispatch: Dispatch) -> Dispatch { if let Dispatch::Notification(ref mut msg) = dispatch { if let Some(update) = msg.params.get_mut("update") { if update.get("sessionUpdate").and_then(|v| v.as_str()) == Some("usage_update") { if update.get("used").map(|v| v.is_null()).unwrap_or(false) { update["used"] = serde_json::Value::from(0u64); } if update.get("size").map(|v| v.is_null()).unwrap_or(false) { update["size"] = serde_json::Value::from(0u64); } } } } dispatch } /// Convert a SessionUpdate into AcpEvent(s) and emit to frontend. /// /// `raw_output_cache` is a per-session cache used to detect cumulative /// snapshots from agents and convert them into incremental deltas so the /// event pipeline never carries a full N-MB tool output more than once. fn emit_conversation_update( connection_id: &str, emitter: &EventEmitter, agent_type: AgentType, update: SessionUpdate, cwd: Option<&str>, raw_output_cache: &mut ToolCallOutputCache, ) { match update { SessionUpdate::UserMessageChunk(_) => { // User echo chunks are informational for transcript sync and // currently not rendered in live ACP UI. } SessionUpdate::AgentMessageChunk(ContentChunk { content: ContentBlock::Text(text), .. }) => { crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::ContentDelta { connection_id: connection_id.into(), text: text.text, }, ); } SessionUpdate::AgentMessageChunk(_) => { // Non-text chunks are currently not surfaced in live streaming UI. } SessionUpdate::AgentThoughtChunk(ContentChunk { content: ContentBlock::Text(text), .. }) => { crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::Thinking { connection_id: connection_id.into(), text: text.text, }, ); } SessionUpdate::AgentThoughtChunk(_) => { // Non-text thought chunks are currently ignored. } SessionUpdate::ToolCall(tc) => { let tool_call_id = tc.tool_call_id.to_string(); let content = serialize_tool_call_content(&tc.content); let raw_input = json_value_to_text(&tc.raw_input).map(|text| resolve_live_tool_input(&text, cwd)); // Initial tool_call notification — the frontend reducer // treats `raw_output` as a full replacement, so we bypass // the diff path and seed the cache with the current snapshot. let raw_output = json_value_to_text(&tc.raw_output) .map(|text| structurize_live_output(&text)) .and_then(|text| raw_output_cache.seed(&tool_call_id, &text)); let locations = if tc.locations.is_empty() { None } else { serde_json::to_value(&tc.locations).ok() }; let meta = tc.meta.map(serde_json::Value::Object); let status = format!("{:?}", tc.status).to_lowercase(); raw_output_cache.remove_if_final(&tool_call_id, Some(status.as_str())); crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::ToolCall { connection_id: connection_id.into(), tool_call_id, title: tc.title, kind: format!("{:?}", tc.kind).to_lowercase(), status, content, raw_input, raw_output, locations, meta, }, ); } SessionUpdate::ToolCallUpdate(tcu) => { let tool_call_id = tcu.tool_call_id.to_string(); let content = tcu .fields .content .as_deref() .and_then(serialize_tool_call_content); let raw_input = json_value_to_text(&tcu.fields.raw_input) .map(|text| resolve_live_tool_input(&text, cwd)); // Diff the incoming raw_output against the last snapshot we // emitted for this tool call. This turns cumulative snapshots // from agents (Claude Code, Codex, …) into incremental deltas // with `raw_output_append=true`, collapsing the O(N²) transfer // problem to O(N) while capping any single emitted chunk to // MAX_SINGLE_EMIT_BYTES. let raw_output_text = json_value_to_text(&tcu.fields.raw_output) .map(|text| structurize_live_output(&text)); let (raw_output, raw_output_append) = match raw_output_text { Some(text) => match raw_output_cache.consume(&tool_call_id, &text) { Some((payload, append)) => (Some(payload), Some(append)), None => (None, None), }, None => (None, None), }; let locations = tcu .fields .locations .as_ref() .filter(|l| !l.is_empty()) .and_then(|l| serde_json::to_value(l).ok()); let meta = tcu.meta.clone().map(serde_json::Value::Object); let status = tcu.fields.status.map(|s| format!("{:?}", s).to_lowercase()); raw_output_cache.remove_if_final(&tool_call_id, status.as_deref()); crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::ToolCallUpdate { connection_id: connection_id.into(), tool_call_id, title: tcu.fields.title, status, content, raw_input, raw_output, raw_output_append, locations, meta, }, ); } SessionUpdate::CurrentModeUpdate(update) => { crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::ModeChanged { connection_id: connection_id.into(), mode_id: update.current_mode_id.to_string(), }, ); } SessionUpdate::Plan(plan) => { crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::PlanUpdate { connection_id: connection_id.into(), entries: map_plan_entries(&plan), }, ); } SessionUpdate::ConfigOptionUpdate(update) => { emit_session_config_options_values( connection_id, emitter, agent_type, update.config_options, ); } SessionUpdate::AvailableCommandsUpdate(update) => { // Some agents (e.g. Claude Code with overlapping user/project slash // commands) emit duplicate entries sharing the same name. Keep the // first occurrence so downstream consumers don't render duplicates; // the frontend reducer also dedupes as a defensive measure. let mut seen = HashSet::new(); let commands: Vec = update .available_commands .iter() .filter(|cmd| seen.insert(cmd.name.clone())) .map(|cmd| { let input_hint = cmd.input.as_ref().map(|input| match input { sacp::schema::AvailableCommandInput::Unstructured(u) => u.hint.clone(), _ => String::new(), }); AvailableCommandInfo { name: cmd.name.clone(), description: cmd.description.clone(), input_hint, } }) .collect(); crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::AvailableCommands { connection_id: connection_id.into(), commands, }, ); } SessionUpdate::UsageUpdate(update) => { crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::UsageUpdate { connection_id: connection_id.into(), used: update.used, size: update.size, }, ); } other => { // Log unhandled update types for debugging eprintln!("[ACP] Unhandled SessionUpdate: {:?}", other); } } } #[cfg(test)] mod tests { use super::*; #[test] fn claude_raw_sdk_meta_enabled_only_for_claude() { let claude_meta = claude_raw_sdk_session_meta(AgentType::ClaudeCode) .expect("Claude must have raw SDK meta"); assert_eq!( claude_meta .get("claudeCode") .and_then(|v| v.get("emitRawSDKMessages")) .and_then(|v| v.as_bool()), Some(true) ); assert!(claude_raw_sdk_session_meta(AgentType::Codex).is_none()); } #[test] fn map_claude_sdk_ext_notification_maps_valid_payload() { let raw = UntypedMessage::new( "_claude/sdkMessage", serde_json::json!({ "sessionId": "session-123", "message": { "type": "system", "subtype": "api_retry", "attempt": 3, "max_retries": 10 } }), ) .unwrap(); let event = map_claude_sdk_ext_notification("conn-1", &raw).expect("valid sdk payload should map"); match event { AcpEvent::ClaudeSdkMessage { connection_id, session_id, message, } => { assert_eq!(connection_id, "conn-1"); assert_eq!(session_id, "session-123"); assert_eq!(message.get("type").and_then(|v| v.as_str()), Some("system")); } _ => panic!("expected ClaudeSdkMessage"), } } #[test] fn map_claude_sdk_ext_notification_rejects_non_api_retry() { let non_retry = UntypedMessage::new( "_claude/sdkMessage", serde_json::json!({ "sessionId": "session-123", "message": {"type": "system", "subtype": "status"} }), ) .unwrap(); assert!(map_claude_sdk_ext_notification("conn-1", &non_retry).is_none()); } #[test] fn map_claude_sdk_ext_notification_rejects_invalid_payload() { let wrong_method = UntypedMessage::new( "_other/method", serde_json::json!({"sessionId": "s", "message": {}}), ) .unwrap(); assert!(map_claude_sdk_ext_notification("conn-1", &wrong_method).is_none()); let missing_fields = UntypedMessage::new("_claude/sdkMessage", serde_json::json!({"sessionId": 1})).unwrap(); assert!(map_claude_sdk_ext_notification("conn-1", &missing_fields).is_none()); } #[test] fn build_new_session_request_sets_claude_raw_meta() { let cwd = std::path::PathBuf::from("/tmp/codeg"); let req = build_new_session_request(AgentType::ClaudeCode, &cwd); assert_eq!( req.meta .as_ref() .and_then(|m| m.get("claudeCode")) .and_then(|v| v.get("emitRawSDKMessages")) .and_then(|v| v.as_bool()), Some(true) ); } #[test] fn build_load_session_request_skips_meta_for_non_claude() { let cwd = std::path::PathBuf::from("/tmp/codeg"); let req = build_load_session_request(AgentType::Codex, SessionId::new("abc".to_string()), &cwd); assert!(req.meta.is_none()); } // ─── ToolCallOutputCache ──────────────────────────────────────────── #[test] fn cache_first_update_emits_full_replace() { let mut cache = ToolCallOutputCache::default(); let (payload, append) = cache.consume("t1", "hello world").expect("should emit"); assert_eq!(payload, "hello world"); assert!(!append, "first emit must be replacement"); } #[test] fn cache_repeated_identical_snapshot_is_noop() { let mut cache = ToolCallOutputCache::default(); cache.consume("t1", "same").unwrap(); assert!( cache.consume("t1", "same").is_none(), "identical snapshot must not emit" ); } #[test] fn cache_prefix_extension_emits_suffix_with_append() { let mut cache = ToolCallOutputCache::default(); cache.consume("t1", "line-1\n").unwrap(); let (payload, append) = cache .consume("t1", "line-1\nline-2\n") .expect("should emit"); assert_eq!(payload, "line-2\n"); assert!(append, "prefix extension must emit with append=true"); } #[test] fn cache_divergent_snapshot_falls_back_to_replace() { let mut cache = ToolCallOutputCache::default(); cache.consume("t1", "hello world").unwrap(); let (payload, append) = cache.consume("t1", "foo bar baz").expect("should emit"); assert_eq!(payload, "foo bar baz"); assert!(!append, "non-extension snapshot must replace"); } #[test] fn cache_tracks_extensions_past_cached_tail_boundary() { // Regression test for the original bug: when cumulative raw_output // exceeds MAX_CACHED_TAIL_BYTES, subsequent extensions must still be // detectable by comparing the cached tail against the expected // offset in the incoming snapshot. let mut cache = ToolCallOutputCache::default(); // First snapshot: 10 KB of 'a' + unique 4 KB marker at the end. let prefix = "a".repeat(10 * 1024); let marker = "M".repeat(4 * 1024); let first = format!("{prefix}{marker}"); cache.consume("t1", &first).unwrap(); // Second snapshot extends first by 16 KB of 'Z'. let delta = "Z".repeat(16 * 1024); let second = format!("{first}{delta}"); let (payload, append) = cache.consume("t1", &second).expect("should emit"); assert!(append, "extension beyond cached tail must still be detected"); // The emitted payload should carry the delta (or its tail when // truncated at MAX_SINGLE_EMIT_BYTES). For a 16 KB delta that's // well below the 64 KB cap, we expect it verbatim. assert_eq!(payload, delta); } #[test] fn cache_extension_larger_than_emit_cap_gets_truncated() { let mut cache = ToolCallOutputCache::default(); cache.consume("t1", "seed").unwrap(); // Build a delta much larger than MAX_SINGLE_EMIT_BYTES. let big_delta = "X".repeat(MAX_SINGLE_EMIT_BYTES * 2); let second = format!("seed{big_delta}"); let (payload, append) = cache.consume("t1", &second).expect("should emit"); assert!(append); assert!( payload.starts_with(TRUNCATION_MARKER), "oversized delta must be prefixed with truncation marker" ); // Payload length: marker + at most MAX_SINGLE_EMIT_BYTES of tail. assert!(payload.len() <= TRUNCATION_MARKER.len() + MAX_SINGLE_EMIT_BYTES); } #[test] fn cache_respects_utf8_char_boundary_on_truncation() { let mut cache = ToolCallOutputCache::default(); // Single first-update whose byte length forces truncation at a // position that would otherwise fall mid-codepoint. 中 is 3 bytes // (E4 B8 AD) and MAX_SINGLE_EMIT_BYTES (65536) is not a multiple // of 3, so naïve byte slicing would land mid-char. let chinese_block = "中".repeat((MAX_SINGLE_EMIT_BYTES / 3) + 100); let (payload, _append) = cache.consume("t1", &chinese_block).expect("should emit"); // Payload must start with the truncation marker (since size > cap). assert!( payload.starts_with(TRUNCATION_MARKER), "oversized snapshot must be truncated" ); // Body after the marker must be valid UTF-8 consisting only of 中. let body = &payload[TRUNCATION_MARKER.len()..]; assert!(!body.is_empty()); assert!( body.chars().all(|c| c == '中'), "truncation boundary must land on a UTF-8 codepoint edge" ); } #[test] fn cache_final_status_clears_entry() { let mut cache = ToolCallOutputCache::default(); cache.consume("t1", "hello").unwrap(); assert!(cache.entries.contains_key("t1")); cache.remove_if_final("t1", Some("completed")); assert!(!cache.entries.contains_key("t1")); cache.consume("t2", "x").unwrap(); cache.remove_if_final("t2", Some("cancelled")); assert!(!cache.entries.contains_key("t2")); cache.consume("t3", "x").unwrap(); cache.remove_if_final("t3", Some("in_progress")); assert!( cache.entries.contains_key("t3"), "in-progress status must not clear cache" ); } #[test] fn cache_enforces_entry_cap_via_fifo_eviction() { let mut cache = ToolCallOutputCache::default(); for i in 0..(MAX_CACHE_ENTRIES + 50) { cache.consume(&format!("tool-{i}"), "body").unwrap(); } assert_eq!(cache.entries.len(), MAX_CACHE_ENTRIES); // Oldest entries should have been evicted; newest must still exist. assert!(!cache.entries.contains_key("tool-0")); assert!(cache .entries .contains_key(&format!("tool-{}", MAX_CACHE_ENTRIES + 49))); } #[test] fn cache_seed_always_replaces_and_caches() { let mut cache = ToolCallOutputCache::default(); cache.consume("t1", "stale").unwrap(); // A hypothetical replay would send another ToolCall for the same // id — seed() must install the new snapshot without trying to // diff against the stale prior entry. let payload = cache.seed("t1", "fresh").expect("seed emits"); assert_eq!(payload, "fresh"); // Next consume should diff against "fresh", not "stale". let (p2, append) = cache.consume("t1", "fresh+more").expect("emit"); assert!(append, "should detect extension of freshly seeded entry"); assert_eq!(p2, "+more"); } // ─── trim_partial_ansi_tail ───────────────────────────────────────── #[test] fn ansi_trim_leaves_pure_text_unchanged() { assert_eq!(trim_partial_ansi_tail("plain text"), "plain text"); } #[test] fn ansi_trim_keeps_completed_sequences() { let s = "\x1b[31mRED\x1b[0m done"; assert_eq!(trim_partial_ansi_tail(s), s); } #[test] fn ansi_trim_cuts_unterminated_trailing_sequence() { let s = "hello \x1b[31"; assert_eq!(trim_partial_ansi_tail(s), "hello "); } #[test] fn ansi_trim_handles_bare_escape_at_end() { let s = "hello\x1b"; assert_eq!(trim_partial_ansi_tail(s), "hello"); } // ─── truncate_tail_at_char_boundary ───────────────────────────────── #[test] fn truncate_under_cap_returns_as_is() { assert_eq!(truncate_tail_at_char_boundary("abc", 10), "abc"); } #[test] fn truncate_returns_tail_on_overflow() { assert_eq!(truncate_tail_at_char_boundary("abcdef", 3), "def"); } #[test] fn truncate_respects_multibyte_utf8_boundary() { // "中中中" is 9 bytes; asking for 4 bytes would land mid-char. let s = "中中中"; let out = truncate_tail_at_char_boundary(s, 4); // Must be valid UTF-8 (indexing an invalid boundary would have // panicked at slicing time). assert!(out.chars().all(|c| c == '中')); assert!(out.len() <= 6); // at most 2 chars (6 bytes) } }