feat(chat): stream partial tool output during live agent response

Live tool calls that produce output while running (e.g. streaming bash
stdout from Codex) now render their partial output in real time instead
of appearing blank until the tool completes. The tool card keeps its
running visual state — spinner and 24KB tail truncation — while chunks
arrive, and transitions to the completed state once the final status
lands. A WeakMap keyed on the ACP reducer's chunks-array identity
memoizes the joined output so repeated renders don't re-run O(n) string
concatenation.
This commit is contained in:
xintaofei
2026-04-15 19:16:50 +08:00
parent 189399e5cf
commit 2cec462594
3 changed files with 104 additions and 21 deletions

View File

@@ -292,13 +292,22 @@ export function MessageListView({
const { threadItems, nonStreamingAdapted } = useMemo(() => { const { threadItems, nonStreamingAdapted } = useMemo(() => {
const allTurns = timelineTurns.map((item) => item.turn) const allTurns = timelineTurns.map((item) => item.turn)
const streamingIndices = new Set<number>() const streamingIndices = new Set<number>()
const inProgressToolCallIdsByIndex = new Map<number, Set<string>>()
timelineTurns.forEach((item, i) => { timelineTurns.forEach((item, i) => {
if (item.phase === "streaming") streamingIndices.add(i) if (item.phase === "streaming") {
streamingIndices.add(i)
if (item.inProgressToolCallIds && item.inProgressToolCallIds.size > 0) {
inProgressToolCallIdsByIndex.set(i, item.inProgressToolCallIds)
}
}
}) })
const allAdapted = adaptMessageTurns( const allAdapted = adaptMessageTurns(
allTurns, allTurns,
adapterText, adapterText,
streamingIndices.size > 0 ? streamingIndices : undefined streamingIndices.size > 0 ? streamingIndices : undefined,
inProgressToolCallIdsByIndex.size > 0
? inProgressToolCallIdsByIndex
: undefined
) )
// Collect non-streaming adapted messages for plan extraction // Collect non-streaming adapted messages for plan extraction

View File

@@ -27,6 +27,9 @@ export interface ConversationTimelineTurn {
key: string key: string
turn: MessageTurn turn: MessageTurn
phase: ConversationTimelinePhase phase: ConversationTimelinePhase
// Tool call IDs whose results are still streaming (only set for streaming-phase items).
// The adapter uses this to keep the tool in "running" state while exposing partial output.
inProgressToolCallIds?: Set<string>
} }
export interface ConversationRuntimeSession { export interface ConversationRuntimeSession {
@@ -161,16 +164,38 @@ function formatLivePlanEntries(
return `Plan updated:\n${lines.join("\n")}` return `Plan updated:\n${lines.join("\n")}`
} }
interface BuiltStreamingTurns {
turns: MessageTurn[]
inProgressToolCallIds: Set<string>
}
// Cache joined chunk output keyed by chunks-array identity. The ACP reducer
// creates a new chunks array only when streaming output actually changes, so
// a WeakMap keyed on the array reference lets repeated renders reuse the
// joined string without re-running O(n) concatenation.
const joinedOutputCache = new WeakMap<readonly string[], string>()
function getJoinedChunks(chunks: readonly string[]): string {
if (chunks.length === 0) return ""
if (chunks.length === 1) return chunks[0]
const cached = joinedOutputCache.get(chunks)
if (cached !== undefined) return cached
const joined = chunks.join("")
joinedOutputCache.set(chunks, joined)
return joined
}
function buildStreamingTurnsFromLiveMessage( function buildStreamingTurnsFromLiveMessage(
conversationId: number, conversationId: number,
liveMessage: LiveMessage liveMessage: LiveMessage
): MessageTurn[] { ): BuiltStreamingTurns {
// Split streaming content into multiple turns matching the historical // Split streaming content into multiple turns matching the historical
// pattern: each "round" (text/thinking + tool calls + tool results) is a // pattern: each "round" (text/thinking + tool calls + tool results) is a
// separate turn. A new turn starts when a text/thinking/plan block appears // separate turn. A new turn starts when a text/thinking/plan block appears
// after completed tool calls in the current group. // after completed tool calls in the current group.
const groups: MessageTurn["blocks"][] = [[]] const groups: MessageTurn["blocks"][] = [[]]
let currentGroupHasCompletedTool = false let currentGroupHasCompletedTool = false
const inProgressToolCallIds = new Set<string>()
for (const block of liveMessage.content) { for (const block of liveMessage.content) {
const isContentBlock = const isContentBlock =
@@ -217,17 +242,35 @@ function buildStreamingTurnsFromLiveMessage(
}) })
const isFinalState = const isFinalState =
block.info.status === "completed" || block.info.status === "failed" block.info.status === "completed" || block.info.status === "failed"
// Output precedence: raw_output_chunks (terminal polling / SDK
// raw_output field) wins over content. Some agents stream bash output
// via raw_output with raw_output_append, others via content-only
// tool_call_update notifications — we support both.
const resolvedOutput =
block.info.raw_output_chunks.length > 0
? getJoinedChunks(block.info.raw_output_chunks)
: block.info.content
if (isFinalState) { if (isFinalState) {
currentBlocks.push({ currentBlocks.push({
type: "tool_result", type: "tool_result",
tool_use_id: block.info.tool_call_id, tool_use_id: block.info.tool_call_id,
output_preview: output_preview: resolvedOutput,
block.info.raw_output_chunks.length > 0
? block.info.raw_output_chunks.join("")
: block.info.content,
is_error: block.info.status === "failed", is_error: block.info.status === "failed",
}) })
currentGroupHasCompletedTool = true currentGroupHasCompletedTool = true
} else if (resolvedOutput) {
// In-progress tool that already produced partial output. Emit the
// running result so the renderer can display live output, and flag
// the tool_call so the adapter keeps state="input-available" (the
// spinner/running visual and 24KB tail truncation both depend on
// this state).
currentBlocks.push({
type: "tool_result",
tool_use_id: block.info.tool_call_id,
output_preview: resolvedOutput,
is_error: false,
})
inProgressToolCallIds.add(block.info.tool_call_id)
} }
break break
} }
@@ -235,7 +278,7 @@ function buildStreamingTurnsFromLiveMessage(
} }
const timestamp = new Date(liveMessage.startedAt).toISOString() const timestamp = new Date(liveMessage.startedAt).toISOString()
return groups const turns = groups
.filter((blocks) => blocks.length > 0) .filter((blocks) => blocks.length > 0)
.map((blocks, i) => ({ .map((blocks, i) => ({
id: id:
@@ -246,6 +289,8 @@ function buildStreamingTurnsFromLiveMessage(
blocks, blocks,
timestamp, timestamp,
})) }))
return { turns, inProgressToolCallIds }
} }
function upsertExternalIdIndex( function upsertExternalIdIndex(
@@ -345,7 +390,7 @@ function reducer(
? buildStreamingTurnsFromLiveMessage( ? buildStreamingTurnsFromLiveMessage(
current.conversationId, current.conversationId,
current.liveMessage current.liveMessage
) ).turns
: [] : []
// Promote: optimisticTurns + streamingTurns → localTurns // Promote: optimisticTurns + streamingTurns → localTurns
@@ -636,18 +681,21 @@ export function ConversationRuntimeProvider({
// Phase 4: Streaming turns (live agent response, split into rounds) // Phase 4: Streaming turns (live agent response, split into rounds)
const streamingMessage = session.liveMessage const streamingMessage = session.liveMessage
const streamingTurns = streamingMessage const built = streamingMessage
? buildStreamingTurnsFromLiveMessage(conversationId, streamingMessage) ? buildStreamingTurnsFromLiveMessage(conversationId, streamingMessage)
: [] : null
const result = [...persisted, ...local, ...optimistic] const result = [...persisted, ...local, ...optimistic]
for (const [i, turn] of streamingTurns.entries()) { if (built) {
result.push({ for (const [i, turn] of built.turns.entries()) {
key: `streaming-${conversationId}-${streamingMessage?.id ?? "unknown"}-${i}`, result.push({
turn, key: `streaming-${conversationId}-${streamingMessage?.id ?? "unknown"}-${i}`,
phase: "streaming", turn,
}) phase: "streaming",
inProgressToolCallIds: built.inProgressToolCallIds,
})
}
} }
return result return result

View File

@@ -600,11 +600,19 @@ function buildToolResultMap(
/** /**
* Transform a MessageTurn (from backend) to AdaptedMessage format. * Transform a MessageTurn (from backend) to AdaptedMessage format.
* Same correlation logic as adaptUnifiedMessage but operates on turn.blocks. * Same correlation logic as adaptUnifiedMessage but operates on turn.blocks.
*
* `inProgressToolCallIds` lets streaming consumers expose partial tool output
* (e.g. terminal stdout streamed during execution) without flipping the tool
* into a "completed" visual state. When a tool_use's id is in this set, the
* adapter emits state="input-available" with the partial output attached, so
* the renderer can keep showing the running spinner while the live output
* streams in.
*/ */
export function adaptMessageTurn( export function adaptMessageTurn(
turn: MessageTurn, turn: MessageTurn,
text: AdapterMessageText, text: AdapterMessageText,
isStreaming: boolean = false isStreaming: boolean = false,
inProgressToolCallIds?: Set<string>
): AdaptedMessage { ): AdaptedMessage {
const adaptedContent: AdaptedContentPart[] = [] const adaptedContent: AdaptedContentPart[] = []
const resultMap = buildToolResultMap(turn.blocks) const resultMap = buildToolResultMap(turn.blocks)
@@ -635,6 +643,9 @@ export function adaptMessageTurn(
? resultMap.get(block.tool_use_id) ? resultMap.get(block.tool_use_id)
: undefined : undefined
const isToolStillRunning =
!!block.tool_use_id && !!inProgressToolCallIds?.has(block.tool_use_id)
if (matchedResult) { if (matchedResult) {
matchedResultIds.add(block.tool_use_id!) matchedResultIds.add(block.tool_use_id!)
adaptedContent.push({ adaptedContent.push({
@@ -642,7 +653,11 @@ export function adaptMessageTurn(
toolCallId, toolCallId,
toolName: block.tool_name, toolName: block.tool_name,
input: block.input_preview, input: block.input_preview,
state: matchedResult.is_error ? "output-error" : "output-available", state: isToolStillRunning
? "input-available"
: matchedResult.is_error
? "output-error"
: "output-available",
output: matchedResult.output_preview, output: matchedResult.output_preview,
errorText: matchedResult.is_error errorText: matchedResult.is_error
? matchedResult.output_preview || undefined ? matchedResult.output_preview || undefined
@@ -736,13 +751,24 @@ export function adaptMessageTurn(
/** /**
* Transform all turns in a conversation to AdaptedMessage[]. * Transform all turns in a conversation to AdaptedMessage[].
* Internally computes completedToolIds so callers don't need to. * Internally computes completedToolIds so callers don't need to.
*
* `inProgressToolCallIdsByIndex` carries the set of tool_call_ids that are
* still streaming for each streaming-phase turn (keyed by turn index). The
* adapter forwards this to adaptMessageTurn so partial output renders without
* flipping the tool out of the running visual state.
*/ */
export function adaptMessageTurns( export function adaptMessageTurns(
turns: MessageTurn[], turns: MessageTurn[],
text: AdapterMessageText, text: AdapterMessageText,
streamingIndices?: Set<number> streamingIndices?: Set<number>,
inProgressToolCallIdsByIndex?: Map<number, Set<string>>
): AdaptedMessage[] { ): AdaptedMessage[] {
return turns.map((turn, i) => return turns.map((turn, i) =>
adaptMessageTurn(turn, text, streamingIndices?.has(i) ?? false) adaptMessageTurn(
turn,
text,
streamingIndices?.has(i) ?? false,
inProgressToolCallIdsByIndex?.get(i)
)
) )
} }