继续重构会话消息处理逻辑

This commit is contained in:
xintaofei
2026-03-12 18:34:34 +08:00
parent 4e49e2f16a
commit bd5456423f
9 changed files with 452 additions and 779 deletions

View File

@@ -6,17 +6,15 @@ import {
useContext,
useMemo,
useReducer,
useRef,
type ReactNode,
} from "react"
import type { LiveMessage } from "@/contexts/acp-connections-context"
import { getFolderConversation } from "@/lib/tauri"
import type { DbConversationDetail, MessageTurn } from "@/lib/types"
import { inferLiveToolName } from "@/lib/tool-call-normalization"
export type ConversationSyncState =
| "idle"
| "awaiting_persist"
| "reconciling"
| "failed"
export type ConversationSyncState = "idle" | "awaiting_persist"
export type ConversationTimelinePhase = "persisted" | "optimistic" | "streaming"
@@ -29,15 +27,25 @@ export interface ConversationTimelineTurn {
export interface ConversationRuntimeSession {
conversationId: number
externalId: string | null
persistedTurns: MessageTurn[]
// DB data (cold open only)
detail: DbConversationDetail | null
detailLoading: boolean
detailError: string | null
// Active session accumulated turns (promoted optimistic + completed streaming)
localTurns: MessageTurn[]
// Temporary state
optimisticTurns: MessageTurn[]
liveMessage: LiveMessage | null
// Sync
syncState: ConversationSyncState
activeTurnToken: string | null
lastHydratedAt: number | null
lastPersistedAt: number | null
persistedUpdatedAt: string | null
persistedMessageCount: number
// Cleanup
pendingCleanup: boolean
}
interface ConversationRuntimeState {
@@ -51,7 +59,24 @@ const initialState: ConversationRuntimeState = {
}
type Action =
| { type: "HYDRATE_FROM_DETAIL"; detail: DbConversationDetail }
| {
type: "FETCH_DETAIL_START"
conversationId: number
}
| {
type: "FETCH_DETAIL_SUCCESS"
conversationId: number
detail: DbConversationDetail
}
| {
type: "FETCH_DETAIL_ERROR"
conversationId: number
error: string
}
| {
type: "COMPLETE_TURN"
conversationId: number
}
| {
type: "APPEND_OPTIMISTIC_TURN"
conversationId: number
@@ -63,12 +88,6 @@ type Action =
conversationId: number
liveMessage: LiveMessage | null
}
| {
type: "ACK_PERSISTED_DETAIL"
conversationId: number
detail: DbConversationDetail
turnToken?: string | null
}
| {
type: "SET_EXTERNAL_ID"
conversationId: number
@@ -84,6 +103,11 @@ type Action =
fromConversationId: number
toConversationId: number
}
| {
type: "SET_PENDING_CLEANUP"
conversationId: number
pendingCleanup: boolean
}
| { type: "REMOVE_CONVERSATION"; conversationId: number }
| { type: "RESET" }
@@ -93,15 +117,15 @@ function createEmptySession(
return {
conversationId,
externalId: null,
persistedTurns: [],
detail: null,
detailLoading: false,
detailError: null,
localTurns: [],
optimisticTurns: [],
liveMessage: null,
syncState: "idle",
activeTurnToken: null,
lastHydratedAt: null,
lastPersistedAt: null,
persistedUpdatedAt: null,
persistedMessageCount: 0,
pendingCleanup: false,
}
}
@@ -179,24 +203,6 @@ function buildStreamingTurnFromLiveMessage(
}
}
function shouldAcceptPersistedSnapshot(
current: ConversationRuntimeSession | undefined,
detail: DbConversationDetail
): boolean {
if (!current) return true
const nextUpdatedAt = detail.summary.updated_at ?? null
const nextMessageCount = detail.summary.message_count
const nextTurnCount = detail.turns.length
if (nextMessageCount < current.persistedMessageCount) return false
if (nextTurnCount < current.persistedTurns.length) return false
if (!current.persistedUpdatedAt || !nextUpdatedAt) return true
if (nextUpdatedAt < current.persistedUpdatedAt) return false
return true
}
function upsertExternalIdIndex(
index: Map<string, number>,
previousExternalId: string | null,
@@ -213,74 +219,18 @@ function upsertExternalIdIndex(
return next
}
function reduceHydrateDetail(
function updateSessionInState(
state: ConversationRuntimeState,
conversationId: number,
detail: DbConversationDetail
updater: (current: ConversationRuntimeSession) => ConversationRuntimeSession
): ConversationRuntimeState {
const current = state.byConversationId.get(conversationId)
const nextExternalId = detail.summary.external_id ?? null
const acceptSnapshot = shouldAcceptPersistedSnapshot(current, detail)
const prevPersistedTurnCount = current?.persistedTurns.length ?? 0
const prevPersistedMessageCount = current?.persistedMessageCount ?? 0
const optimisticTurns = current?.optimisticTurns ?? []
const persistedTurns = acceptSnapshot
? detail.turns
: (current?.persistedTurns ?? [])
const nextPersistedUpdatedAt = acceptSnapshot
? (detail.summary.updated_at ?? null)
: (current?.persistedUpdatedAt ?? null)
const nextPersistedMessageCount = acceptSnapshot
? detail.summary.message_count
: (current?.persistedMessageCount ?? 0)
const shouldDropOptimistic =
optimisticTurns.length > 0 &&
persistedTurns.length >= (current?.persistedTurns.length ?? 0) + 1
// Content advance: actual turns or messages grew — safe to clear
// liveMessage because persisted data now covers the streamed content.
const hasContentAdvance =
acceptSnapshot &&
(detail.turns.length > prevPersistedTurnCount ||
detail.summary.message_count > prevPersistedMessageCount)
// Note: updated_at changes (e.g. status update bumping the timestamp)
// are NOT treated as content advance. Only actual turns / message_count
// growth should clear liveMessage, because a metadata-only bump could
// arrive before the session file is flushed to disk.
const nextSession: ConversationRuntimeSession = {
...(current ?? createEmptySession(conversationId)),
externalId: nextExternalId,
persistedTurns,
liveMessage:
hasContentAdvance && current?.syncState !== "awaiting_persist"
? null
: (current?.liveMessage ?? null),
optimisticTurns: shouldDropOptimistic ? [] : optimisticTurns,
syncState: shouldDropOptimistic ? "idle" : (current?.syncState ?? "idle"),
activeTurnToken: shouldDropOptimistic
? null
: (current?.activeTurnToken ?? null),
lastHydratedAt: Date.now(),
lastPersistedAt: acceptSnapshot
? Date.now()
: (current?.lastPersistedAt ?? null),
persistedUpdatedAt: nextPersistedUpdatedAt,
persistedMessageCount: nextPersistedMessageCount,
}
const current =
state.byConversationId.get(conversationId) ??
createEmptySession(conversationId)
const nextSession = updater(current)
const nextByConversationId = new Map(state.byConversationId)
nextByConversationId.set(conversationId, nextSession)
const nextExternalIndex = upsertExternalIdIndex(
state.conversationIdByExternalId,
current?.externalId ?? null,
nextExternalId,
conversationId
)
return {
byConversationId: nextByConversationId,
conversationIdByExternalId: nextExternalIndex,
}
return { ...state, byConversationId: nextByConversationId }
}
function reducer(
@@ -288,73 +238,125 @@ function reducer(
action: Action
): ConversationRuntimeState {
switch (action.type) {
case "HYDRATE_FROM_DETAIL":
return reduceHydrateDetail(state, action.detail.summary.id, action.detail)
case "FETCH_DETAIL_START":
return updateSessionInState(state, action.conversationId, (current) => ({
...current,
detailLoading: true,
detailError: null,
}))
case "APPEND_OPTIMISTIC_TURN": {
case "FETCH_DETAIL_SUCCESS": {
const current =
state.byConversationId.get(action.conversationId) ??
createEmptySession(action.conversationId)
const nextExternalId = action.detail.summary.external_id ?? null
// DB data is authoritative for completed turns — always clear localTurns.
// Only preserve optimisticTurns + liveMessage if user actively sent
// a message and is awaiting agent response.
const isActivelyInteracting =
current.syncState === "awaiting_persist"
const nextSession: ConversationRuntimeSession = {
...current,
detail: action.detail,
detailLoading: false,
detailError: null,
externalId: nextExternalId ?? current.externalId,
localTurns: [],
...(isActivelyInteracting
? {}
: { optimisticTurns: [], liveMessage: null }),
}
const nextByConversationId = new Map(state.byConversationId)
nextByConversationId.set(action.conversationId, nextSession)
const nextExternalIndex = upsertExternalIdIndex(
state.conversationIdByExternalId,
current.externalId,
nextExternalId ?? current.externalId,
action.conversationId
)
return {
byConversationId: nextByConversationId,
conversationIdByExternalId: nextExternalIndex,
}
}
case "FETCH_DETAIL_ERROR":
return updateSessionInState(state, action.conversationId, (current) => ({
...current,
detailLoading: false,
detailError: action.error,
}))
case "COMPLETE_TURN": {
const current = state.byConversationId.get(action.conversationId)
if (!current) return state
// Convert liveMessage to a completed MessageTurn
const streamingTurn = current.liveMessage
? buildStreamingTurnFromLiveMessage(
current.conversationId,
current.liveMessage
)
: null
// Promote: optimisticTurns + streamingTurn → localTurns
const promoted = [...current.localTurns, ...current.optimisticTurns]
if (streamingTurn) promoted.push(streamingTurn)
return updateSessionInState(state, action.conversationId, () => ({
...current,
localTurns: promoted,
optimisticTurns: [],
liveMessage: null,
syncState: "idle",
activeTurnToken: null,
}))
}
case "APPEND_OPTIMISTIC_TURN":
return updateSessionInState(state, action.conversationId, (current) => ({
...current,
optimisticTurns: [...current.optimisticTurns, action.turn],
syncState: "awaiting_persist",
activeTurnToken: action.turnToken,
}
const nextByConversationId = new Map(state.byConversationId)
nextByConversationId.set(action.conversationId, nextSession)
return { ...state, byConversationId: nextByConversationId }
}
}))
case "SET_LIVE_MESSAGE": {
const current =
state.byConversationId.get(action.conversationId) ??
createEmptySession(action.conversationId)
const current = state.byConversationId.get(action.conversationId)
// Avoid creating a ghost session when clearing liveMessage on a deleted session
if (!current && action.liveMessage === null) return state
const session = current ?? createEmptySession(action.conversationId)
// Guard: prevent stale liveMessage from ACP reconnects overriding
// persisted data. When a session has no active liveMessage and no
// pending interaction (idle or reconciling without a live turn),
// a SET_LIVE_MESSAGE from a reconnected ACP connection carries
// the completed response that is already in persistedTurns.
// pending interaction (idle without a live turn), a SET_LIVE_MESSAGE
// from a reconnected ACP connection carries the completed response
// that is already in localTurns/detail.turns.
// Accepting it would cause duplicate assistant text in the timeline.
// Also block during cold loading (detailLoading) — the reconnect
// liveMessage arrives before DB data, causing overlap after fetch.
const hasExistingTurns =
(session.detail?.turns.length ?? 0) > 0 ||
session.localTurns.length > 0
if (
action.liveMessage !== null &&
current.liveMessage === null &&
current.syncState !== "awaiting_persist" &&
current.persistedTurns.length > 0
session.liveMessage === null &&
session.syncState !== "awaiting_persist" &&
(hasExistingTurns || session.detailLoading)
) {
return state
}
const nextSession: ConversationRuntimeSession = {
...current,
liveMessage: action.liveMessage,
}
const nextByConversationId = new Map(state.byConversationId)
nextByConversationId.set(action.conversationId, nextSession)
return { ...state, byConversationId: nextByConversationId }
}
case "ACK_PERSISTED_DETAIL": {
const nextState = reduceHydrateDetail(
state,
action.conversationId,
action.detail
)
const session = nextState.byConversationId.get(action.conversationId)
if (!session) return nextState
const nextSession: ConversationRuntimeSession = {
return updateSessionInState(state, action.conversationId, () => ({
...session,
syncState: "idle",
activeTurnToken:
action.turnToken != null &&
action.turnToken === session.activeTurnToken
? null
: session.activeTurnToken,
}
const nextByConversationId = new Map(nextState.byConversationId)
nextByConversationId.set(action.conversationId, nextSession)
return { ...nextState, byConversationId: nextByConversationId }
liveMessage: action.liveMessage,
}))
}
case "SET_EXTERNAL_ID": {
@@ -379,18 +381,11 @@ function reducer(
}
}
case "SET_SYNC_STATE": {
const current =
state.byConversationId.get(action.conversationId) ??
createEmptySession(action.conversationId)
const nextSession: ConversationRuntimeSession = {
case "SET_SYNC_STATE":
return updateSessionInState(state, action.conversationId, (current) => ({
...current,
syncState: action.syncState,
}
const nextByConversationId = new Map(state.byConversationId)
nextByConversationId.set(action.conversationId, nextSession)
return { ...state, byConversationId: nextByConversationId }
}
}))
case "MIGRATE_CONVERSATION": {
if (action.fromConversationId === action.toConversationId) return state
@@ -400,33 +395,20 @@ function reducer(
state.byConversationId.get(action.toConversationId) ??
createEmptySession(action.toConversationId)
const preferFromSnapshot =
from.persistedTurns.length >= to.persistedTurns.length
const mergedLiveMessage = to.liveMessage ?? from.liveMessage
const merged: ConversationRuntimeSession = {
...to,
...from,
conversationId: action.toConversationId,
persistedTurns: preferFromSnapshot
? from.persistedTurns
: to.persistedTurns,
detail: to.detail ?? from.detail,
detailLoading: to.detailLoading || from.detailLoading,
detailError: to.detailError ?? from.detailError,
localTurns: [...from.localTurns, ...to.localTurns],
optimisticTurns: [...from.optimisticTurns, ...to.optimisticTurns],
liveMessage: mergedLiveMessage,
syncState: to.syncState !== "idle" ? to.syncState : from.syncState,
activeTurnToken: to.activeTurnToken ?? from.activeTurnToken,
lastHydratedAt:
Math.max(from.lastHydratedAt ?? 0, to.lastHydratedAt ?? 0) || null,
lastPersistedAt:
Math.max(from.lastPersistedAt ?? 0, to.lastPersistedAt ?? 0) || null,
persistedUpdatedAt:
(to.persistedUpdatedAt ?? "") > (from.persistedUpdatedAt ?? "")
? to.persistedUpdatedAt
: from.persistedUpdatedAt,
persistedMessageCount: Math.max(
from.persistedMessageCount,
to.persistedMessageCount
),
}
const nextByConversationId = new Map(state.byConversationId)
@@ -449,6 +431,12 @@ function reducer(
}
}
case "SET_PENDING_CLEANUP":
return updateSessionInState(state, action.conversationId, (current) => ({
...current,
pendingCleanup: action.pendingCleanup,
}))
case "REMOVE_CONVERSATION": {
const current = state.byConversationId.get(action.conversationId)
if (!current) return state
@@ -473,7 +461,9 @@ interface ConversationRuntimeContextValue {
getSession: (conversationId: number) => ConversationRuntimeSession | null
getConversationIdByExternalId: (externalId: string) => number | null
getTimelineTurns: (conversationId: number) => ConversationTimelineTurn[]
hydrateFromDetail: (detail: DbConversationDetail) => void
fetchDetail: (conversationId: number) => void
refetchDetail: (conversationId: number) => void
completeTurn: (conversationId: number) => void
appendOptimisticTurn: (
conversationId: number,
turn: MessageTurn,
@@ -483,11 +473,6 @@ interface ConversationRuntimeContextValue {
conversationId: number,
liveMessage: LiveMessage | null
) => void
acknowledgePersistedDetail: (
conversationId: number,
detail: DbConversationDetail,
turnToken?: string | null
) => void
setExternalId: (conversationId: number, externalId: string | null) => void
setSyncState: (
conversationId: number,
@@ -497,6 +482,7 @@ interface ConversationRuntimeContextValue {
fromConversationId: number,
toConversationId: number
) => void
setPendingCleanup: (conversationId: number, pendingCleanup: boolean) => void
removeConversation: (conversationId: number) => void
reset: () => void
}
@@ -511,6 +497,9 @@ export function ConversationRuntimeProvider({
}) {
const [state, dispatch] = useReducer(reducer, initialState)
const stateRef = useRef(state)
stateRef.current = state
const getSession = useCallback(
(conversationId: number) =>
state.byConversationId.get(conversationId) ?? null,
@@ -528,43 +517,98 @@ export function ConversationRuntimeProvider({
const session = state.byConversationId.get(conversationId)
if (!session) return []
const persisted: ConversationTimelineTurn[] = session.persistedTurns.map(
// Phase 1: DB historical turns
const persisted: ConversationTimelineTurn[] = (
session.detail?.turns ?? []
).map((turn, index) => ({
key: `persisted-${conversationId}-${turn.id}-${index}`,
turn,
phase: "persisted",
}))
// Phase 2: Locally completed turns (promoted optimistic + completed streaming)
const local: ConversationTimelineTurn[] = session.localTurns.map(
(turn, index) => ({
key: `persisted-${conversationId}-${turn.id}-${index}`,
key: `local-${conversationId}-${turn.id}-${index}`,
turn,
phase: "persisted",
})
)
// Phase 3: Optimistic turns (pending user messages)
const optimistic: ConversationTimelineTurn[] =
session.optimisticTurns.map((turn, index) => ({
key: `optimistic-${conversationId}-${turn.id}-${index}`,
turn,
phase: "optimistic",
}))
// Phase 4: Streaming turn (live agent response)
const streamingMessage = session.liveMessage
const streamingTurn = streamingMessage
? buildStreamingTurnFromLiveMessage(conversationId, streamingMessage)
: null
if (!streamingTurn) {
return [...persisted, ...optimistic]
}
const result = [...persisted, ...local, ...optimistic]
return [
...persisted,
...optimistic,
{
if (streamingTurn) {
result.push({
key: `streaming-${conversationId}-${streamingMessage?.id ?? "unknown"}`,
turn: streamingTurn,
phase: "streaming",
},
]
})
}
return result
},
[state.byConversationId]
)
const hydrateFromDetail = useCallback((detail: DbConversationDetail) => {
dispatch({ type: "HYDRATE_FROM_DETAIL", detail })
const fetchDetail = useCallback((conversationId: number) => {
const session = stateRef.current.byConversationId.get(conversationId)
if (session?.detail || session?.detailLoading) return
// Skip fetch if session has active data (ongoing conversation)
if (
session &&
(session.optimisticTurns.length > 0 ||
session.liveMessage !== null ||
session.localTurns.length > 0)
) {
return
}
dispatch({ type: "FETCH_DETAIL_START", conversationId })
getFolderConversation(conversationId)
.then((detail) => {
dispatch({ type: "FETCH_DETAIL_SUCCESS", conversationId, detail })
})
.catch((error: unknown) => {
dispatch({
type: "FETCH_DETAIL_ERROR",
conversationId,
error: error instanceof Error ? error.message : String(error),
})
})
}, [])
const refetchDetail = useCallback((conversationId: number) => {
dispatch({ type: "FETCH_DETAIL_START", conversationId })
getFolderConversation(conversationId)
.then((detail) => {
dispatch({ type: "FETCH_DETAIL_SUCCESS", conversationId, detail })
})
.catch((error: unknown) => {
dispatch({
type: "FETCH_DETAIL_ERROR",
conversationId,
error: error instanceof Error ? error.message : String(error),
})
})
}, [])
const completeTurn = useCallback((conversationId: number) => {
dispatch({ type: "COMPLETE_TURN", conversationId })
}, [])
const appendOptimisticTurn = useCallback(
@@ -586,22 +630,6 @@ export function ConversationRuntimeProvider({
[]
)
const acknowledgePersistedDetail = useCallback(
(
conversationId: number,
detail: DbConversationDetail,
turnToken?: string | null
) => {
dispatch({
type: "ACK_PERSISTED_DETAIL",
conversationId,
detail,
turnToken,
})
},
[]
)
const setExternalId = useCallback(
(conversationId: number, externalId: string | null) => {
dispatch({ type: "SET_EXTERNAL_ID", conversationId, externalId })
@@ -627,6 +655,13 @@ export function ConversationRuntimeProvider({
[]
)
const setPendingCleanup = useCallback(
(conversationId: number, pendingCleanup: boolean) => {
dispatch({ type: "SET_PENDING_CLEANUP", conversationId, pendingCleanup })
},
[]
)
const removeConversation = useCallback((conversationId: number) => {
dispatch({ type: "REMOVE_CONVERSATION", conversationId })
}, [])
@@ -640,13 +675,15 @@ export function ConversationRuntimeProvider({
getSession,
getConversationIdByExternalId,
getTimelineTurns,
hydrateFromDetail,
fetchDetail,
refetchDetail,
completeTurn,
appendOptimisticTurn,
setLiveMessage,
acknowledgePersistedDetail,
setExternalId,
setSyncState,
migrateConversation,
setPendingCleanup,
removeConversation,
reset,
}),
@@ -654,13 +691,15 @@ export function ConversationRuntimeProvider({
getSession,
getConversationIdByExternalId,
getTimelineTurns,
hydrateFromDetail,
fetchDetail,
refetchDetail,
completeTurn,
appendOptimisticTurn,
setLiveMessage,
acknowledgePersistedDetail,
setExternalId,
setSyncState,
migrateConversation,
setPendingCleanup,
removeConversation,
reset,
]