diff --git a/src-tauri/src/acp/connection.rs b/src-tauri/src/acp/connection.rs index 691aafd..49974e1 100644 --- a/src-tauri/src/acp/connection.rs +++ b/src-tauri/src/acp/connection.rs @@ -986,12 +986,19 @@ async fn run_connection( notif.update, SessionUpdate::AvailableCommandsUpdate(_) ) { + // Historical-replay path only + // forwards AvailableCommandsUpdate, + // which never carries tool output + // — a throwaway cache is fine. + let mut replay_cache = + ToolCallOutputCache::default(); emit_conversation_update( &cid, &h, agent_type, notif.update, None, + &mut replay_cache, ); } Ok(()) @@ -1346,6 +1353,236 @@ async fn set_session_config_option( const TERMINAL_POLL_INTERVAL_MS: u64 = 200; const TERMINAL_POLL_MISSING_LIMIT: u8 = 10; +/// Hard cap on the size of a single ACP event's `raw_output` payload. +/// +/// Agents (e.g. Claude Code, Codex) frequently send `tool_call_update` +/// notifications where `raw_output` is the **full accumulated** tool output +/// rather than an incremental delta. For long-running terminal tools this +/// leads to O(N²) bytes flowing through the event pipeline and multi-GB +/// transient allocations (serde_json Value trees, IPC buffers, broadcast +/// channel backlog). This constant caps any single emitted chunk so the +/// pipeline never sees a multi-MB event. +const MAX_SINGLE_EMIT_BYTES: usize = 64 * 1024; + +/// Byte length of the tail we retain per tool-call to verify that the next +/// incoming snapshot is a cumulative extension of the previous one. Small +/// enough to keep the cache bounded even in pathological sessions, large +/// enough that a matching tail is an extremely unlikely coincidence. +const MAX_CACHED_TAIL_BYTES: usize = 8 * 1024; + +/// Hard cap on the number of tool-call entries the cache retains. Prevents +/// unbounded growth in long sessions where agents forget to mark tool calls +/// as completed. Entries are evicted FIFO by generation counter. +const MAX_CACHE_ENTRIES: usize = 256; + +/// Prefix used when an emitted chunk had to be truncated. +const TRUNCATION_MARKER: &str = "[...truncated...]\n"; + +#[derive(Debug)] +struct CachedOutput { + /// Total byte length of the last observed `raw_output`. + total_len: usize, + /// Tail of the last observed `raw_output`, up to `MAX_CACHED_TAIL_BYTES` + /// bytes. Always aligned to a UTF-8 character boundary at the start. + tail: String, + /// Monotonic insertion/update tick used for FIFO eviction. + generation: u64, +} + +/// Per-session cache of the last `raw_output` fingerprint emitted for each +/// tool call. Enables delta detection: when an agent sends cumulative +/// snapshots, we forward only the suffix (with `raw_output_append=true`) +/// and keep the fingerprint bounded so it works even when the full output +/// grows into the multi-MB range. +#[derive(Debug, Default)] +struct ToolCallOutputCache { + entries: HashMap, + next_generation: u64, +} + +impl ToolCallOutputCache { + /// Diff an incoming full `raw_output` snapshot for `tool_call_id` against + /// the cache and return what should be emitted downstream. + /// + /// Returns `None` when the incoming snapshot is identical to the + /// previously emitted one (nothing to send). Otherwise returns + /// `(payload, append)` where: + /// - `append=true` — `payload` is a (possibly truncated) suffix delta; + /// the frontend should append it to the existing chunks. + /// - `append=false` — `payload` is a (possibly truncated) replacement + /// for the full tool output; the frontend should reset chunks. + fn consume(&mut self, tool_call_id: &str, curr: &str) -> Option<(String, bool)> { + let curr_len = curr.len(); + + let decision: Option<(String, bool)> = match self.entries.get(tool_call_id) { + Some(prev) if curr_len >= prev.total_len && self.is_extension_of(prev, curr) => { + if curr_len == prev.total_len { + // Identical output — nothing to emit. Cache stays fresh. + return None; + } + let suffix = &curr[prev.total_len..]; + Some(build_emit_payload(suffix, true)) + } + _ => Some(build_emit_payload(curr, false)), + }; + + // Update cache snapshot to current state so the next update can + // still detect a prefix extension. + let tail = trim_partial_ansi_tail(truncate_tail_at_char_boundary( + curr, + MAX_CACHED_TAIL_BYTES, + )) + .to_string(); + let generation = self.next_generation; + self.next_generation = self.next_generation.wrapping_add(1); + self.entries.insert( + tool_call_id.to_string(), + CachedOutput { + total_len: curr_len, + tail, + generation, + }, + ); + self.enforce_entry_cap(); + decision + } + + /// Seed the cache with an initial snapshot for `tool_call_id`, WITHOUT + /// attempting to diff against any prior state. Used for the initial + /// `SessionUpdate::ToolCall` notification, whose frontend reducer + /// treats `raw_output` as a full replacement. + fn seed(&mut self, tool_call_id: &str, curr: &str) -> Option { + let (payload, _append) = build_emit_payload(curr, false); + let tail = trim_partial_ansi_tail(truncate_tail_at_char_boundary( + curr, + MAX_CACHED_TAIL_BYTES, + )) + .to_string(); + let generation = self.next_generation; + self.next_generation = self.next_generation.wrapping_add(1); + self.entries.insert( + tool_call_id.to_string(), + CachedOutput { + total_len: curr.len(), + tail, + generation, + }, + ); + self.enforce_entry_cap(); + if payload.is_empty() { + None + } else { + Some(payload) + } + } + + /// Drop cached state for a tool call that has finished. Keeps the + /// session-scoped cache bounded in long-running sessions. + fn remove_if_final(&mut self, tool_call_id: &str, status: Option<&str>) { + if matches!( + status, + Some("completed" | "failed" | "cancelled" | "error") + ) { + self.entries.remove(tool_call_id); + } + } + + /// Returns true when the cached fingerprint matches `curr` at the + /// expected offset — i.e. `curr` is a prefix extension (or identity) + /// of the previously observed snapshot. + fn is_extension_of(&self, prev: &CachedOutput, curr: &str) -> bool { + let tail_start = prev.total_len.saturating_sub(prev.tail.len()); + curr.get(tail_start..prev.total_len) + .is_some_and(|slice| slice == prev.tail.as_str()) + } + + /// Evict oldest entries (by `generation`) once the cache exceeds the + /// entry cap. Linear scan over a bounded map, so O(MAX_CACHE_ENTRIES) + /// per eviction — acceptable at this size. + fn enforce_entry_cap(&mut self) { + while self.entries.len() > MAX_CACHE_ENTRIES { + let Some(oldest_id) = self + .entries + .iter() + .min_by_key(|(_, v)| v.generation) + .map(|(k, _)| k.clone()) + else { + break; + }; + self.entries.remove(&oldest_id); + } + } +} + +/// Apply the per-event size cap + truncation marker. Returns `(payload, +/// append)`. An empty `text` yields an empty `payload`; callers should +/// decide whether to suppress the emission in that case. +fn build_emit_payload(text: &str, append: bool) -> (String, bool) { + let truncated = trim_partial_ansi_tail(truncate_tail_at_char_boundary( + text, + MAX_SINGLE_EMIT_BYTES, + )); + let out = if truncated.len() < text.len() { + format!("{TRUNCATION_MARKER}{truncated}") + } else { + truncated.to_string() + }; + (out, append) +} + +/// Return a substring of `s` whose byte length is `<= max_bytes`, aligned to +/// a UTF-8 character boundary and taken from the TAIL of `s` (so the most +/// recent output is preserved when truncation is required). +fn truncate_tail_at_char_boundary(s: &str, max_bytes: usize) -> &str { + if s.len() <= max_bytes { + return s; + } + let mut start = s.len() - max_bytes; + while start < s.len() && !s.is_char_boundary(start) { + start += 1; + } + &s[start..] +} + +/// If the very end of `s` contains a partial ANSI escape sequence, trim it +/// so downstream ANSI parsers (e.g. the frontend `ansi-to-react` renderer) +/// don't see a half-emitted escape. +/// +/// Handles the three common ACP-stream cases: +/// - CSI (`ESC [ ... final`): terminator is a byte in 0x40..=0x7E after +/// the `[` introducer. +/// - OSC (`ESC ] ... ST|BEL`): terminator is BEL (0x07) or `ESC \`. +/// - Simple two-byte escape (`ESC `): complete as soon as the byte +/// following ESC is present. +/// +/// ESC is ASCII (1 byte), always a valid UTF-8 char boundary, so slicing +/// at `esc_pos` cannot produce an invalid UTF-8 string. +fn trim_partial_ansi_tail(s: &str) -> &str { + let bytes = s.as_bytes(); + let Some(esc_pos) = bytes.iter().rposition(|&b| b == 0x1B) else { + return s; + }; + let after = &bytes[esc_pos + 1..]; + if after.is_empty() { + return &s[..esc_pos]; + } + let terminated = match after[0] { + b'[' => after[1..] + .iter() + .any(|&b| (0x40..=0x7E).contains(&b)), + b']' => after[1..].contains(&0x07) + || after[1..].windows(2).any(|w| w[0] == 0x1B && w[1] == b'\\'), + // Two-byte escape sequences (ESC M, ESC D, …) are complete as + // soon as the second byte is present. + _ => true, + }; + if terminated { + s + } else { + &s[..esc_pos] + } +} + #[derive(Debug, Default)] struct TrackedTerminalToolCall { terminal_ids: Vec, @@ -1561,6 +1798,12 @@ fn emit_terminal_output_update( output: String, append: bool, ) { + // Safety cap: when a subprocess writes very fast between poll ticks, + // the delta produced by `poll_terminal_tool_call_output` can still be + // up to ~1 MB (the terminal buffer limit). Enforce the pipeline-wide + // single-event cap (with ANSI-safe truncation) before emission so the + // WS/IPC fanout never carries a multi-MB payload. + let (payload, _append) = build_emit_payload(&output, append); crate::web::event_bridge::emit_event( emitter, "acp://event", @@ -1571,7 +1814,7 @@ fn emit_terminal_output_update( status: None, content: None, raw_input: None, - raw_output: Some(output), + raw_output: Some(payload), raw_output_append: Some(append), locations: None, meta: None, @@ -1807,6 +2050,10 @@ async fn run_conversation_loop<'a>( cwd: &str, supports_fork: bool, ) -> Result, sacp::Error> { + // Session-scoped cache for diffing cumulative `raw_output` snapshots + // into incremental deltas. Shared across the idle loop and the active + // turn loop so tool calls that span turns stay consistent. + let mut raw_output_cache = ToolCallOutputCache::default(); loop { // Wait for either a user command or a session update (e.g. available_commands_update) let cmd = loop { @@ -1823,7 +2070,7 @@ async fn run_conversation_loop<'a>( let _ = MatchDispatch::new(dispatch) .if_notification( async |notif: SessionNotification| { - emit_conversation_update(&cid, &h, agent_type, notif.update, cwd_opt); + emit_conversation_update(&cid, &h, agent_type, notif.update, cwd_opt, &mut raw_output_cache); Ok(()) }, ) @@ -1918,7 +2165,7 @@ async fn run_conversation_loop<'a>( ¬if.update, &mut tracked_terminal_tool_calls, ); - emit_conversation_update(&cid, &h, agent_type, notif.update, cwd_opt); + emit_conversation_update(&cid, &h, agent_type, notif.update, cwd_opt, &mut raw_output_cache); if should_poll_now { poll_tracked_terminal_tool_calls( runtime.as_ref(), @@ -2481,12 +2728,17 @@ fn fix_usage_update_nulls(mut dispatch: Dispatch) -> Dispatch { } /// Convert a SessionUpdate into AcpEvent(s) and emit to frontend. +/// +/// `raw_output_cache` is a per-session cache used to detect cumulative +/// snapshots from agents and convert them into incremental deltas so the +/// event pipeline never carries a full N-MB tool output more than once. fn emit_conversation_update( connection_id: &str, emitter: &EventEmitter, agent_type: AgentType, update: SessionUpdate, cwd: Option<&str>, + raw_output_cache: &mut ToolCallOutputCache, ) { match update { SessionUpdate::UserMessageChunk(_) => { @@ -2526,26 +2778,33 @@ fn emit_conversation_update( // Non-text thought chunks are currently ignored. } SessionUpdate::ToolCall(tc) => { + let tool_call_id = tc.tool_call_id.to_string(); let content = serialize_tool_call_content(&tc.content); let raw_input = json_value_to_text(&tc.raw_input).map(|text| resolve_live_tool_input(&text, cwd)); - let raw_output = - json_value_to_text(&tc.raw_output).map(|text| structurize_live_output(&text)); + // Initial tool_call notification — the frontend reducer + // treats `raw_output` as a full replacement, so we bypass + // the diff path and seed the cache with the current snapshot. + let raw_output = json_value_to_text(&tc.raw_output) + .map(|text| structurize_live_output(&text)) + .and_then(|text| raw_output_cache.seed(&tool_call_id, &text)); let locations = if tc.locations.is_empty() { None } else { serde_json::to_value(&tc.locations).ok() }; let meta = tc.meta.map(serde_json::Value::Object); + let status = format!("{:?}", tc.status).to_lowercase(); + raw_output_cache.remove_if_final(&tool_call_id, Some(status.as_str())); crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::ToolCall { connection_id: connection_id.into(), - tool_call_id: tc.tool_call_id.to_string(), + tool_call_id, title: tc.title, kind: format!("{:?}", tc.kind).to_lowercase(), - status: format!("{:?}", tc.status).to_lowercase(), + status, content, raw_input, raw_output, @@ -2555,6 +2814,7 @@ fn emit_conversation_update( ); } SessionUpdate::ToolCallUpdate(tcu) => { + let tool_call_id = tcu.tool_call_id.to_string(); let content = tcu .fields .content @@ -2562,8 +2822,21 @@ fn emit_conversation_update( .and_then(serialize_tool_call_content); let raw_input = json_value_to_text(&tcu.fields.raw_input) .map(|text| resolve_live_tool_input(&text, cwd)); - let raw_output = json_value_to_text(&tcu.fields.raw_output) + // Diff the incoming raw_output against the last snapshot we + // emitted for this tool call. This turns cumulative snapshots + // from agents (Claude Code, Codex, …) into incremental deltas + // with `raw_output_append=true`, collapsing the O(N²) transfer + // problem to O(N) while capping any single emitted chunk to + // MAX_SINGLE_EMIT_BYTES. + let raw_output_text = json_value_to_text(&tcu.fields.raw_output) .map(|text| structurize_live_output(&text)); + let (raw_output, raw_output_append) = match raw_output_text { + Some(text) => match raw_output_cache.consume(&tool_call_id, &text) { + Some((payload, append)) => (Some(payload), Some(append)), + None => (None, None), + }, + None => (None, None), + }; let locations = tcu .fields .locations @@ -2571,18 +2844,20 @@ fn emit_conversation_update( .filter(|l| !l.is_empty()) .and_then(|l| serde_json::to_value(l).ok()); let meta = tcu.meta.clone().map(serde_json::Value::Object); + let status = tcu.fields.status.map(|s| format!("{:?}", s).to_lowercase()); + raw_output_cache.remove_if_final(&tool_call_id, status.as_deref()); crate::web::event_bridge::emit_event( emitter, "acp://event", AcpEvent::ToolCallUpdate { connection_id: connection_id.into(), - tool_call_id: tcu.tool_call_id.to_string(), + tool_call_id, title: tcu.fields.title, - status: tcu.fields.status.map(|s| format!("{:?}", s).to_lowercase()), + status, content, raw_input, raw_output, - raw_output_append: None, + raw_output_append, locations, meta, }, @@ -2761,4 +3036,205 @@ mod tests { assert!(req.meta.is_none()); } + + // ─── ToolCallOutputCache ──────────────────────────────────────────── + + #[test] + fn cache_first_update_emits_full_replace() { + let mut cache = ToolCallOutputCache::default(); + let (payload, append) = cache.consume("t1", "hello world").expect("should emit"); + assert_eq!(payload, "hello world"); + assert!(!append, "first emit must be replacement"); + } + + #[test] + fn cache_repeated_identical_snapshot_is_noop() { + let mut cache = ToolCallOutputCache::default(); + cache.consume("t1", "same").unwrap(); + assert!( + cache.consume("t1", "same").is_none(), + "identical snapshot must not emit" + ); + } + + #[test] + fn cache_prefix_extension_emits_suffix_with_append() { + let mut cache = ToolCallOutputCache::default(); + cache.consume("t1", "line-1\n").unwrap(); + let (payload, append) = cache + .consume("t1", "line-1\nline-2\n") + .expect("should emit"); + assert_eq!(payload, "line-2\n"); + assert!(append, "prefix extension must emit with append=true"); + } + + #[test] + fn cache_divergent_snapshot_falls_back_to_replace() { + let mut cache = ToolCallOutputCache::default(); + cache.consume("t1", "hello world").unwrap(); + let (payload, append) = cache.consume("t1", "foo bar baz").expect("should emit"); + assert_eq!(payload, "foo bar baz"); + assert!(!append, "non-extension snapshot must replace"); + } + + #[test] + fn cache_tracks_extensions_past_cached_tail_boundary() { + // Regression test for the original bug: when cumulative raw_output + // exceeds MAX_CACHED_TAIL_BYTES, subsequent extensions must still be + // detectable by comparing the cached tail against the expected + // offset in the incoming snapshot. + let mut cache = ToolCallOutputCache::default(); + // First snapshot: 10 KB of 'a' + unique 4 KB marker at the end. + let prefix = "a".repeat(10 * 1024); + let marker = "M".repeat(4 * 1024); + let first = format!("{prefix}{marker}"); + cache.consume("t1", &first).unwrap(); + + // Second snapshot extends first by 16 KB of 'Z'. + let delta = "Z".repeat(16 * 1024); + let second = format!("{first}{delta}"); + let (payload, append) = cache.consume("t1", &second).expect("should emit"); + assert!(append, "extension beyond cached tail must still be detected"); + // The emitted payload should carry the delta (or its tail when + // truncated at MAX_SINGLE_EMIT_BYTES). For a 16 KB delta that's + // well below the 64 KB cap, we expect it verbatim. + assert_eq!(payload, delta); + } + + #[test] + fn cache_extension_larger_than_emit_cap_gets_truncated() { + let mut cache = ToolCallOutputCache::default(); + cache.consume("t1", "seed").unwrap(); + // Build a delta much larger than MAX_SINGLE_EMIT_BYTES. + let big_delta = "X".repeat(MAX_SINGLE_EMIT_BYTES * 2); + let second = format!("seed{big_delta}"); + let (payload, append) = cache.consume("t1", &second).expect("should emit"); + assert!(append); + assert!( + payload.starts_with(TRUNCATION_MARKER), + "oversized delta must be prefixed with truncation marker" + ); + // Payload length: marker + at most MAX_SINGLE_EMIT_BYTES of tail. + assert!(payload.len() <= TRUNCATION_MARKER.len() + MAX_SINGLE_EMIT_BYTES); + } + + #[test] + fn cache_respects_utf8_char_boundary_on_truncation() { + let mut cache = ToolCallOutputCache::default(); + // Single first-update whose byte length forces truncation at a + // position that would otherwise fall mid-codepoint. 中 is 3 bytes + // (E4 B8 AD) and MAX_SINGLE_EMIT_BYTES (65536) is not a multiple + // of 3, so naïve byte slicing would land mid-char. + let chinese_block = "中".repeat((MAX_SINGLE_EMIT_BYTES / 3) + 100); + let (payload, _append) = cache.consume("t1", &chinese_block).expect("should emit"); + // Payload must start with the truncation marker (since size > cap). + assert!( + payload.starts_with(TRUNCATION_MARKER), + "oversized snapshot must be truncated" + ); + // Body after the marker must be valid UTF-8 consisting only of 中. + let body = &payload[TRUNCATION_MARKER.len()..]; + assert!(!body.is_empty()); + assert!( + body.chars().all(|c| c == '中'), + "truncation boundary must land on a UTF-8 codepoint edge" + ); + } + + #[test] + fn cache_final_status_clears_entry() { + let mut cache = ToolCallOutputCache::default(); + cache.consume("t1", "hello").unwrap(); + assert!(cache.entries.contains_key("t1")); + cache.remove_if_final("t1", Some("completed")); + assert!(!cache.entries.contains_key("t1")); + + cache.consume("t2", "x").unwrap(); + cache.remove_if_final("t2", Some("cancelled")); + assert!(!cache.entries.contains_key("t2")); + + cache.consume("t3", "x").unwrap(); + cache.remove_if_final("t3", Some("in_progress")); + assert!( + cache.entries.contains_key("t3"), + "in-progress status must not clear cache" + ); + } + + #[test] + fn cache_enforces_entry_cap_via_fifo_eviction() { + let mut cache = ToolCallOutputCache::default(); + for i in 0..(MAX_CACHE_ENTRIES + 50) { + cache.consume(&format!("tool-{i}"), "body").unwrap(); + } + assert_eq!(cache.entries.len(), MAX_CACHE_ENTRIES); + // Oldest entries should have been evicted; newest must still exist. + assert!(!cache.entries.contains_key("tool-0")); + assert!(cache + .entries + .contains_key(&format!("tool-{}", MAX_CACHE_ENTRIES + 49))); + } + + #[test] + fn cache_seed_always_replaces_and_caches() { + let mut cache = ToolCallOutputCache::default(); + cache.consume("t1", "stale").unwrap(); + // A hypothetical replay would send another ToolCall for the same + // id — seed() must install the new snapshot without trying to + // diff against the stale prior entry. + let payload = cache.seed("t1", "fresh").expect("seed emits"); + assert_eq!(payload, "fresh"); + // Next consume should diff against "fresh", not "stale". + let (p2, append) = cache.consume("t1", "fresh+more").expect("emit"); + assert!(append, "should detect extension of freshly seeded entry"); + assert_eq!(p2, "+more"); + } + + // ─── trim_partial_ansi_tail ───────────────────────────────────────── + + #[test] + fn ansi_trim_leaves_pure_text_unchanged() { + assert_eq!(trim_partial_ansi_tail("plain text"), "plain text"); + } + + #[test] + fn ansi_trim_keeps_completed_sequences() { + let s = "\x1b[31mRED\x1b[0m done"; + assert_eq!(trim_partial_ansi_tail(s), s); + } + + #[test] + fn ansi_trim_cuts_unterminated_trailing_sequence() { + let s = "hello \x1b[31"; + assert_eq!(trim_partial_ansi_tail(s), "hello "); + } + + #[test] + fn ansi_trim_handles_bare_escape_at_end() { + let s = "hello\x1b"; + assert_eq!(trim_partial_ansi_tail(s), "hello"); + } + + // ─── truncate_tail_at_char_boundary ───────────────────────────────── + + #[test] + fn truncate_under_cap_returns_as_is() { + assert_eq!(truncate_tail_at_char_boundary("abc", 10), "abc"); + } + + #[test] + fn truncate_returns_tail_on_overflow() { + assert_eq!(truncate_tail_at_char_boundary("abcdef", 3), "def"); + } + + #[test] + fn truncate_respects_multibyte_utf8_boundary() { + // "中中中" is 9 bytes; asking for 4 bytes would land mid-char. + let s = "中中中"; + let out = truncate_tail_at_char_boundary(s, 4); + // Must be valid UTF-8 (indexing an invalid boundary would have + // panicked at slicing time). + assert!(out.chars().all(|c| c == '中')); + assert!(out.len() <= 6); // at most 2 chars (6 bytes) + } } diff --git a/src-tauri/src/chat_channel/event_subscriber.rs b/src-tauri/src/chat_channel/event_subscriber.rs index 5166531..b520aef 100644 --- a/src-tauri/src/chat_channel/event_subscriber.rs +++ b/src-tauri/src/chat_channel/event_subscriber.rs @@ -108,7 +108,7 @@ pub fn spawn_event_subscriber( last_push.retain(|_, t| t.elapsed() < Duration::from_secs(DEBOUNCE_SECS * 2)); if let Some((event_type, msg)) = - parse_event(&event.channel, &event.payload, config.lang) + parse_event(&event.channel, event.payload.as_ref(), config.lang) { // Global event filter check if let Some(filter) = &config.global_filter { diff --git a/src-tauri/src/chat_channel/session_event_subscriber.rs b/src-tauri/src/chat_channel/session_event_subscriber.rs index f841ef6..0a1c922 100644 --- a/src-tauri/src/chat_channel/session_event_subscriber.rs +++ b/src-tauri/src/chat_channel/session_event_subscriber.rs @@ -48,7 +48,7 @@ pub fn spawn_session_event_subscriber( if event.channel == "acp://event" { handle_acp_event_payload( - &event.payload, + event.payload.as_ref(), &bridge, &manager, &conn_mgr, diff --git a/src-tauri/src/web/event_bridge.rs b/src-tauri/src/web/event_bridge.rs index 9b52cb7..c26f4a7 100644 --- a/src-tauri/src/web/event_bridge.rs +++ b/src-tauri/src/web/event_bridge.rs @@ -1,12 +1,25 @@ use std::sync::Arc; -use serde::Serialize; +use serde::{ser::SerializeStruct, Serialize, Serializer}; use tokio::sync::broadcast; -#[derive(Clone, Debug, Serialize)] +/// Broadcast-delivered event. +/// +/// `payload` is wrapped in `Arc` so cloning across broadcast receivers is +/// refcount-only — avoids copying multi-MB JSON trees per subscriber. +#[derive(Clone, Debug)] pub struct WebEvent { pub channel: String, - pub payload: serde_json::Value, + pub payload: Arc, +} + +impl Serialize for WebEvent { + fn serialize(&self, serializer: S) -> Result { + let mut state = serializer.serialize_struct("WebEvent", 2)?; + state.serialize_field("channel", &self.channel)?; + state.serialize_field("payload", self.payload.as_ref())?; + state.end() + } } pub struct WebEventBroadcaster { @@ -25,16 +38,28 @@ impl WebEventBroadcaster { Self { sender } } - pub fn send(&self, channel: &str, payload: &impl Serialize) { + /// Serialize `payload` once and broadcast. Returns the serialized + /// `Value` so Tauri callers can reuse it without serializing twice. + pub fn send(&self, channel: &str, payload: &impl Serialize) -> Option> { + let value = Arc::new(serde_json::to_value(payload).ok()?); + if self.sender.receiver_count() > 0 { + let _ = self.sender.send(WebEvent { + channel: channel.to_string(), + payload: value.clone(), + }); + } + Some(value) + } + + /// Broadcast a pre-serialized `Value` without re-serialization. + pub fn send_value(&self, channel: &str, payload: Arc) { if self.sender.receiver_count() == 0 { return; } - if let Ok(value) = serde_json::to_value(payload) { - let _ = self.sender.send(WebEvent { - channel: channel.to_string(), - payload: value, - }); - } + let _ = self.sender.send(WebEvent { + channel: channel.to_string(), + payload, + }); } pub fn subscribe(&self) -> broadcast::Receiver { @@ -55,19 +80,26 @@ pub enum EventEmitter { Noop, } -/// Unified event emission: sends to both Tauri webview and Web clients (if applicable). -pub fn emit_event(emitter: &EventEmitter, event: &str, payload: impl Serialize + Clone) { +/// Unified event emission: serializes the payload exactly once and dispatches +/// the shared `Arc` to both the Tauri webview and the web broadcaster. +pub fn emit_event(emitter: &EventEmitter, event: &str, payload: impl Serialize) { match emitter { #[cfg(feature = "tauri-runtime")] EventEmitter::Tauri(app) => { use tauri::{Emitter, Manager}; - let _ = app.emit(event, payload.clone()); + let Ok(value) = serde_json::to_value(&payload) else { + return; + }; + let shared = Arc::new(value); + // `&Value` is Copy, so Tauri's `Clone` bound is satisfied without + // copying the payload — Tauri serializes through the reference. + let _ = app.emit(event, shared.as_ref()); if let Some(web) = app.try_state::>() { - web.send(event, &payload); + web.send_value(event, shared); } } EventEmitter::WebOnly(broadcaster) => { - broadcaster.send(event, &payload); + let _ = broadcaster.send(event, &payload); } EventEmitter::Noop => {} }