|
|
|
|
@@ -986,12 +986,19 @@ async fn run_connection(
|
|
|
|
|
notif.update,
|
|
|
|
|
SessionUpdate::AvailableCommandsUpdate(_)
|
|
|
|
|
) {
|
|
|
|
|
// Historical-replay path only
|
|
|
|
|
// forwards AvailableCommandsUpdate,
|
|
|
|
|
// which never carries tool output
|
|
|
|
|
// — a throwaway cache is fine.
|
|
|
|
|
let mut replay_cache =
|
|
|
|
|
ToolCallOutputCache::default();
|
|
|
|
|
emit_conversation_update(
|
|
|
|
|
&cid,
|
|
|
|
|
&h,
|
|
|
|
|
agent_type,
|
|
|
|
|
notif.update,
|
|
|
|
|
None,
|
|
|
|
|
&mut replay_cache,
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
Ok(())
|
|
|
|
|
@@ -1346,6 +1353,236 @@ async fn set_session_config_option(
|
|
|
|
|
const TERMINAL_POLL_INTERVAL_MS: u64 = 200;
|
|
|
|
|
const TERMINAL_POLL_MISSING_LIMIT: u8 = 10;
|
|
|
|
|
|
|
|
|
|
/// Hard cap on the size of a single ACP event's `raw_output` payload.
|
|
|
|
|
///
|
|
|
|
|
/// Agents (e.g. Claude Code, Codex) frequently send `tool_call_update`
|
|
|
|
|
/// notifications where `raw_output` is the **full accumulated** tool output
|
|
|
|
|
/// rather than an incremental delta. For long-running terminal tools this
|
|
|
|
|
/// leads to O(N²) bytes flowing through the event pipeline and multi-GB
|
|
|
|
|
/// transient allocations (serde_json Value trees, IPC buffers, broadcast
|
|
|
|
|
/// channel backlog). This constant caps any single emitted chunk so the
|
|
|
|
|
/// pipeline never sees a multi-MB event.
|
|
|
|
|
const MAX_SINGLE_EMIT_BYTES: usize = 64 * 1024;
|
|
|
|
|
|
|
|
|
|
/// Byte length of the tail we retain per tool-call to verify that the next
|
|
|
|
|
/// incoming snapshot is a cumulative extension of the previous one. Small
|
|
|
|
|
/// enough to keep the cache bounded even in pathological sessions, large
|
|
|
|
|
/// enough that a matching tail is an extremely unlikely coincidence.
|
|
|
|
|
const MAX_CACHED_TAIL_BYTES: usize = 8 * 1024;
|
|
|
|
|
|
|
|
|
|
/// Hard cap on the number of tool-call entries the cache retains. Prevents
|
|
|
|
|
/// unbounded growth in long sessions where agents forget to mark tool calls
|
|
|
|
|
/// as completed. Entries are evicted FIFO by generation counter.
|
|
|
|
|
const MAX_CACHE_ENTRIES: usize = 256;
|
|
|
|
|
|
|
|
|
|
/// Prefix used when an emitted chunk had to be truncated.
|
|
|
|
|
const TRUNCATION_MARKER: &str = "[...truncated...]\n";
|
|
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
struct CachedOutput {
|
|
|
|
|
/// Total byte length of the last observed `raw_output`.
|
|
|
|
|
total_len: usize,
|
|
|
|
|
/// Tail of the last observed `raw_output`, up to `MAX_CACHED_TAIL_BYTES`
|
|
|
|
|
/// bytes. Always aligned to a UTF-8 character boundary at the start.
|
|
|
|
|
tail: String,
|
|
|
|
|
/// Monotonic insertion/update tick used for FIFO eviction.
|
|
|
|
|
generation: u64,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Per-session cache of the last `raw_output` fingerprint emitted for each
|
|
|
|
|
/// tool call. Enables delta detection: when an agent sends cumulative
|
|
|
|
|
/// snapshots, we forward only the suffix (with `raw_output_append=true`)
|
|
|
|
|
/// and keep the fingerprint bounded so it works even when the full output
|
|
|
|
|
/// grows into the multi-MB range.
|
|
|
|
|
#[derive(Debug, Default)]
|
|
|
|
|
struct ToolCallOutputCache {
|
|
|
|
|
entries: HashMap<String, CachedOutput>,
|
|
|
|
|
next_generation: u64,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl ToolCallOutputCache {
|
|
|
|
|
/// Diff an incoming full `raw_output` snapshot for `tool_call_id` against
|
|
|
|
|
/// the cache and return what should be emitted downstream.
|
|
|
|
|
///
|
|
|
|
|
/// Returns `None` when the incoming snapshot is identical to the
|
|
|
|
|
/// previously emitted one (nothing to send). Otherwise returns
|
|
|
|
|
/// `(payload, append)` where:
|
|
|
|
|
/// - `append=true` — `payload` is a (possibly truncated) suffix delta;
|
|
|
|
|
/// the frontend should append it to the existing chunks.
|
|
|
|
|
/// - `append=false` — `payload` is a (possibly truncated) replacement
|
|
|
|
|
/// for the full tool output; the frontend should reset chunks.
|
|
|
|
|
fn consume(&mut self, tool_call_id: &str, curr: &str) -> Option<(String, bool)> {
|
|
|
|
|
let curr_len = curr.len();
|
|
|
|
|
|
|
|
|
|
let decision: Option<(String, bool)> = match self.entries.get(tool_call_id) {
|
|
|
|
|
Some(prev) if curr_len >= prev.total_len && self.is_extension_of(prev, curr) => {
|
|
|
|
|
if curr_len == prev.total_len {
|
|
|
|
|
// Identical output — nothing to emit. Cache stays fresh.
|
|
|
|
|
return None;
|
|
|
|
|
}
|
|
|
|
|
let suffix = &curr[prev.total_len..];
|
|
|
|
|
Some(build_emit_payload(suffix, true))
|
|
|
|
|
}
|
|
|
|
|
_ => Some(build_emit_payload(curr, false)),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Update cache snapshot to current state so the next update can
|
|
|
|
|
// still detect a prefix extension.
|
|
|
|
|
let tail = trim_partial_ansi_tail(truncate_tail_at_char_boundary(
|
|
|
|
|
curr,
|
|
|
|
|
MAX_CACHED_TAIL_BYTES,
|
|
|
|
|
))
|
|
|
|
|
.to_string();
|
|
|
|
|
let generation = self.next_generation;
|
|
|
|
|
self.next_generation = self.next_generation.wrapping_add(1);
|
|
|
|
|
self.entries.insert(
|
|
|
|
|
tool_call_id.to_string(),
|
|
|
|
|
CachedOutput {
|
|
|
|
|
total_len: curr_len,
|
|
|
|
|
tail,
|
|
|
|
|
generation,
|
|
|
|
|
},
|
|
|
|
|
);
|
|
|
|
|
self.enforce_entry_cap();
|
|
|
|
|
decision
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Seed the cache with an initial snapshot for `tool_call_id`, WITHOUT
|
|
|
|
|
/// attempting to diff against any prior state. Used for the initial
|
|
|
|
|
/// `SessionUpdate::ToolCall` notification, whose frontend reducer
|
|
|
|
|
/// treats `raw_output` as a full replacement.
|
|
|
|
|
fn seed(&mut self, tool_call_id: &str, curr: &str) -> Option<String> {
|
|
|
|
|
let (payload, _append) = build_emit_payload(curr, false);
|
|
|
|
|
let tail = trim_partial_ansi_tail(truncate_tail_at_char_boundary(
|
|
|
|
|
curr,
|
|
|
|
|
MAX_CACHED_TAIL_BYTES,
|
|
|
|
|
))
|
|
|
|
|
.to_string();
|
|
|
|
|
let generation = self.next_generation;
|
|
|
|
|
self.next_generation = self.next_generation.wrapping_add(1);
|
|
|
|
|
self.entries.insert(
|
|
|
|
|
tool_call_id.to_string(),
|
|
|
|
|
CachedOutput {
|
|
|
|
|
total_len: curr.len(),
|
|
|
|
|
tail,
|
|
|
|
|
generation,
|
|
|
|
|
},
|
|
|
|
|
);
|
|
|
|
|
self.enforce_entry_cap();
|
|
|
|
|
if payload.is_empty() {
|
|
|
|
|
None
|
|
|
|
|
} else {
|
|
|
|
|
Some(payload)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Drop cached state for a tool call that has finished. Keeps the
|
|
|
|
|
/// session-scoped cache bounded in long-running sessions.
|
|
|
|
|
fn remove_if_final(&mut self, tool_call_id: &str, status: Option<&str>) {
|
|
|
|
|
if matches!(
|
|
|
|
|
status,
|
|
|
|
|
Some("completed" | "failed" | "cancelled" | "error")
|
|
|
|
|
) {
|
|
|
|
|
self.entries.remove(tool_call_id);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Returns true when the cached fingerprint matches `curr` at the
|
|
|
|
|
/// expected offset — i.e. `curr` is a prefix extension (or identity)
|
|
|
|
|
/// of the previously observed snapshot.
|
|
|
|
|
fn is_extension_of(&self, prev: &CachedOutput, curr: &str) -> bool {
|
|
|
|
|
let tail_start = prev.total_len.saturating_sub(prev.tail.len());
|
|
|
|
|
curr.get(tail_start..prev.total_len)
|
|
|
|
|
.is_some_and(|slice| slice == prev.tail.as_str())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Evict oldest entries (by `generation`) once the cache exceeds the
|
|
|
|
|
/// entry cap. Linear scan over a bounded map, so O(MAX_CACHE_ENTRIES)
|
|
|
|
|
/// per eviction — acceptable at this size.
|
|
|
|
|
fn enforce_entry_cap(&mut self) {
|
|
|
|
|
while self.entries.len() > MAX_CACHE_ENTRIES {
|
|
|
|
|
let Some(oldest_id) = self
|
|
|
|
|
.entries
|
|
|
|
|
.iter()
|
|
|
|
|
.min_by_key(|(_, v)| v.generation)
|
|
|
|
|
.map(|(k, _)| k.clone())
|
|
|
|
|
else {
|
|
|
|
|
break;
|
|
|
|
|
};
|
|
|
|
|
self.entries.remove(&oldest_id);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Apply the per-event size cap + truncation marker. Returns `(payload,
|
|
|
|
|
/// append)`. An empty `text` yields an empty `payload`; callers should
|
|
|
|
|
/// decide whether to suppress the emission in that case.
|
|
|
|
|
fn build_emit_payload(text: &str, append: bool) -> (String, bool) {
|
|
|
|
|
let truncated = trim_partial_ansi_tail(truncate_tail_at_char_boundary(
|
|
|
|
|
text,
|
|
|
|
|
MAX_SINGLE_EMIT_BYTES,
|
|
|
|
|
));
|
|
|
|
|
let out = if truncated.len() < text.len() {
|
|
|
|
|
format!("{TRUNCATION_MARKER}{truncated}")
|
|
|
|
|
} else {
|
|
|
|
|
truncated.to_string()
|
|
|
|
|
};
|
|
|
|
|
(out, append)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Return a substring of `s` whose byte length is `<= max_bytes`, aligned to
|
|
|
|
|
/// a UTF-8 character boundary and taken from the TAIL of `s` (so the most
|
|
|
|
|
/// recent output is preserved when truncation is required).
|
|
|
|
|
fn truncate_tail_at_char_boundary(s: &str, max_bytes: usize) -> &str {
|
|
|
|
|
if s.len() <= max_bytes {
|
|
|
|
|
return s;
|
|
|
|
|
}
|
|
|
|
|
let mut start = s.len() - max_bytes;
|
|
|
|
|
while start < s.len() && !s.is_char_boundary(start) {
|
|
|
|
|
start += 1;
|
|
|
|
|
}
|
|
|
|
|
&s[start..]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// If the very end of `s` contains a partial ANSI escape sequence, trim it
|
|
|
|
|
/// so downstream ANSI parsers (e.g. the frontend `ansi-to-react` renderer)
|
|
|
|
|
/// don't see a half-emitted escape.
|
|
|
|
|
///
|
|
|
|
|
/// Handles the three common ACP-stream cases:
|
|
|
|
|
/// - CSI (`ESC [ ... final`): terminator is a byte in 0x40..=0x7E after
|
|
|
|
|
/// the `[` introducer.
|
|
|
|
|
/// - OSC (`ESC ] ... ST|BEL`): terminator is BEL (0x07) or `ESC \`.
|
|
|
|
|
/// - Simple two-byte escape (`ESC <byte>`): complete as soon as the byte
|
|
|
|
|
/// following ESC is present.
|
|
|
|
|
///
|
|
|
|
|
/// ESC is ASCII (1 byte), always a valid UTF-8 char boundary, so slicing
|
|
|
|
|
/// at `esc_pos` cannot produce an invalid UTF-8 string.
|
|
|
|
|
fn trim_partial_ansi_tail(s: &str) -> &str {
|
|
|
|
|
let bytes = s.as_bytes();
|
|
|
|
|
let Some(esc_pos) = bytes.iter().rposition(|&b| b == 0x1B) else {
|
|
|
|
|
return s;
|
|
|
|
|
};
|
|
|
|
|
let after = &bytes[esc_pos + 1..];
|
|
|
|
|
if after.is_empty() {
|
|
|
|
|
return &s[..esc_pos];
|
|
|
|
|
}
|
|
|
|
|
let terminated = match after[0] {
|
|
|
|
|
b'[' => after[1..]
|
|
|
|
|
.iter()
|
|
|
|
|
.any(|&b| (0x40..=0x7E).contains(&b)),
|
|
|
|
|
b']' => after[1..].contains(&0x07)
|
|
|
|
|
|| after[1..].windows(2).any(|w| w[0] == 0x1B && w[1] == b'\\'),
|
|
|
|
|
// Two-byte escape sequences (ESC M, ESC D, …) are complete as
|
|
|
|
|
// soon as the second byte is present.
|
|
|
|
|
_ => true,
|
|
|
|
|
};
|
|
|
|
|
if terminated {
|
|
|
|
|
s
|
|
|
|
|
} else {
|
|
|
|
|
&s[..esc_pos]
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, Default)]
|
|
|
|
|
struct TrackedTerminalToolCall {
|
|
|
|
|
terminal_ids: Vec<String>,
|
|
|
|
|
@@ -1561,6 +1798,12 @@ fn emit_terminal_output_update(
|
|
|
|
|
output: String,
|
|
|
|
|
append: bool,
|
|
|
|
|
) {
|
|
|
|
|
// Safety cap: when a subprocess writes very fast between poll ticks,
|
|
|
|
|
// the delta produced by `poll_terminal_tool_call_output` can still be
|
|
|
|
|
// up to ~1 MB (the terminal buffer limit). Enforce the pipeline-wide
|
|
|
|
|
// single-event cap (with ANSI-safe truncation) before emission so the
|
|
|
|
|
// WS/IPC fanout never carries a multi-MB payload.
|
|
|
|
|
let (payload, _append) = build_emit_payload(&output, append);
|
|
|
|
|
crate::web::event_bridge::emit_event(
|
|
|
|
|
emitter,
|
|
|
|
|
"acp://event",
|
|
|
|
|
@@ -1571,7 +1814,7 @@ fn emit_terminal_output_update(
|
|
|
|
|
status: None,
|
|
|
|
|
content: None,
|
|
|
|
|
raw_input: None,
|
|
|
|
|
raw_output: Some(output),
|
|
|
|
|
raw_output: Some(payload),
|
|
|
|
|
raw_output_append: Some(append),
|
|
|
|
|
locations: None,
|
|
|
|
|
meta: None,
|
|
|
|
|
@@ -1807,6 +2050,10 @@ async fn run_conversation_loop<'a>(
|
|
|
|
|
cwd: &str,
|
|
|
|
|
supports_fork: bool,
|
|
|
|
|
) -> Result<Option<ForkExitInfo>, sacp::Error> {
|
|
|
|
|
// Session-scoped cache for diffing cumulative `raw_output` snapshots
|
|
|
|
|
// into incremental deltas. Shared across the idle loop and the active
|
|
|
|
|
// turn loop so tool calls that span turns stay consistent.
|
|
|
|
|
let mut raw_output_cache = ToolCallOutputCache::default();
|
|
|
|
|
loop {
|
|
|
|
|
// Wait for either a user command or a session update (e.g. available_commands_update)
|
|
|
|
|
let cmd = loop {
|
|
|
|
|
@@ -1823,7 +2070,7 @@ async fn run_conversation_loop<'a>(
|
|
|
|
|
let _ = MatchDispatch::new(dispatch)
|
|
|
|
|
.if_notification(
|
|
|
|
|
async |notif: SessionNotification| {
|
|
|
|
|
emit_conversation_update(&cid, &h, agent_type, notif.update, cwd_opt);
|
|
|
|
|
emit_conversation_update(&cid, &h, agent_type, notif.update, cwd_opt, &mut raw_output_cache);
|
|
|
|
|
Ok(())
|
|
|
|
|
},
|
|
|
|
|
)
|
|
|
|
|
@@ -1918,7 +2165,7 @@ async fn run_conversation_loop<'a>(
|
|
|
|
|
¬if.update,
|
|
|
|
|
&mut tracked_terminal_tool_calls,
|
|
|
|
|
);
|
|
|
|
|
emit_conversation_update(&cid, &h, agent_type, notif.update, cwd_opt);
|
|
|
|
|
emit_conversation_update(&cid, &h, agent_type, notif.update, cwd_opt, &mut raw_output_cache);
|
|
|
|
|
if should_poll_now {
|
|
|
|
|
poll_tracked_terminal_tool_calls(
|
|
|
|
|
runtime.as_ref(),
|
|
|
|
|
@@ -2481,12 +2728,17 @@ fn fix_usage_update_nulls(mut dispatch: Dispatch) -> Dispatch {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Convert a SessionUpdate into AcpEvent(s) and emit to frontend.
|
|
|
|
|
///
|
|
|
|
|
/// `raw_output_cache` is a per-session cache used to detect cumulative
|
|
|
|
|
/// snapshots from agents and convert them into incremental deltas so the
|
|
|
|
|
/// event pipeline never carries a full N-MB tool output more than once.
|
|
|
|
|
fn emit_conversation_update(
|
|
|
|
|
connection_id: &str,
|
|
|
|
|
emitter: &EventEmitter,
|
|
|
|
|
agent_type: AgentType,
|
|
|
|
|
update: SessionUpdate,
|
|
|
|
|
cwd: Option<&str>,
|
|
|
|
|
raw_output_cache: &mut ToolCallOutputCache,
|
|
|
|
|
) {
|
|
|
|
|
match update {
|
|
|
|
|
SessionUpdate::UserMessageChunk(_) => {
|
|
|
|
|
@@ -2526,26 +2778,33 @@ fn emit_conversation_update(
|
|
|
|
|
// Non-text thought chunks are currently ignored.
|
|
|
|
|
}
|
|
|
|
|
SessionUpdate::ToolCall(tc) => {
|
|
|
|
|
let tool_call_id = tc.tool_call_id.to_string();
|
|
|
|
|
let content = serialize_tool_call_content(&tc.content);
|
|
|
|
|
let raw_input =
|
|
|
|
|
json_value_to_text(&tc.raw_input).map(|text| resolve_live_tool_input(&text, cwd));
|
|
|
|
|
let raw_output =
|
|
|
|
|
json_value_to_text(&tc.raw_output).map(|text| structurize_live_output(&text));
|
|
|
|
|
// Initial tool_call notification — the frontend reducer
|
|
|
|
|
// treats `raw_output` as a full replacement, so we bypass
|
|
|
|
|
// the diff path and seed the cache with the current snapshot.
|
|
|
|
|
let raw_output = json_value_to_text(&tc.raw_output)
|
|
|
|
|
.map(|text| structurize_live_output(&text))
|
|
|
|
|
.and_then(|text| raw_output_cache.seed(&tool_call_id, &text));
|
|
|
|
|
let locations = if tc.locations.is_empty() {
|
|
|
|
|
None
|
|
|
|
|
} else {
|
|
|
|
|
serde_json::to_value(&tc.locations).ok()
|
|
|
|
|
};
|
|
|
|
|
let meta = tc.meta.map(serde_json::Value::Object);
|
|
|
|
|
let status = format!("{:?}", tc.status).to_lowercase();
|
|
|
|
|
raw_output_cache.remove_if_final(&tool_call_id, Some(status.as_str()));
|
|
|
|
|
crate::web::event_bridge::emit_event(
|
|
|
|
|
emitter,
|
|
|
|
|
"acp://event",
|
|
|
|
|
AcpEvent::ToolCall {
|
|
|
|
|
connection_id: connection_id.into(),
|
|
|
|
|
tool_call_id: tc.tool_call_id.to_string(),
|
|
|
|
|
tool_call_id,
|
|
|
|
|
title: tc.title,
|
|
|
|
|
kind: format!("{:?}", tc.kind).to_lowercase(),
|
|
|
|
|
status: format!("{:?}", tc.status).to_lowercase(),
|
|
|
|
|
status,
|
|
|
|
|
content,
|
|
|
|
|
raw_input,
|
|
|
|
|
raw_output,
|
|
|
|
|
@@ -2555,6 +2814,7 @@ fn emit_conversation_update(
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
SessionUpdate::ToolCallUpdate(tcu) => {
|
|
|
|
|
let tool_call_id = tcu.tool_call_id.to_string();
|
|
|
|
|
let content = tcu
|
|
|
|
|
.fields
|
|
|
|
|
.content
|
|
|
|
|
@@ -2562,8 +2822,21 @@ fn emit_conversation_update(
|
|
|
|
|
.and_then(serialize_tool_call_content);
|
|
|
|
|
let raw_input = json_value_to_text(&tcu.fields.raw_input)
|
|
|
|
|
.map(|text| resolve_live_tool_input(&text, cwd));
|
|
|
|
|
let raw_output = json_value_to_text(&tcu.fields.raw_output)
|
|
|
|
|
// Diff the incoming raw_output against the last snapshot we
|
|
|
|
|
// emitted for this tool call. This turns cumulative snapshots
|
|
|
|
|
// from agents (Claude Code, Codex, …) into incremental deltas
|
|
|
|
|
// with `raw_output_append=true`, collapsing the O(N²) transfer
|
|
|
|
|
// problem to O(N) while capping any single emitted chunk to
|
|
|
|
|
// MAX_SINGLE_EMIT_BYTES.
|
|
|
|
|
let raw_output_text = json_value_to_text(&tcu.fields.raw_output)
|
|
|
|
|
.map(|text| structurize_live_output(&text));
|
|
|
|
|
let (raw_output, raw_output_append) = match raw_output_text {
|
|
|
|
|
Some(text) => match raw_output_cache.consume(&tool_call_id, &text) {
|
|
|
|
|
Some((payload, append)) => (Some(payload), Some(append)),
|
|
|
|
|
None => (None, None),
|
|
|
|
|
},
|
|
|
|
|
None => (None, None),
|
|
|
|
|
};
|
|
|
|
|
let locations = tcu
|
|
|
|
|
.fields
|
|
|
|
|
.locations
|
|
|
|
|
@@ -2571,18 +2844,20 @@ fn emit_conversation_update(
|
|
|
|
|
.filter(|l| !l.is_empty())
|
|
|
|
|
.and_then(|l| serde_json::to_value(l).ok());
|
|
|
|
|
let meta = tcu.meta.clone().map(serde_json::Value::Object);
|
|
|
|
|
let status = tcu.fields.status.map(|s| format!("{:?}", s).to_lowercase());
|
|
|
|
|
raw_output_cache.remove_if_final(&tool_call_id, status.as_deref());
|
|
|
|
|
crate::web::event_bridge::emit_event(
|
|
|
|
|
emitter,
|
|
|
|
|
"acp://event",
|
|
|
|
|
AcpEvent::ToolCallUpdate {
|
|
|
|
|
connection_id: connection_id.into(),
|
|
|
|
|
tool_call_id: tcu.tool_call_id.to_string(),
|
|
|
|
|
tool_call_id,
|
|
|
|
|
title: tcu.fields.title,
|
|
|
|
|
status: tcu.fields.status.map(|s| format!("{:?}", s).to_lowercase()),
|
|
|
|
|
status,
|
|
|
|
|
content,
|
|
|
|
|
raw_input,
|
|
|
|
|
raw_output,
|
|
|
|
|
raw_output_append: None,
|
|
|
|
|
raw_output_append,
|
|
|
|
|
locations,
|
|
|
|
|
meta,
|
|
|
|
|
},
|
|
|
|
|
@@ -2761,4 +3036,205 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
assert!(req.meta.is_none());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ─── ToolCallOutputCache ────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn cache_first_update_emits_full_replace() {
|
|
|
|
|
let mut cache = ToolCallOutputCache::default();
|
|
|
|
|
let (payload, append) = cache.consume("t1", "hello world").expect("should emit");
|
|
|
|
|
assert_eq!(payload, "hello world");
|
|
|
|
|
assert!(!append, "first emit must be replacement");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn cache_repeated_identical_snapshot_is_noop() {
|
|
|
|
|
let mut cache = ToolCallOutputCache::default();
|
|
|
|
|
cache.consume("t1", "same").unwrap();
|
|
|
|
|
assert!(
|
|
|
|
|
cache.consume("t1", "same").is_none(),
|
|
|
|
|
"identical snapshot must not emit"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn cache_prefix_extension_emits_suffix_with_append() {
|
|
|
|
|
let mut cache = ToolCallOutputCache::default();
|
|
|
|
|
cache.consume("t1", "line-1\n").unwrap();
|
|
|
|
|
let (payload, append) = cache
|
|
|
|
|
.consume("t1", "line-1\nline-2\n")
|
|
|
|
|
.expect("should emit");
|
|
|
|
|
assert_eq!(payload, "line-2\n");
|
|
|
|
|
assert!(append, "prefix extension must emit with append=true");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn cache_divergent_snapshot_falls_back_to_replace() {
|
|
|
|
|
let mut cache = ToolCallOutputCache::default();
|
|
|
|
|
cache.consume("t1", "hello world").unwrap();
|
|
|
|
|
let (payload, append) = cache.consume("t1", "foo bar baz").expect("should emit");
|
|
|
|
|
assert_eq!(payload, "foo bar baz");
|
|
|
|
|
assert!(!append, "non-extension snapshot must replace");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn cache_tracks_extensions_past_cached_tail_boundary() {
|
|
|
|
|
// Regression test for the original bug: when cumulative raw_output
|
|
|
|
|
// exceeds MAX_CACHED_TAIL_BYTES, subsequent extensions must still be
|
|
|
|
|
// detectable by comparing the cached tail against the expected
|
|
|
|
|
// offset in the incoming snapshot.
|
|
|
|
|
let mut cache = ToolCallOutputCache::default();
|
|
|
|
|
// First snapshot: 10 KB of 'a' + unique 4 KB marker at the end.
|
|
|
|
|
let prefix = "a".repeat(10 * 1024);
|
|
|
|
|
let marker = "M".repeat(4 * 1024);
|
|
|
|
|
let first = format!("{prefix}{marker}");
|
|
|
|
|
cache.consume("t1", &first).unwrap();
|
|
|
|
|
|
|
|
|
|
// Second snapshot extends first by 16 KB of 'Z'.
|
|
|
|
|
let delta = "Z".repeat(16 * 1024);
|
|
|
|
|
let second = format!("{first}{delta}");
|
|
|
|
|
let (payload, append) = cache.consume("t1", &second).expect("should emit");
|
|
|
|
|
assert!(append, "extension beyond cached tail must still be detected");
|
|
|
|
|
// The emitted payload should carry the delta (or its tail when
|
|
|
|
|
// truncated at MAX_SINGLE_EMIT_BYTES). For a 16 KB delta that's
|
|
|
|
|
// well below the 64 KB cap, we expect it verbatim.
|
|
|
|
|
assert_eq!(payload, delta);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn cache_extension_larger_than_emit_cap_gets_truncated() {
|
|
|
|
|
let mut cache = ToolCallOutputCache::default();
|
|
|
|
|
cache.consume("t1", "seed").unwrap();
|
|
|
|
|
// Build a delta much larger than MAX_SINGLE_EMIT_BYTES.
|
|
|
|
|
let big_delta = "X".repeat(MAX_SINGLE_EMIT_BYTES * 2);
|
|
|
|
|
let second = format!("seed{big_delta}");
|
|
|
|
|
let (payload, append) = cache.consume("t1", &second).expect("should emit");
|
|
|
|
|
assert!(append);
|
|
|
|
|
assert!(
|
|
|
|
|
payload.starts_with(TRUNCATION_MARKER),
|
|
|
|
|
"oversized delta must be prefixed with truncation marker"
|
|
|
|
|
);
|
|
|
|
|
// Payload length: marker + at most MAX_SINGLE_EMIT_BYTES of tail.
|
|
|
|
|
assert!(payload.len() <= TRUNCATION_MARKER.len() + MAX_SINGLE_EMIT_BYTES);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn cache_respects_utf8_char_boundary_on_truncation() {
|
|
|
|
|
let mut cache = ToolCallOutputCache::default();
|
|
|
|
|
// Single first-update whose byte length forces truncation at a
|
|
|
|
|
// position that would otherwise fall mid-codepoint. 中 is 3 bytes
|
|
|
|
|
// (E4 B8 AD) and MAX_SINGLE_EMIT_BYTES (65536) is not a multiple
|
|
|
|
|
// of 3, so naïve byte slicing would land mid-char.
|
|
|
|
|
let chinese_block = "中".repeat((MAX_SINGLE_EMIT_BYTES / 3) + 100);
|
|
|
|
|
let (payload, _append) = cache.consume("t1", &chinese_block).expect("should emit");
|
|
|
|
|
// Payload must start with the truncation marker (since size > cap).
|
|
|
|
|
assert!(
|
|
|
|
|
payload.starts_with(TRUNCATION_MARKER),
|
|
|
|
|
"oversized snapshot must be truncated"
|
|
|
|
|
);
|
|
|
|
|
// Body after the marker must be valid UTF-8 consisting only of 中.
|
|
|
|
|
let body = &payload[TRUNCATION_MARKER.len()..];
|
|
|
|
|
assert!(!body.is_empty());
|
|
|
|
|
assert!(
|
|
|
|
|
body.chars().all(|c| c == '中'),
|
|
|
|
|
"truncation boundary must land on a UTF-8 codepoint edge"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn cache_final_status_clears_entry() {
|
|
|
|
|
let mut cache = ToolCallOutputCache::default();
|
|
|
|
|
cache.consume("t1", "hello").unwrap();
|
|
|
|
|
assert!(cache.entries.contains_key("t1"));
|
|
|
|
|
cache.remove_if_final("t1", Some("completed"));
|
|
|
|
|
assert!(!cache.entries.contains_key("t1"));
|
|
|
|
|
|
|
|
|
|
cache.consume("t2", "x").unwrap();
|
|
|
|
|
cache.remove_if_final("t2", Some("cancelled"));
|
|
|
|
|
assert!(!cache.entries.contains_key("t2"));
|
|
|
|
|
|
|
|
|
|
cache.consume("t3", "x").unwrap();
|
|
|
|
|
cache.remove_if_final("t3", Some("in_progress"));
|
|
|
|
|
assert!(
|
|
|
|
|
cache.entries.contains_key("t3"),
|
|
|
|
|
"in-progress status must not clear cache"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn cache_enforces_entry_cap_via_fifo_eviction() {
|
|
|
|
|
let mut cache = ToolCallOutputCache::default();
|
|
|
|
|
for i in 0..(MAX_CACHE_ENTRIES + 50) {
|
|
|
|
|
cache.consume(&format!("tool-{i}"), "body").unwrap();
|
|
|
|
|
}
|
|
|
|
|
assert_eq!(cache.entries.len(), MAX_CACHE_ENTRIES);
|
|
|
|
|
// Oldest entries should have been evicted; newest must still exist.
|
|
|
|
|
assert!(!cache.entries.contains_key("tool-0"));
|
|
|
|
|
assert!(cache
|
|
|
|
|
.entries
|
|
|
|
|
.contains_key(&format!("tool-{}", MAX_CACHE_ENTRIES + 49)));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn cache_seed_always_replaces_and_caches() {
|
|
|
|
|
let mut cache = ToolCallOutputCache::default();
|
|
|
|
|
cache.consume("t1", "stale").unwrap();
|
|
|
|
|
// A hypothetical replay would send another ToolCall for the same
|
|
|
|
|
// id — seed() must install the new snapshot without trying to
|
|
|
|
|
// diff against the stale prior entry.
|
|
|
|
|
let payload = cache.seed("t1", "fresh").expect("seed emits");
|
|
|
|
|
assert_eq!(payload, "fresh");
|
|
|
|
|
// Next consume should diff against "fresh", not "stale".
|
|
|
|
|
let (p2, append) = cache.consume("t1", "fresh+more").expect("emit");
|
|
|
|
|
assert!(append, "should detect extension of freshly seeded entry");
|
|
|
|
|
assert_eq!(p2, "+more");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ─── trim_partial_ansi_tail ─────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn ansi_trim_leaves_pure_text_unchanged() {
|
|
|
|
|
assert_eq!(trim_partial_ansi_tail("plain text"), "plain text");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn ansi_trim_keeps_completed_sequences() {
|
|
|
|
|
let s = "\x1b[31mRED\x1b[0m done";
|
|
|
|
|
assert_eq!(trim_partial_ansi_tail(s), s);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn ansi_trim_cuts_unterminated_trailing_sequence() {
|
|
|
|
|
let s = "hello \x1b[31";
|
|
|
|
|
assert_eq!(trim_partial_ansi_tail(s), "hello ");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn ansi_trim_handles_bare_escape_at_end() {
|
|
|
|
|
let s = "hello\x1b";
|
|
|
|
|
assert_eq!(trim_partial_ansi_tail(s), "hello");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ─── truncate_tail_at_char_boundary ─────────────────────────────────
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn truncate_under_cap_returns_as_is() {
|
|
|
|
|
assert_eq!(truncate_tail_at_char_boundary("abc", 10), "abc");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn truncate_returns_tail_on_overflow() {
|
|
|
|
|
assert_eq!(truncate_tail_at_char_boundary("abcdef", 3), "def");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn truncate_respects_multibyte_utf8_boundary() {
|
|
|
|
|
// "中中中" is 9 bytes; asking for 4 bytes would land mid-char.
|
|
|
|
|
let s = "中中中";
|
|
|
|
|
let out = truncate_tail_at_char_boundary(s, 4);
|
|
|
|
|
// Must be valid UTF-8 (indexing an invalid boundary would have
|
|
|
|
|
// panicked at slicing time).
|
|
|
|
|
assert!(out.chars().all(|c| c == '中'));
|
|
|
|
|
assert!(out.len() <= 6); // at most 2 chars (6 bytes)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|