fix(parser): harden Agent subagent state machine, file matching, query performance and streaming child grouping

- Codex: decouple active_agent_count decrement from close_agent target
  parsing and reset counter on turn_context to prevent main assistant
  messages from being swallowed when close_agent events are malformed
- Codex: use exact filename match with separator-aware fallback and
  sorted candidates for deterministic subagent session file resolution
- Codex/OpenCode: truncate subagent tool call previews to 500 chars
- OpenCode: batch-load all subagent tool calls in a single SQL query
  instead of per-task N+1 queries to avoid slow detail page loads
- Streaming: restrict positional child grouping fallback to in-progress
  agents only, preventing top-level tool calls from being incorrectly
  folded into completed Agent cards
- Tests: update Claude context window assertions to match 1M default
This commit is contained in:
xintaofei
2026-04-17 09:38:52 +08:00
parent 73a910bb62
commit 3e30ab7d60
4 changed files with 143 additions and 66 deletions

View File

@@ -1369,7 +1369,7 @@ mod tests {
fn defaults_context_limit_for_claude_models() {
assert_eq!(
claude_context_window_max_tokens_for_model(Some("claude-sonnet-4-6")),
Some(200_000)
Some(1_000_000)
);
assert_eq!(
claude_context_window_max_tokens_for_model(Some("custom-model-x")),
@@ -1472,11 +1472,11 @@ mod tests {
let stats = detail.session_stats.expect("session stats");
assert_eq!(stats.context_window_used_tokens, Some(1700));
assert_eq!(stats.context_window_max_tokens, Some(200_000));
assert_eq!(stats.context_window_max_tokens, Some(1_000_000));
let percent = stats
.context_window_usage_percent
.expect("context window usage percent");
assert!((percent - 0.85).abs() < f64::EPSILON);
assert!((percent - 0.17).abs() < 0.01);
}
#[test]
@@ -1586,7 +1586,7 @@ mod tests {
// Stats should reflect only the real assistant usage
let stats = detail.session_stats.expect("session stats");
assert_eq!(stats.context_window_used_tokens, Some(1700));
assert_eq!(stats.context_window_max_tokens, Some(200_000));
assert_eq!(stats.context_window_max_tokens, Some(1_000_000));
let total = stats.total_tokens.expect("total tokens");
assert_eq!(total, 1900); // 1000 + 200 + 300 + 400
}

View File

@@ -482,20 +482,37 @@ fn parse_codex_subagent_stats(
return None;
}
let session_file = fs::read_dir(session_dir).ok()?.find_map(|entry| {
let path = entry.ok()?.path();
if path.extension().and_then(|e| e.to_str()) == Some("jsonl")
&& path
.file_name()
.unwrap_or_default()
.to_string_lossy()
.contains(agent_id)
{
Some(path)
} else {
None
}
})?;
// Try exact filename first (e.g., "agent-{agent_id}.jsonl"), then fall
// back to files whose stem ends with the agent_id. Collect and sort
// candidates to ensure deterministic selection across platforms.
let exact_path = session_dir.join(format!("agent-{}.jsonl", agent_id));
let session_file = if exact_path.is_file() {
exact_path
} else {
let mut candidates: Vec<_> = fs::read_dir(session_dir)
.ok()?
.filter_map(|entry| {
let path = entry.ok()?.path();
if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
return None;
}
let stem = path.file_stem()?.to_string_lossy().into_owned();
// Match if the stem is exactly the agent_id, or if it ends with the
// agent_id immediately after a '-' or '_' separator
// (e.g., "session-abc123" matches agent_id "abc123")
if stem == agent_id
|| stem
.strip_suffix(agent_id)
.is_some_and(|prefix| prefix.ends_with('-') || prefix.ends_with('_'))
{
Some(path)
} else {
None
}
})
.collect();
candidates.sort();
candidates.into_iter().next()?
};
let file = fs::File::open(&session_file).ok()?;
let reader = BufReader::new(file);
@@ -569,7 +586,7 @@ fn parse_codex_subagent_stats(
let tc = AgentToolCall {
tool_name,
input_preview,
input_preview: input_preview.map(|s| truncate_str(&s, 500)),
output_preview: None,
is_error: false,
};
@@ -592,9 +609,9 @@ fn parse_codex_subagent_stats(
let raw_output = value_to_preview(output_value);
if tc.tool_name == "exec_command" {
tc.output_preview =
raw_output.map(|s| clean_codex_exec_output(&s));
raw_output.map(|s| truncate_str(&clean_codex_exec_output(&s), 500));
} else {
tc.output_preview = raw_output;
tc.output_preview = raw_output.map(|s| truncate_str(&s, 500));
}
tc.is_error = infer_tool_call_output_is_error(
payload,
@@ -669,6 +686,7 @@ impl CodexParser {
let mut agent_id_to_spawn_call_id: HashMap<String, String> = HashMap::new();
let mut agent_final_results: HashMap<String, String> = HashMap::new();
let mut wait_agent_call_ids: HashSet<String> = HashSet::new();
let mut close_agent_call_ids: HashSet<String> = HashSet::new();
let mut close_agent_targets: HashMap<String, String> = HashMap::new();
let mut active_agent_count: u32 = 0;
let mut call_id_tool_names: HashMap<String, String> = HashMap::new();
@@ -713,6 +731,8 @@ impl CodexParser {
}
}
"turn_context" => {
// A new API turn means any prior agent lifecycle is complete.
active_agent_count = 0;
if model.is_none() {
model = value
.get("payload")
@@ -972,6 +992,7 @@ impl CodexParser {
}
"close_agent" => {
if let Some(ref id) = tool_use_id {
close_agent_call_ids.insert(id.clone());
let target = parse_codex_json_arg(payload)
.and_then(|a| {
a.get("target")
@@ -1045,7 +1066,7 @@ impl CodexParser {
.is_some_and(|id| wait_agent_call_ids.contains(id));
let is_close = tool_use_id
.as_ref()
.is_some_and(|id| close_agent_targets.contains_key(id));
.is_some_and(|id| close_agent_call_ids.contains(id));
if is_spawn {
if let Some(output_obj) = parse_codex_json_output(payload) {

View File

@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::future::Future;
use std::path::PathBuf;
use std::time::Duration;
@@ -9,7 +10,7 @@ use sea_orm::{
};
use crate::models::*;
use crate::parsers::{folder_name_from_path, AgentParser, ParseError};
use crate::parsers::{folder_name_from_path, truncate_str, AgentParser, ParseError};
pub struct OpenCodeParser {
base_dir: PathBuf,
@@ -225,6 +226,13 @@ impl OpenCodeParser {
))
.await?;
// Pre-scan: collect all subagent session IDs from task tool parts so we
// can batch-load their tool calls in a single query instead of N queries.
let subagent_session_ids = self
.scan_subagent_session_ids(conn, conversation_id)
.await;
let subagent_tools = batch_load_subagent_tool_calls(conn, &subagent_session_ids).await;
let mut messages = Vec::with_capacity(rows.len());
for row in rows {
@@ -263,7 +271,7 @@ impl OpenCodeParser {
};
let (content_blocks, usage_from_step_finish) =
self.load_sqlite_parts(conn, &msg_id).await?;
self.load_sqlite_parts(conn, &msg_id, &subagent_tools).await?;
let usage = if is_assistant {
extract_opencode_usage(&value).or(usage_from_step_finish)
@@ -298,10 +306,43 @@ impl OpenCodeParser {
Ok(messages)
}
/// Collect the distinct sub-agent session IDs referenced by a conversation.
///
/// Runs one query over the `part` rows joined to the `message` rows whose
/// `session_id` equals `conversation_id`, keeping only parts whose JSON
/// `data` is a `task` tool part with a non-null `state.input.subagent_type`
/// and a non-null `state.metadata.sessionId`.
///
/// This is best-effort: if the query fails an empty list is returned, and
/// any row whose `sid` column cannot be read as a `String` is skipped.
async fn scan_subagent_session_ids(
    &self,
    conn: &DatabaseConnection,
    conversation_id: &str,
) -> Vec<String> {
    let statement = Statement::from_sql_and_values(
        DbBackend::Sqlite,
        r#"
SELECT DISTINCT json_extract(p.data, '$.state.metadata.sessionId') AS sid
FROM part p
INNER JOIN message m ON m.id = p.message_id
WHERE m.session_id = ?
AND json_extract(p.data, '$.type') = 'tool'
AND json_extract(p.data, '$.tool') = 'task'
AND json_extract(p.data, '$.state.input.subagent_type') IS NOT NULL
AND sid IS NOT NULL
"#,
        [conversation_id.into()],
    );

    // A failed lookup is not fatal to the caller; fall back to "no sub-agents".
    let Ok(task_rows) = conn.query_all(statement).await else {
        return Vec::new();
    };

    let mut session_ids = Vec::new();
    for task_row in &task_rows {
        // Rows whose `sid` is not readable as text are ignored.
        if let Ok(sid) = task_row.try_get::<String>("", "sid") {
            session_ids.push(sid);
        }
    }
    session_ids
}
async fn load_sqlite_parts(
&self,
conn: &DatabaseConnection,
message_id: &str,
subagent_tools: &HashMap<String, Vec<AgentToolCall>>,
) -> Result<(Vec<ContentBlock>, Option<TurnUsage>), ParseError> {
let rows = conn
.query_all(Statement::from_sql_and_values(
@@ -435,12 +476,11 @@ impl OpenCodeParser {
_ => None,
};
// Load sub-agent tool calls from the sub-agent session
let tool_calls = if let Some(sid) = session_id {
load_subagent_tool_calls(conn, sid).await
} else {
Vec::new()
};
// Look up pre-fetched sub-agent tool calls
let tool_calls = session_id
.and_then(|sid| subagent_tools.get(sid))
.cloned()
.unwrap_or_default();
let tool_count = tool_calls.len() as u32;
let agent_stats = Some(AgentExecutionStats {
@@ -820,36 +860,47 @@ fn extract_task_result_content(raw: &str) -> String {
raw.to_string()
}
/// Load tool calls from a sub-agent's session in the OpenCode SQLite database.
/// Batch-load tool calls from multiple sub-agent sessions in a single query.
///
/// Queries all messages and their parts in the given session, extracts tool-type
/// parts, and returns a compact list of `AgentToolCall` records for display
/// inside the parent Agent card.
async fn load_subagent_tool_calls(
/// Returns a map from session_id to its list of `AgentToolCall` records.
/// This avoids N+1 queries when a conversation has many agent tasks.
async fn batch_load_subagent_tool_calls(
conn: &DatabaseConnection,
session_id: &str,
) -> Vec<AgentToolCall> {
session_ids: &[String],
) -> HashMap<String, Vec<AgentToolCall>> {
if session_ids.is_empty() {
return HashMap::new();
}
// Build parameterized IN clause
let placeholders: Vec<&str> = session_ids.iter().map(|_| "?").collect();
let sql = format!(
r#"
SELECT m.session_id, p.data
FROM part p
INNER JOIN message m ON m.id = p.message_id
WHERE m.session_id IN ({})
AND json_extract(p.data, '$.type') = 'tool'
ORDER BY m.session_id, p.time_created ASC, p.id ASC
"#,
placeholders.join(", ")
);
let values: Vec<sea_orm::Value> = session_ids.iter().map(|s| s.as_str().into()).collect();
let rows = match conn
.query_all(Statement::from_sql_and_values(
DbBackend::Sqlite,
r#"
SELECT p.data
FROM part p
INNER JOIN message m ON m.id = p.message_id
WHERE m.session_id = ?
AND json_extract(p.data, '$.type') = 'tool'
ORDER BY p.time_created ASC, p.id ASC
"#,
[session_id.into()],
))
.query_all(Statement::from_sql_and_values(DbBackend::Sqlite, &sql, values))
.await
{
Ok(r) => r,
Err(_) => return Vec::new(),
Err(_) => return HashMap::new(),
};
let mut tool_calls = Vec::new();
let mut result: HashMap<String, Vec<AgentToolCall>> = HashMap::new();
for row in rows {
let sid: String = match row.try_get("", "session_id") {
Ok(s) => s,
Err(_) => continue,
};
let data_raw: String = match row.try_get("", "data") {
Ok(d) => d,
Err(_) => continue,
@@ -879,17 +930,19 @@ async fn load_subagent_tool_calls(
let state = value.get("state");
let input_preview = state
.and_then(|s| s.get("input"))
.and_then(|v| value_to_preview(Some(v)));
.and_then(|v| value_to_preview(Some(v)))
.map(|s| truncate_str(&s, 500));
let output_preview = state
.and_then(|s| s.get("output"))
.and_then(|v| value_to_preview(Some(v)));
.and_then(|v| value_to_preview(Some(v)))
.map(|s| truncate_str(&s, 500));
let status = state
.and_then(|s| s.get("status"))
.and_then(|s| s.as_str())
.unwrap_or("");
let has_error_field = state.and_then(|s| s.get("error")).is_some();
tool_calls.push(AgentToolCall {
result.entry(sid).or_default().push(AgentToolCall {
tool_name,
input_preview,
output_preview,
@@ -897,7 +950,7 @@ async fn load_subagent_tool_calls(
});
}
tool_calls
result
}
#[cfg(test)]

View File

@@ -295,18 +295,20 @@ function buildStreamingTurnsFromLiveMessage(
}
}
// Second pass: assign children using parentToolUseId or position fallback
// Second pass: assign children using parentToolUseId or position fallback.
// Positional fallback only captures while the agent is still in-progress;
// once it completes/fails, subsequent tool calls are treated as top-level.
let positionalAgentId: string | null = null
let positionalAgentCompleted = false
for (const block of liveMessage.content) {
if (block.type === "tool_call") {
const toolName = getToolName(block.info)
if (toolName === "agent") {
positionalAgentId = block.info.tool_call_id
positionalAgentCompleted =
const isFinal =
block.info.status === "completed" || block.info.status === "failed"
// Only capture children while the agent is still running
positionalAgentId = isFinal ? null : block.info.tool_call_id
} else {
// Extract parentToolUseId from ACP meta (Claude Code embeds this
// under meta.claudeCode.parentToolUseId). Guard each access level
@@ -321,21 +323,22 @@ function buildStreamingTurnsFromLiveMessage(
}
}
// Use explicit parentToolUseId when available, positional fallback
// only for in-progress agents
const resolvedParent =
parentId && agentIds.has(parentId) ? parentId : positionalAgentId // fallback
parentId && agentIds.has(parentId) ? parentId : positionalAgentId
if (resolvedParent) {
childToolCallIds.add(block.info.tool_call_id)
agentChildren
.get(resolvedParent)!
.push({ info: block.info, toolName })
.get(resolvedParent)
?.push({ info: block.info, toolName })
}
}
} else if (positionalAgentId && positionalAgentCompleted) {
// A text/thinking/plan block after a completed agent means the main
// agent is producing new content — stop position-based capture.
} else if (positionalAgentId) {
// A non-tool block (text/thinking/plan) means the main agent is
// producing new content — stop position-based capture.
positionalAgentId = null
positionalAgentCompleted = false
}
}