From bcfaba10c74170fb578c147660affc8d6c8b9706 Mon Sep 17 00:00:00 2001 From: xintaofei Date: Fri, 24 Apr 2026 14:28:41 +0800 Subject: [PATCH] fix(acp): prevent memory blowup from streaming terminal tool output MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Large raw_output snapshots from tool_call_update notifications caused O(N²) traffic through the event pipeline, multi-GB transient allocations and WKWebView crashes. The fix turns cumulative snapshots into bounded incremental deltas and removes redundant payload copies. - Add ToolCallOutputCache keyed by tool_call_id with a 8KB tail fingerprint and total length. Detects cumulative extensions by matching the cached tail at the expected offset in the incoming snapshot, so it works even when the full output grows into the MB range. Emits suffix deltas with raw_output_append=true; falls back to a truncated replacement when content diverges. - Cap any single emitted raw_output chunk at 64KB (MAX_SINGLE_EMIT_BYTES) with a UTF-8 char-boundary-safe tail and ANSI-sequence-safe trimming. Apply the same cap to emit_terminal_output_update. - Bound the cache at 256 entries with FIFO eviction, and clear entries when the tool call reaches completed / failed / cancelled / error. - Seed the cache via a dedicated seed() method on SessionUpdate::ToolCall so the initial event never emits an accidental append. - Share emit payloads as Arc across broadcast receivers and skip the Tauri-side clone: serialize once and hand the same Arc to both the webview emit and the WebSocket broadcaster. - Add 14 unit tests covering boundary cases: identity, prefix extension past the cached tail, divergence, oversized deltas, multibyte UTF-8 truncation, final-status cleanup, FIFO eviction, seed semantics, and ANSI-safe trimming. --- src-tauri/src/acp/connection.rs | 498 +++++++++++++++++- .../src/chat_channel/event_subscriber.rs | 2 +- .../chat_channel/session_event_subscriber.rs | 2 +- src-tauri/src/web/event_bridge.rs | 62 ++- 4 files changed, 536 insertions(+), 28 deletions(-) diff --git a/src-tauri/src/acp/connection.rs b/src-tauri/src/acp/connection.rs index 691aafd..49974e1 100644 --- a/src-tauri/src/acp/connection.rs +++ b/src-tauri/src/acp/connection.rs @@ -986,12 +986,19 @@ async fn run_connection( notif.update, SessionUpdate::AvailableCommandsUpdate(_) ) { + // Historical-replay path only + // forwards AvailableCommandsUpdate, + // which never carries tool output + // — a throwaway cache is fine. + let mut replay_cache = + ToolCallOutputCache::default(); emit_conversation_update( &cid, &h, agent_type, notif.update, None, + &mut replay_cache, ); } Ok(()) @@ -1346,6 +1353,236 @@ async fn set_session_config_option( const TERMINAL_POLL_INTERVAL_MS: u64 = 200; const TERMINAL_POLL_MISSING_LIMIT: u8 = 10; +/// Hard cap on the size of a single ACP event's `raw_output` payload. +/// +/// Agents (e.g. Claude Code, Codex) frequently send `tool_call_update` +/// notifications where `raw_output` is the **full accumulated** tool output +/// rather than an incremental delta. For long-running terminal tools this +/// leads to O(N²) bytes flowing through the event pipeline and multi-GB +/// transient allocations (serde_json Value trees, IPC buffers, broadcast +/// channel backlog). This constant caps any single emitted chunk so the +/// pipeline never sees a multi-MB event. +const MAX_SINGLE_EMIT_BYTES: usize = 64 * 1024; + +/// Byte length of the tail we retain per tool-call to verify that the next +/// incoming snapshot is a cumulative extension of the previous one. Small +/// enough to keep the cache bounded even in pathological sessions, large +/// enough that a matching tail is an extremely unlikely coincidence. +const MAX_CACHED_TAIL_BYTES: usize = 8 * 1024; + +/// Hard cap on the number of tool-call entries the cache retains. Prevents +/// unbounded growth in long sessions where agents forget to mark tool calls +/// as completed. Entries are evicted FIFO by generation counter. +const MAX_CACHE_ENTRIES: usize = 256; + +/// Prefix used when an emitted chunk had to be truncated. +const TRUNCATION_MARKER: &str = "[...truncated...]\n"; + +#[derive(Debug)] +struct CachedOutput { + /// Total byte length of the last observed `raw_output`. + total_len: usize, + /// Tail of the last observed `raw_output`, up to `MAX_CACHED_TAIL_BYTES` + /// bytes. Always aligned to a UTF-8 character boundary at the start. + tail: String, + /// Monotonic insertion/update tick used for FIFO eviction. + generation: u64, +} + +/// Per-session cache of the last `raw_output` fingerprint emitted for each +/// tool call. Enables delta detection: when an agent sends cumulative +/// snapshots, we forward only the suffix (with `raw_output_append=true`) +/// and keep the fingerprint bounded so it works even when the full output +/// grows into the multi-MB range. +#[derive(Debug, Default)] +struct ToolCallOutputCache { + entries: HashMap, + next_generation: u64, +} + +impl ToolCallOutputCache { + /// Diff an incoming full `raw_output` snapshot for `tool_call_id` against + /// the cache and return what should be emitted downstream. + /// + /// Returns `None` when the incoming snapshot is identical to the + /// previously emitted one (nothing to send). Otherwise returns + /// `(payload, append)` where: + /// - `append=true` — `payload` is a (possibly truncated) suffix delta; + /// the frontend should append it to the existing chunks. + /// - `append=false` — `payload` is a (possibly truncated) replacement + /// for the full tool output; the frontend should reset chunks. + fn consume(&mut self, tool_call_id: &str, curr: &str) -> Option<(String, bool)> { + let curr_len = curr.len(); + + let decision: Option<(String, bool)> = match self.entries.get(tool_call_id) { + Some(prev) if curr_len >= prev.total_len && self.is_extension_of(prev, curr) => { + if curr_len == prev.total_len { + // Identical output — nothing to emit. Cache stays fresh. + return None; + } + let suffix = &curr[prev.total_len..]; + Some(build_emit_payload(suffix, true)) + } + _ => Some(build_emit_payload(curr, false)), + }; + + // Update cache snapshot to current state so the next update can + // still detect a prefix extension. + let tail = trim_partial_ansi_tail(truncate_tail_at_char_boundary( + curr, + MAX_CACHED_TAIL_BYTES, + )) + .to_string(); + let generation = self.next_generation; + self.next_generation = self.next_generation.wrapping_add(1); + self.entries.insert( + tool_call_id.to_string(), + CachedOutput { + total_len: curr_len, + tail, + generation, + }, + ); + self.enforce_entry_cap(); + decision + } + + /// Seed the cache with an initial snapshot for `tool_call_id`, WITHOUT + /// attempting to diff against any prior state. Used for the initial + /// `SessionUpdate::ToolCall` notification, whose frontend reducer + /// treats `raw_output` as a full replacement. + fn seed(&mut self, tool_call_id: &str, curr: &str) -> Option { + let (payload, _append) = build_emit_payload(curr, false); + let tail = trim_partial_ansi_tail(truncate_tail_at_char_boundary( + curr, + MAX_CACHED_TAIL_BYTES, + )) + .to_string(); + let generation = self.next_generation; + self.next_generation = self.next_generation.wrapping_add(1); + self.entries.insert( + tool_call_id.to_string(), + CachedOutput { + total_len: curr.len(), + tail, + generation, + }, + ); + self.enforce_entry_cap(); + if payload.is_empty() { + None + } else { + Some(payload) + } + } + + /// Drop cached state for a tool call that has finished. Keeps the + /// session-scoped cache bounded in long-running sessions. + fn remove_if_final(&mut self, tool_call_id: &str, status: Option<&str>) { + if matches!( + status, + Some("completed" | "failed" | "cancelled" | "error") + ) { + self.entries.remove(tool_call_id); + } + } + + /// Returns true when the cached fingerprint matches `curr` at the + /// expected offset — i.e. `curr` is a prefix extension (or identity) + /// of the previously observed snapshot. + fn is_extension_of(&self, prev: &CachedOutput, curr: &str) -> bool { + let tail_start = prev.total_len.saturating_sub(prev.tail.len()); + curr.get(tail_start..prev.total_len) + .is_some_and(|slice| slice == prev.tail.as_str()) + } + + /// Evict oldest entries (by `generation`) once the cache exceeds the + /// entry cap. Linear scan over a bounded map, so O(MAX_CACHE_ENTRIES) + /// per eviction — acceptable at this size. + fn enforce_entry_cap(&mut self) { + while self.entries.len() > MAX_CACHE_ENTRIES { + let Some(oldest_id) = self + .entries + .iter() + .min_by_key(|(_, v)| v.generation) + .map(|(k, _)| k.clone()) + else { + break; + }; + self.entries.remove(&oldest_id); + } + } +} + +/// Apply the per-event size cap + truncation marker. Returns `(payload, +/// append)`. An empty `text` yields an empty `payload`; callers should +/// decide whether to suppress the emission in that case. +fn build_emit_payload(text: &str, append: bool) -> (String, bool) { + let truncated = trim_partial_ansi_tail(truncate_tail_at_char_boundary( + text, + MAX_SINGLE_EMIT_BYTES, + )); + let out = if truncated.len() < text.len() { + format!("{TRUNCATION_MARKER}{truncated}") + } else { + truncated.to_string() + }; + (out, append) +} + +/// Return a substring of `s` whose byte length is `<= max_bytes`, aligned to +/// a UTF-8 character boundary and taken from the TAIL of `s` (so the most +/// recent output is preserved when truncation is required). +fn truncate_tail_at_char_boundary(s: &str, max_bytes: usize) -> &str { + if s.len() <= max_bytes { + return s; + } + let mut start = s.len() - max_bytes; + while start < s.len() && !s.is_char_boundary(start) { + start += 1; + } + &s[start..] +} + +/// If the very end of `s` contains a partial ANSI escape sequence, trim it +/// so downstream ANSI parsers (e.g. the frontend `ansi-to-react` renderer) +/// don't see a half-emitted escape. +/// +/// Handles the three common ACP-stream cases: +/// - CSI (`ESC [ ... final`): terminator is a byte in 0x40..=0x7E after +/// the `[` introducer. +/// - OSC (`ESC ] ... ST|BEL`): terminator is BEL (0x07) or `ESC \`. +/// - Simple two-byte escape (`ESC `): complete as soon as the byte +/// following ESC is present. +/// +/// ESC is ASCII (1 byte), always a valid UTF-8 char boundary, so slicing +/// at `esc_pos` cannot produce an invalid UTF-8 string. +fn trim_partial_ansi_tail(s: &str) -> &str { + let bytes = s.as_bytes(); + let Some(esc_pos) = bytes.iter().rposition(|&b| b == 0x1B) else { + return s; + }; + let after = &bytes[esc_pos + 1..]; + if after.is_empty() { + return &s[..esc_pos]; + } + let terminated = match after[0] { + b'[' => after[1..] + .iter() + .any(|&b| (0x40..=0x7E).contains(&b)), + b']' => after[1..].contains(&0x07) + || after[1..].windows(2).any(|w| w[0] == 0x1B && w[1] == b'\\'), + // Two-byte escape sequences (ESC M, ESC D, …) are complete as + // soon as the second byte is present. + _ => true, + }; + if terminated { + s + } else { + &s[..esc_pos] + } +} + #[derive(Debug, Default)] struct TrackedTerminalToolCall { terminal_ids: Vec, @@ -1561,6 +1798,12 @@ fn emit_terminal_output_update( output: String, append: bool, ) { + // Safety cap: when a subprocess writes very fast between poll ticks, + // the delta produced by `poll_terminal_tool_call_output` can still be + // up to ~1 MB (the terminal buffer limit). Enforce the pipeline-wide + // single-event cap (with ANSI-safe truncation) before emission so the + // WS/IPC fanout never carries a multi-MB payload. + let (payload, _append) = build_emit_payload(&output, append); crate::web::event_bridge::emit_event( emitter, "acp://event", @@ -1571,7 +1814,7 @@ fn emit_terminal_output_update( status: None, content: None, raw_input: None, - raw_output: Some(output), + raw_output: Some(payload), raw_output_append: Some(append), locations: None, meta: None, @@ -1807,6 +2050,10 @@ async fn run_conversation_loop<'a>( cwd: &str, supports_fork: bool, ) -> Result, sacp::Error> { + // Session-scoped cache for diffing cumulative `raw_output` snapshots + // into incremental deltas. Shared across the idle loop and the active + // turn loop so tool calls that span turns stay consistent. + let mut raw_output_cache = ToolCallOutputCache::default(); loop { // Wait for either a user command or a session update (e.g. available_commands_update) let cmd = loop { @@ -1823,7 +2070,7 @@ async fn run_conversation_loop<'a>( let _ = MatchDispatch::new(dispatch) .if_notification( async |notif: SessionNotification| { - emit_conversation_update(&cid, &h, agent_type, notif.update, cwd_opt); + emit_conversation_update(&cid, &h, agent_type, notif.update, cwd_opt, &mut raw_output_cache); Ok(()) }, ) @@ -1918,7 +2165,7 @@ async fn run_conversation_loop<'a>( ¬if.update, &mut tracked_terminal_tool_calls, ); - emit_conversation_update(&cid, &h, agent_type, notif.update, cwd_opt); + emit_conversation_update(&cid, &h, agent_type, notif.update, cwd_opt, &mut raw_output_cache); if should_poll_now { poll_tracked_terminal_tool_calls( runtime.as_ref(), @@ -2481,12 +2728,17 @@ fn fix_usage_update_nulls(mut dispatch: Dispatch) -> Dispatch { } /// Convert a SessionUpdate into AcpEvent(s) and emit to frontend. +/// +/// `raw_output_cache` is a per-session cache used to detect cumulative +/// snapshots from agents and convert them into incremental deltas so the +/// event pipeline never carries a full N-MB tool output more than once. fn emit_conversation_update( connection_id: &str, emitter: &EventEmitter, agent_type: AgentType, update: SessionUpdate, cwd: Option<&str>, + raw_output_cache: &mut ToolCallOutputCache, ) { match update { SessionUpdate::UserMessageChunk(_) => { @@ -2526,26 +2778,33 @@ fn emit_conversation_update( // Non-text thought chunks are currently ignored. } SessionUpdate::ToolCall(tc) => { + let tool_call_id = tc.tool_call_id.to_string(); let content = serialize_tool_call_content(&tc.content); let raw_input = json_value_to_text(&tc.raw_input).map(|text| resolve_live_tool_input(&text, cwd)); - let raw_output = - json_value_to_text(&tc.raw_output).map(|text| structurize_live_output(&text)); + // Initial tool_call notification — the frontend reducer + // treats `raw_output` as a full replacement, so we bypass + // the diff path and seed the cache with the current snapshot. + let raw_output = json_value_to_text(&tc.raw_output) + .map(|text| structurize_live_output(&text)) + .and_then(|text| raw_output_cache.seed(&tool_call_id, &text)); let locations = if tc.locations.is_empty() { None } else { serde_json::to_value(&tc.locations).ok() }; let meta = tc.meta.map(serde_json::Value::Object); + let status = format!("{:?}", tc.status).to_lowercase(); + raw_output_cache.remove_if_final(&tool_call_id, Some(status.as_str())); crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::ToolCall { connection_id: connection_id.into(), - tool_call_id: tc.tool_call_id.to_string(), + tool_call_id, title: tc.title, kind: format!("{:?}", tc.kind).to_lowercase(), - status: format!("{:?}", tc.status).to_lowercase(), + status, content, raw_input, raw_output, @@ -2555,6 +2814,7 @@ fn emit_conversation_update( ); } SessionUpdate::ToolCallUpdate(tcu) => { + let tool_call_id = tcu.tool_call_id.to_string(); let content = tcu .fields .content @@ -2562,8 +2822,21 @@ fn emit_conversation_update( .and_then(serialize_tool_call_content); let raw_input = json_value_to_text(&tcu.fields.raw_input) .map(|text| resolve_live_tool_input(&text, cwd)); - let raw_output = json_value_to_text(&tcu.fields.raw_output) + // Diff the incoming raw_output against the last snapshot we + // emitted for this tool call. This turns cumulative snapshots + // from agents (Claude Code, Codex, …) into incremental deltas + // with `raw_output_append=true`, collapsing the O(N²) transfer + // problem to O(N) while capping any single emitted chunk to + // MAX_SINGLE_EMIT_BYTES. + let raw_output_text = json_value_to_text(&tcu.fields.raw_output) .map(|text| structurize_live_output(&text)); + let (raw_output, raw_output_append) = match raw_output_text { + Some(text) => match raw_output_cache.consume(&tool_call_id, &text) { + Some((payload, append)) => (Some(payload), Some(append)), + None => (None, None), + }, + None => (None, None), + }; let locations = tcu .fields .locations @@ -2571,18 +2844,20 @@ fn emit_conversation_update( .filter(|l| !l.is_empty()) .and_then(|l| serde_json::to_value(l).ok()); let meta = tcu.meta.clone().map(serde_json::Value::Object); + let status = tcu.fields.status.map(|s| format!("{:?}", s).to_lowercase()); + raw_output_cache.remove_if_final(&tool_call_id, status.as_deref()); crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::ToolCallUpdate { connection_id: connection_id.into(), - tool_call_id: tcu.tool_call_id.to_string(), + tool_call_id, title: tcu.fields.title, - status: tcu.fields.status.map(|s| format!("{:?}", s).to_lowercase()), + status, content, raw_input, raw_output, - raw_output_append: None, + raw_output_append, locations, meta, }, @@ -2761,4 +3036,205 @@ mod tests { assert!(req.meta.is_none()); } + + // ─── ToolCallOutputCache ──────────────────────────────────────────── + + #[test] + fn cache_first_update_emits_full_replace() { + let mut cache = ToolCallOutputCache::default(); + let (payload, append) = cache.consume("t1", "hello world").expect("should emit"); + assert_eq!(payload, "hello world"); + assert!(!append, "first emit must be replacement"); + } + + #[test] + fn cache_repeated_identical_snapshot_is_noop() { + let mut cache = ToolCallOutputCache::default(); + cache.consume("t1", "same").unwrap(); + assert!( + cache.consume("t1", "same").is_none(), + "identical snapshot must not emit" + ); + } + + #[test] + fn cache_prefix_extension_emits_suffix_with_append() { + let mut cache = ToolCallOutputCache::default(); + cache.consume("t1", "line-1\n").unwrap(); + let (payload, append) = cache + .consume("t1", "line-1\nline-2\n") + .expect("should emit"); + assert_eq!(payload, "line-2\n"); + assert!(append, "prefix extension must emit with append=true"); + } + + #[test] + fn cache_divergent_snapshot_falls_back_to_replace() { + let mut cache = ToolCallOutputCache::default(); + cache.consume("t1", "hello world").unwrap(); + let (payload, append) = cache.consume("t1", "foo bar baz").expect("should emit"); + assert_eq!(payload, "foo bar baz"); + assert!(!append, "non-extension snapshot must replace"); + } + + #[test] + fn cache_tracks_extensions_past_cached_tail_boundary() { + // Regression test for the original bug: when cumulative raw_output + // exceeds MAX_CACHED_TAIL_BYTES, subsequent extensions must still be + // detectable by comparing the cached tail against the expected + // offset in the incoming snapshot. + let mut cache = ToolCallOutputCache::default(); + // First snapshot: 10 KB of 'a' + unique 4 KB marker at the end. + let prefix = "a".repeat(10 * 1024); + let marker = "M".repeat(4 * 1024); + let first = format!("{prefix}{marker}"); + cache.consume("t1", &first).unwrap(); + + // Second snapshot extends first by 16 KB of 'Z'. + let delta = "Z".repeat(16 * 1024); + let second = format!("{first}{delta}"); + let (payload, append) = cache.consume("t1", &second).expect("should emit"); + assert!(append, "extension beyond cached tail must still be detected"); + // The emitted payload should carry the delta (or its tail when + // truncated at MAX_SINGLE_EMIT_BYTES). For a 16 KB delta that's + // well below the 64 KB cap, we expect it verbatim. + assert_eq!(payload, delta); + } + + #[test] + fn cache_extension_larger_than_emit_cap_gets_truncated() { + let mut cache = ToolCallOutputCache::default(); + cache.consume("t1", "seed").unwrap(); + // Build a delta much larger than MAX_SINGLE_EMIT_BYTES. + let big_delta = "X".repeat(MAX_SINGLE_EMIT_BYTES * 2); + let second = format!("seed{big_delta}"); + let (payload, append) = cache.consume("t1", &second).expect("should emit"); + assert!(append); + assert!( + payload.starts_with(TRUNCATION_MARKER), + "oversized delta must be prefixed with truncation marker" + ); + // Payload length: marker + at most MAX_SINGLE_EMIT_BYTES of tail. + assert!(payload.len() <= TRUNCATION_MARKER.len() + MAX_SINGLE_EMIT_BYTES); + } + + #[test] + fn cache_respects_utf8_char_boundary_on_truncation() { + let mut cache = ToolCallOutputCache::default(); + // Single first-update whose byte length forces truncation at a + // position that would otherwise fall mid-codepoint. 中 is 3 bytes + // (E4 B8 AD) and MAX_SINGLE_EMIT_BYTES (65536) is not a multiple + // of 3, so naïve byte slicing would land mid-char. + let chinese_block = "中".repeat((MAX_SINGLE_EMIT_BYTES / 3) + 100); + let (payload, _append) = cache.consume("t1", &chinese_block).expect("should emit"); + // Payload must start with the truncation marker (since size > cap). + assert!( + payload.starts_with(TRUNCATION_MARKER), + "oversized snapshot must be truncated" + ); + // Body after the marker must be valid UTF-8 consisting only of 中. + let body = &payload[TRUNCATION_MARKER.len()..]; + assert!(!body.is_empty()); + assert!( + body.chars().all(|c| c == '中'), + "truncation boundary must land on a UTF-8 codepoint edge" + ); + } + + #[test] + fn cache_final_status_clears_entry() { + let mut cache = ToolCallOutputCache::default(); + cache.consume("t1", "hello").unwrap(); + assert!(cache.entries.contains_key("t1")); + cache.remove_if_final("t1", Some("completed")); + assert!(!cache.entries.contains_key("t1")); + + cache.consume("t2", "x").unwrap(); + cache.remove_if_final("t2", Some("cancelled")); + assert!(!cache.entries.contains_key("t2")); + + cache.consume("t3", "x").unwrap(); + cache.remove_if_final("t3", Some("in_progress")); + assert!( + cache.entries.contains_key("t3"), + "in-progress status must not clear cache" + ); + } + + #[test] + fn cache_enforces_entry_cap_via_fifo_eviction() { + let mut cache = ToolCallOutputCache::default(); + for i in 0..(MAX_CACHE_ENTRIES + 50) { + cache.consume(&format!("tool-{i}"), "body").unwrap(); + } + assert_eq!(cache.entries.len(), MAX_CACHE_ENTRIES); + // Oldest entries should have been evicted; newest must still exist. + assert!(!cache.entries.contains_key("tool-0")); + assert!(cache + .entries + .contains_key(&format!("tool-{}", MAX_CACHE_ENTRIES + 49))); + } + + #[test] + fn cache_seed_always_replaces_and_caches() { + let mut cache = ToolCallOutputCache::default(); + cache.consume("t1", "stale").unwrap(); + // A hypothetical replay would send another ToolCall for the same + // id — seed() must install the new snapshot without trying to + // diff against the stale prior entry. + let payload = cache.seed("t1", "fresh").expect("seed emits"); + assert_eq!(payload, "fresh"); + // Next consume should diff against "fresh", not "stale". + let (p2, append) = cache.consume("t1", "fresh+more").expect("emit"); + assert!(append, "should detect extension of freshly seeded entry"); + assert_eq!(p2, "+more"); + } + + // ─── trim_partial_ansi_tail ───────────────────────────────────────── + + #[test] + fn ansi_trim_leaves_pure_text_unchanged() { + assert_eq!(trim_partial_ansi_tail("plain text"), "plain text"); + } + + #[test] + fn ansi_trim_keeps_completed_sequences() { + let s = "\x1b[31mRED\x1b[0m done"; + assert_eq!(trim_partial_ansi_tail(s), s); + } + + #[test] + fn ansi_trim_cuts_unterminated_trailing_sequence() { + let s = "hello \x1b[31"; + assert_eq!(trim_partial_ansi_tail(s), "hello "); + } + + #[test] + fn ansi_trim_handles_bare_escape_at_end() { + let s = "hello\x1b"; + assert_eq!(trim_partial_ansi_tail(s), "hello"); + } + + // ─── truncate_tail_at_char_boundary ───────────────────────────────── + + #[test] + fn truncate_under_cap_returns_as_is() { + assert_eq!(truncate_tail_at_char_boundary("abc", 10), "abc"); + } + + #[test] + fn truncate_returns_tail_on_overflow() { + assert_eq!(truncate_tail_at_char_boundary("abcdef", 3), "def"); + } + + #[test] + fn truncate_respects_multibyte_utf8_boundary() { + // "中中中" is 9 bytes; asking for 4 bytes would land mid-char. + let s = "中中中"; + let out = truncate_tail_at_char_boundary(s, 4); + // Must be valid UTF-8 (indexing an invalid boundary would have + // panicked at slicing time). + assert!(out.chars().all(|c| c == '中')); + assert!(out.len() <= 6); // at most 2 chars (6 bytes) + } } diff --git a/src-tauri/src/chat_channel/event_subscriber.rs b/src-tauri/src/chat_channel/event_subscriber.rs index 5166531..b520aef 100644 --- a/src-tauri/src/chat_channel/event_subscriber.rs +++ b/src-tauri/src/chat_channel/event_subscriber.rs @@ -108,7 +108,7 @@ pub fn spawn_event_subscriber( last_push.retain(|_, t| t.elapsed() < Duration::from_secs(DEBOUNCE_SECS * 2)); if let Some((event_type, msg)) = - parse_event(&event.channel, &event.payload, config.lang) + parse_event(&event.channel, event.payload.as_ref(), config.lang) { // Global event filter check if let Some(filter) = &config.global_filter { diff --git a/src-tauri/src/chat_channel/session_event_subscriber.rs b/src-tauri/src/chat_channel/session_event_subscriber.rs index f841ef6..0a1c922 100644 --- a/src-tauri/src/chat_channel/session_event_subscriber.rs +++ b/src-tauri/src/chat_channel/session_event_subscriber.rs @@ -48,7 +48,7 @@ pub fn spawn_session_event_subscriber( if event.channel == "acp://event" { handle_acp_event_payload( - &event.payload, + event.payload.as_ref(), &bridge, &manager, &conn_mgr, diff --git a/src-tauri/src/web/event_bridge.rs b/src-tauri/src/web/event_bridge.rs index 9b52cb7..c26f4a7 100644 --- a/src-tauri/src/web/event_bridge.rs +++ b/src-tauri/src/web/event_bridge.rs @@ -1,12 +1,25 @@ use std::sync::Arc; -use serde::Serialize; +use serde::{ser::SerializeStruct, Serialize, Serializer}; use tokio::sync::broadcast; -#[derive(Clone, Debug, Serialize)] +/// Broadcast-delivered event. +/// +/// `payload` is wrapped in `Arc` so cloning across broadcast receivers is +/// refcount-only — avoids copying multi-MB JSON trees per subscriber. +#[derive(Clone, Debug)] pub struct WebEvent { pub channel: String, - pub payload: serde_json::Value, + pub payload: Arc, +} + +impl Serialize for WebEvent { + fn serialize(&self, serializer: S) -> Result { + let mut state = serializer.serialize_struct("WebEvent", 2)?; + state.serialize_field("channel", &self.channel)?; + state.serialize_field("payload", self.payload.as_ref())?; + state.end() + } } pub struct WebEventBroadcaster { @@ -25,16 +38,28 @@ impl WebEventBroadcaster { Self { sender } } - pub fn send(&self, channel: &str, payload: &impl Serialize) { + /// Serialize `payload` once and broadcast. Returns the serialized + /// `Value` so Tauri callers can reuse it without serializing twice. + pub fn send(&self, channel: &str, payload: &impl Serialize) -> Option> { + let value = Arc::new(serde_json::to_value(payload).ok()?); + if self.sender.receiver_count() > 0 { + let _ = self.sender.send(WebEvent { + channel: channel.to_string(), + payload: value.clone(), + }); + } + Some(value) + } + + /// Broadcast a pre-serialized `Value` without re-serialization. + pub fn send_value(&self, channel: &str, payload: Arc) { if self.sender.receiver_count() == 0 { return; } - if let Ok(value) = serde_json::to_value(payload) { - let _ = self.sender.send(WebEvent { - channel: channel.to_string(), - payload: value, - }); - } + let _ = self.sender.send(WebEvent { + channel: channel.to_string(), + payload, + }); } pub fn subscribe(&self) -> broadcast::Receiver { @@ -55,19 +80,26 @@ pub enum EventEmitter { Noop, } -/// Unified event emission: sends to both Tauri webview and Web clients (if applicable). -pub fn emit_event(emitter: &EventEmitter, event: &str, payload: impl Serialize + Clone) { +/// Unified event emission: serializes the payload exactly once and dispatches +/// the shared `Arc` to both the Tauri webview and the web broadcaster. +pub fn emit_event(emitter: &EventEmitter, event: &str, payload: impl Serialize) { match emitter { #[cfg(feature = "tauri-runtime")] EventEmitter::Tauri(app) => { use tauri::{Emitter, Manager}; - let _ = app.emit(event, payload.clone()); + let Ok(value) = serde_json::to_value(&payload) else { + return; + }; + let shared = Arc::new(value); + // `&Value` is Copy, so Tauri's `Clone` bound is satisfied without + // copying the payload — Tauri serializes through the reference. + let _ = app.emit(event, shared.as_ref()); if let Some(web) = app.try_state::>() { - web.send(event, &payload); + web.send_value(event, shared); } } EventEmitter::WebOnly(broadcaster) => { - broadcaster.send(event, &payload); + let _ = broadcaster.send(event, &payload); } EventEmitter::Noop => {} }