diff --git a/src/components/message/message-list-view.tsx b/src/components/message/message-list-view.tsx index 5936c08..f32e421 100644 --- a/src/components/message/message-list-view.tsx +++ b/src/components/message/message-list-view.tsx @@ -292,13 +292,22 @@ export function MessageListView({ const { threadItems, nonStreamingAdapted } = useMemo(() => { const allTurns = timelineTurns.map((item) => item.turn) const streamingIndices = new Set() + const inProgressToolCallIdsByIndex = new Map>() timelineTurns.forEach((item, i) => { - if (item.phase === "streaming") streamingIndices.add(i) + if (item.phase === "streaming") { + streamingIndices.add(i) + if (item.inProgressToolCallIds && item.inProgressToolCallIds.size > 0) { + inProgressToolCallIdsByIndex.set(i, item.inProgressToolCallIds) + } + } }) const allAdapted = adaptMessageTurns( allTurns, adapterText, - streamingIndices.size > 0 ? streamingIndices : undefined + streamingIndices.size > 0 ? streamingIndices : undefined, + inProgressToolCallIdsByIndex.size > 0 + ? inProgressToolCallIdsByIndex + : undefined ) // Collect non-streaming adapted messages for plan extraction diff --git a/src/contexts/conversation-runtime-context.tsx b/src/contexts/conversation-runtime-context.tsx index 6e8de50..82f6df2 100644 --- a/src/contexts/conversation-runtime-context.tsx +++ b/src/contexts/conversation-runtime-context.tsx @@ -27,6 +27,9 @@ export interface ConversationTimelineTurn { key: string turn: MessageTurn phase: ConversationTimelinePhase + // Tool call IDs whose results are still streaming (only set for streaming-phase items). + // The adapter uses this to keep the tool in "running" state while exposing partial output. + inProgressToolCallIds?: Set } export interface ConversationRuntimeSession { @@ -161,16 +164,38 @@ function formatLivePlanEntries( return `Plan updated:\n${lines.join("\n")}` } +interface BuiltStreamingTurns { + turns: MessageTurn[] + inProgressToolCallIds: Set +} + +// Cache joined chunk output keyed by chunks-array identity. The ACP reducer +// creates a new chunks array only when streaming output actually changes, so +// a WeakMap keyed on the array reference lets repeated renders reuse the +// joined string without re-running O(n) concatenation. +const joinedOutputCache = new WeakMap() + +function getJoinedChunks(chunks: readonly string[]): string { + if (chunks.length === 0) return "" + if (chunks.length === 1) return chunks[0] + const cached = joinedOutputCache.get(chunks) + if (cached !== undefined) return cached + const joined = chunks.join("") + joinedOutputCache.set(chunks, joined) + return joined +} + function buildStreamingTurnsFromLiveMessage( conversationId: number, liveMessage: LiveMessage -): MessageTurn[] { +): BuiltStreamingTurns { // Split streaming content into multiple turns matching the historical // pattern: each "round" (text/thinking + tool calls + tool results) is a // separate turn. A new turn starts when a text/thinking/plan block appears // after completed tool calls in the current group. const groups: MessageTurn["blocks"][] = [[]] let currentGroupHasCompletedTool = false + const inProgressToolCallIds = new Set() for (const block of liveMessage.content) { const isContentBlock = @@ -217,17 +242,35 @@ function buildStreamingTurnsFromLiveMessage( }) const isFinalState = block.info.status === "completed" || block.info.status === "failed" + // Output precedence: raw_output_chunks (terminal polling / SDK + // raw_output field) wins over content. Some agents stream bash output + // via raw_output with raw_output_append, others via content-only + // tool_call_update notifications — we support both. + const resolvedOutput = + block.info.raw_output_chunks.length > 0 + ? getJoinedChunks(block.info.raw_output_chunks) + : block.info.content if (isFinalState) { currentBlocks.push({ type: "tool_result", tool_use_id: block.info.tool_call_id, - output_preview: - block.info.raw_output_chunks.length > 0 - ? block.info.raw_output_chunks.join("") - : block.info.content, + output_preview: resolvedOutput, is_error: block.info.status === "failed", }) currentGroupHasCompletedTool = true + } else if (resolvedOutput) { + // In-progress tool that already produced partial output. Emit the + // running result so the renderer can display live output, and flag + // the tool_call so the adapter keeps state="input-available" (the + // spinner/running visual and 24KB tail truncation both depend on + // this state). + currentBlocks.push({ + type: "tool_result", + tool_use_id: block.info.tool_call_id, + output_preview: resolvedOutput, + is_error: false, + }) + inProgressToolCallIds.add(block.info.tool_call_id) } break } @@ -235,7 +278,7 @@ function buildStreamingTurnsFromLiveMessage( } const timestamp = new Date(liveMessage.startedAt).toISOString() - return groups + const turns = groups .filter((blocks) => blocks.length > 0) .map((blocks, i) => ({ id: @@ -246,6 +289,8 @@ function buildStreamingTurnsFromLiveMessage( blocks, timestamp, })) + + return { turns, inProgressToolCallIds } } function upsertExternalIdIndex( @@ -345,7 +390,7 @@ function reducer( ? buildStreamingTurnsFromLiveMessage( current.conversationId, current.liveMessage - ) + ).turns : [] // Promote: optimisticTurns + streamingTurns → localTurns @@ -636,18 +681,21 @@ export function ConversationRuntimeProvider({ // Phase 4: Streaming turns (live agent response, split into rounds) const streamingMessage = session.liveMessage - const streamingTurns = streamingMessage + const built = streamingMessage ? buildStreamingTurnsFromLiveMessage(conversationId, streamingMessage) - : [] + : null const result = [...persisted, ...local, ...optimistic] - for (const [i, turn] of streamingTurns.entries()) { - result.push({ - key: `streaming-${conversationId}-${streamingMessage?.id ?? "unknown"}-${i}`, - turn, - phase: "streaming", - }) + if (built) { + for (const [i, turn] of built.turns.entries()) { + result.push({ + key: `streaming-${conversationId}-${streamingMessage?.id ?? "unknown"}-${i}`, + turn, + phase: "streaming", + inProgressToolCallIds: built.inProgressToolCallIds, + }) + } } return result diff --git a/src/lib/adapters/ai-elements-adapter.ts b/src/lib/adapters/ai-elements-adapter.ts index f77a629..4bd71a0 100644 --- a/src/lib/adapters/ai-elements-adapter.ts +++ b/src/lib/adapters/ai-elements-adapter.ts @@ -600,11 +600,19 @@ function buildToolResultMap( /** * Transform a MessageTurn (from backend) to AdaptedMessage format. * Same correlation logic as adaptUnifiedMessage but operates on turn.blocks. + * + * `inProgressToolCallIds` lets streaming consumers expose partial tool output + * (e.g. terminal stdout streamed during execution) without flipping the tool + * into a "completed" visual state. When a tool_use's id is in this set, the + * adapter emits state="input-available" with the partial output attached, so + * the renderer can keep showing the running spinner while the live output + * streams in. */ export function adaptMessageTurn( turn: MessageTurn, text: AdapterMessageText, - isStreaming: boolean = false + isStreaming: boolean = false, + inProgressToolCallIds?: Set ): AdaptedMessage { const adaptedContent: AdaptedContentPart[] = [] const resultMap = buildToolResultMap(turn.blocks) @@ -635,6 +643,9 @@ export function adaptMessageTurn( ? resultMap.get(block.tool_use_id) : undefined + const isToolStillRunning = + !!block.tool_use_id && !!inProgressToolCallIds?.has(block.tool_use_id) + if (matchedResult) { matchedResultIds.add(block.tool_use_id!) adaptedContent.push({ @@ -642,7 +653,11 @@ export function adaptMessageTurn( toolCallId, toolName: block.tool_name, input: block.input_preview, - state: matchedResult.is_error ? "output-error" : "output-available", + state: isToolStillRunning + ? "input-available" + : matchedResult.is_error + ? "output-error" + : "output-available", output: matchedResult.output_preview, errorText: matchedResult.is_error ? matchedResult.output_preview || undefined @@ -736,13 +751,24 @@ export function adaptMessageTurn( /** * Transform all turns in a conversation to AdaptedMessage[]. * Internally computes completedToolIds so callers don't need to. + * + * `inProgressToolCallIdsByIndex` carries the set of tool_call_ids that are + * still streaming for each streaming-phase turn (keyed by turn index). The + * adapter forwards this to adaptMessageTurn so partial output renders without + * flipping the tool out of the running visual state. */ export function adaptMessageTurns( turns: MessageTurn[], text: AdapterMessageText, - streamingIndices?: Set + streamingIndices?: Set, + inProgressToolCallIdsByIndex?: Map> ): AdaptedMessage[] { return turns.map((turn, i) => - adaptMessageTurn(turn, text, streamingIndices?.has(i) ?? false) + adaptMessageTurn( + turn, + text, + streamingIndices?.has(i) ?? false, + inProgressToolCallIdsByIndex?.get(i) + ) ) }