优化OpenClaw会话解析

This commit is contained in:
xintaofei
2026-03-17 15:35:34 +08:00
parent 2240f3054d
commit f517f4fcbb
4 changed files with 739 additions and 287 deletions

View File

@@ -156,6 +156,15 @@ async fn build_agent(
parts.push("--session".into()); parts.push("--session".into());
parts.push(key.clone()); parts.push(key.clone());
} }
// When creating a new conversation (no session_id to resume),
// pass --reset-session so OpenClaw mints a fresh transcript
// instead of appending to the previous one.
if runtime_env
.get("OPENCLAW_RESET_SESSION")
.map_or(false, |v| v == "1")
{
parts.push("--reset-session".into());
}
} }
let refs: Vec<&str> = parts.iter().map(|s| s.as_str()).collect(); let refs: Vec<&str> = parts.iter().map(|s| s.as_str()).collect();
AcpAgent::from_args(&refs).map_err(|e| AcpError::SpawnFailed(e.to_string())) AcpAgent::from_args(&refs).map_err(|e| AcpError::SpawnFailed(e.to_string()))

View File

@@ -1306,9 +1306,15 @@ pub async fn acp_connect(
))); )));
} }
let local_config_json = load_agent_local_config_json(agent_type); let local_config_json = load_agent_local_config_json(agent_type);
let runtime_env = let mut runtime_env =
build_runtime_env_from_setting(agent_type, setting.as_ref(), local_config_json.as_deref()); build_runtime_env_from_setting(agent_type, setting.as_ref(), local_config_json.as_deref());
// For OpenClaw: when creating a new conversation (no session_id to resume),
// signal that we want a fresh transcript via --reset-session.
if agent_type == AgentType::OpenClaw && session_id.is_none() {
runtime_env.insert("OPENCLAW_RESET_SESSION".into(), "1".into());
}
if let registry::AgentDistribution::Npx { package, .. } = meta.distribution { if let registry::AgentDistribution::Npx { package, .. } = meta.distribution {
if detect_npx_cached_version(package).await.is_none() { if detect_npx_cached_version(package).await.is_none() {
prepare_npx_package(package).await?; prepare_npx_package(package).await?;

View File

@@ -261,9 +261,18 @@ pub async fn get_folder_conversation(
.await .await
.map_err(AppCommandError::from)?; .map_err(AppCommandError::from)?;
let (turns, session_stats) = if let Some(ref ext_id) = summary.external_id { let (turns, session_stats, resolved_ext_id) = if let Some(ref ext_id) = summary.external_id
{
let at = summary.agent_type; let at = summary.agent_type;
let eid = ext_id.clone(); let eid = ext_id.clone();
let db_created_at = summary.created_at;
let folder_path_for_fallback = {
let folder = folder_service::get_folder_by_id(&db.conn, summary.folder_id)
.await
.ok()
.flatten();
folder.map(|f| f.path)
};
tokio::task::spawn_blocking(move || -> Result<_, AppCommandError> { tokio::task::spawn_blocking(move || -> Result<_, AppCommandError> {
let parser: Box<dyn AgentParser> = match at { let parser: Box<dyn AgentParser> = match at {
AgentType::ClaudeCode => Box::new(ClaudeParser::new()), AgentType::ClaudeCode => Box::new(ClaudeParser::new()),
@@ -271,13 +280,47 @@ pub async fn get_folder_conversation(
AgentType::OpenCode => Box::new(OpenCodeParser::new()), AgentType::OpenCode => Box::new(OpenCodeParser::new()),
AgentType::Gemini => Box::new(GeminiParser::new()), AgentType::Gemini => Box::new(GeminiParser::new()),
AgentType::OpenClaw => Box::new(OpenClawParser::new()), AgentType::OpenClaw => Box::new(OpenClawParser::new()),
_ => return Ok((vec![], None)), _ => return Ok((vec![], None, None)),
}; };
// If the external session file doesn't exist yet (e.g., new ACP session
// not yet synced to disk), return empty turns instead of failing.
match parser.get_conversation(&eid) { match parser.get_conversation(&eid) {
Ok(d) => Ok((d.turns, d.session_stats)), Ok(d) => Ok((d.turns, d.session_stats, None)),
Err(crate::parsers::ParseError::ConversationNotFound(_)) => Ok((vec![], None)), Err(crate::parsers::ParseError::ConversationNotFound(_)) => {
// For OpenClaw, the external_id may be an ACP session UUID that
// doesn't correspond to any JSONL file. Fall back to matching
// by title and folder_path from the parsed conversation list.
if at == AgentType::OpenClaw {
if let Ok(all) = parser.list_conversations() {
// Filter by folder_path first, then find the closest
// started_at match within 300 seconds of db_created_at.
let matched = all
.into_iter()
.filter(|c| {
c.folder_path
.as_ref()
.zip(folder_path_for_fallback.as_ref())
.is_some_and(|(a, b)| path_eq_for_matching(a, b))
})
.min_by_key(|c| {
(c.started_at - db_created_at).num_seconds().unsigned_abs()
})
.filter(|c| {
let diff = (c.started_at - db_created_at).num_seconds().unsigned_abs();
diff < 300
});
if let Some(conv) = matched {
let new_ext_id = conv.id.clone();
if let Ok(d) = parser.get_conversation(&new_ext_id) {
return Ok((
d.turns,
d.session_stats,
Some(new_ext_id),
));
}
}
}
}
Ok((vec![], None, None))
}
Err(e) => Err(parse_error_to_app_error(e)), Err(e) => Err(parse_error_to_app_error(e)),
} }
}) })
@@ -289,9 +332,16 @@ pub async fn get_folder_conversation(
.with_detail(e.to_string()) .with_detail(e.to_string())
})?? })??
} else { } else {
(vec![], None) (vec![], None, None)
}; };
// If we resolved a different external_id (e.g. ACP UUID → parser branch ID),
// update the database so future lookups are direct.
if let Some(new_ext_id) = resolved_ext_id {
let _ =
conversation_service::update_external_id(&db.conn, conversation_id, new_ext_id).await;
}
let mut summary = summary; let mut summary = summary;
summary.message_count = turns.len() as u32; summary.message_count = turns.len() as u32;

View File

@@ -67,7 +67,7 @@ fn strip_openclaw_user_prefix(text: &str) -> String {
// ── sessions.json deserialization ────────────────────────────────────── // ── sessions.json deserialization ──────────────────────────────────────
#[derive(Deserialize)] #[derive(Deserialize, Clone)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
struct SessionMeta { struct SessionMeta {
session_id: String, session_id: String,
@@ -87,11 +87,257 @@ struct SessionMeta {
origin: Option<SessionOrigin>, origin: Option<SessionOrigin>,
} }
#[derive(Deserialize)] #[derive(Deserialize, Clone)]
struct SessionOrigin { struct SessionOrigin {
label: Option<String>, label: Option<String>,
} }
// ── JSONL tree ────────────────────────────────────────────────────────
/// A parsed JSONL record, indexed by its id.
#[derive(Clone)]
struct JRecord {
id: String,
parent_id: Option<String>,
record_type: String,
value: serde_json::Value,
}
/// Parsed tree of JSONL records.
struct JTree {
records: HashMap<String, JRecord>,
/// id → list of child ids (in insertion order)
children: HashMap<String, Vec<String>>,
/// Records with parentId = null (roots)
#[allow(dead_code)]
roots: Vec<String>,
/// Session cwd from the "session" header
session_cwd: Option<String>,
/// True if no parent has more than one child (no forks → single linear chain)
is_linear: bool,
/// Record ids in file insertion order (used for fast path when is_linear)
insertion_order: Vec<String>,
}
impl JTree {
fn parse(path: &Path) -> Result<Self, ParseError> {
let file = fs::File::open(path)?;
let reader = BufReader::new(file);
let mut records = HashMap::new();
let mut children: HashMap<String, Vec<String>> = HashMap::new();
let mut roots = Vec::new();
let mut session_cwd = None;
// Maintain insertion order for roots
let mut insert_order: Vec<String> = Vec::new();
for line in reader.lines() {
let line = match line {
Ok(l) => l,
Err(_) => continue,
};
if line.trim().is_empty() {
continue;
}
let value: serde_json::Value = match serde_json::from_str(&line) {
Ok(v) => v,
Err(_) => continue,
};
let record_type = value
.get("type")
.and_then(|t| t.as_str())
.unwrap_or("")
.to_string();
let id = value
.get("id")
.and_then(|i| i.as_str())
.unwrap_or("")
.to_string();
let parent_id = value
.get("parentId")
.and_then(|p| p.as_str())
.map(|s| s.to_string());
if record_type == "session" {
if session_cwd.is_none() {
session_cwd = value
.get("cwd")
.and_then(|s| s.as_str())
.map(|s| s.to_string());
}
continue; // session records don't participate in the tree
}
if id.is_empty() {
continue;
}
let rec = JRecord {
id: id.clone(),
parent_id: parent_id.clone(),
record_type,
value,
};
match &parent_id {
Some(pid) => {
children.entry(pid.clone()).or_default().push(id.clone());
}
None => {
roots.push(id.clone());
}
}
insert_order.push(id.clone());
records.insert(id, rec);
}
let is_linear = children.values().all(|kids| kids.len() <= 1);
Ok(JTree {
records,
children,
roots,
session_cwd,
is_linear,
insertion_order: insert_order,
})
}
/// Walk up the parentId chain from `leaf_id` to root, returning the path
/// from root → leaf (reversed).
fn ancestor_chain(&self, leaf_id: &str) -> Vec<String> {
let mut chain = Vec::new();
let mut current = Some(leaf_id.to_string());
while let Some(id) = current {
if let Some(rec) = self.records.get(&id) {
chain.push(id);
current = rec.parent_id.clone();
} else {
break;
}
}
chain.reverse();
chain
}
/// Find all leaf nodes (nodes that have no children).
fn leaf_ids(&self) -> Vec<String> {
self.records
.keys()
.filter(|id| !self.children.contains_key(*id) || self.children[*id].is_empty())
.cloned()
.collect()
}
/// Find the first user message in a branch (root → leaf path).
/// This is the "fork point" user message that starts this conversation.
fn find_branch_first_user_message(&self, branch: &[String]) -> Option<usize> {
// Walk the branch and find where this branch diverges from others.
// The first user message at or after the fork point is the conversation start.
for (i, id) in branch.iter().enumerate() {
if let Some(rec) = self.records.get(id) {
if rec.record_type != "message" {
continue;
}
let role = rec
.value
.get("message")
.and_then(|m| m.get("role"))
.and_then(|r| r.as_str())
.unwrap_or("");
if role != "user" {
continue;
}
// Check if parent has multiple children (fork point)
if let Some(pid) = &rec.parent_id {
if let Some(siblings) = self.children.get(pid) {
if siblings.len() > 1 {
return Some(i);
}
}
}
}
}
// No fork point found → the first user message is the conversation start
for (i, id) in branch.iter().enumerate() {
if let Some(rec) = self.records.get(id) {
if rec.record_type == "message" {
let role = rec
.value
.get("message")
.and_then(|m| m.get("role"))
.and_then(|r| r.as_str())
.unwrap_or("");
if role == "user" {
return Some(i);
}
}
}
}
None
}
/// Extract distinct conversation branches.
/// Each branch is identified by its leaf node.
/// Returns: Vec<(leaf_id, branch_record_ids from fork_user_msg → leaf)>
fn conversation_branches(&self) -> Vec<(String, Vec<String>)> {
// Fast path: linear chain (no forks) → single branch using insertion order
if self.is_linear {
if let Some(leaf_id) = self.insertion_order.last() {
// Find the first user message in insertion order
let first_user_idx = self.insertion_order.iter().position(|id| {
self.records.get(id).is_some_and(|r| {
r.record_type == "message"
&& r.value
.get("message")
.and_then(|m| m.get("role"))
.and_then(|r| r.as_str())
== Some("user")
})
});
if let Some(idx) = first_user_idx {
let branch_ids = self.insertion_order[idx..].to_vec();
return vec![(leaf_id.clone(), branch_ids)];
}
}
return vec![];
}
let leaves = self.leaf_ids();
let mut branches = Vec::new();
for leaf_id in &leaves {
let chain = self.ancestor_chain(leaf_id);
if chain.is_empty() {
continue;
}
// Find the fork-point user message for this branch
if let Some(fork_idx) = self.find_branch_first_user_message(&chain) {
let branch_ids: Vec<String> = chain[fork_idx..].to_vec();
// Only include branches that have at least one user message
let has_user = branch_ids.iter().any(|id| {
self.records.get(id).is_some_and(|r| {
r.record_type == "message"
&& r.value
.get("message")
.and_then(|m| m.get("role"))
.and_then(|r| r.as_str())
== Some("user")
})
});
if has_user {
branches.push((leaf_id.clone(), branch_ids));
}
}
}
branches
}
}
// ── Parser ───────────────────────────────────────────────────────────── // ── Parser ─────────────────────────────────────────────────────────────
pub struct OpenClawParser { pub struct OpenClawParser {
@@ -120,56 +366,66 @@ impl OpenClawParser {
Ok(index) Ok(index)
} }
/// Parse a JSONL file to extract summary information. /// List all JSONL files for an agent, including `.jsonl.reset.*` archives.
fn parse_jsonl_summary( fn list_jsonl_files(agent_dir: &Path) -> Vec<(String, PathBuf)> {
agent_id: &str, let sessions_dir = agent_dir.join("sessions");
session_meta: &SessionMeta, if !sessions_dir.exists() {
jsonl_path: &PathBuf, return Vec::new();
) -> Result<Option<ConversationSummary>, ParseError> { }
let file = fs::File::open(jsonl_path)?; let mut files = Vec::new();
let reader = BufReader::new(file); if let Ok(entries) = fs::read_dir(&sessions_dir) {
for entry in entries.flatten() {
let path = entry.path();
let name = path
.file_name()
.unwrap_or_default()
.to_string_lossy()
.to_string();
// Match both <uuid>.jsonl and <uuid>.jsonl.reset.<timestamp>
if let Some(session_id) = extract_session_id_from_filename(&name) {
files.push((session_id, path));
}
}
}
files
}
let mut cwd: Option<String> = None; /// Build summaries for all conversation branches in a single JSONL file.
fn summaries_from_tree(
agent_id: &str,
session_id: &str,
tree: &JTree,
session_meta: Option<&SessionMeta>,
) -> Vec<ConversationSummary> {
let branches = tree.conversation_branches();
let mut summaries = Vec::new();
for (leaf_id, branch_ids) in &branches {
let mut cwd = tree.session_cwd.clone();
let mut title: Option<String> = None; let mut title: Option<String> = None;
let mut first_timestamp: Option<DateTime<Utc>> = None; let mut first_timestamp: Option<DateTime<Utc>> = None;
let mut last_timestamp: Option<DateTime<Utc>> = None; let mut last_timestamp: Option<DateTime<Utc>> = None;
let mut message_count: u32 = 0; let mut message_count: u32 = 0;
for line in reader.lines() { for id in branch_ids {
let line = match line { let rec = match tree.records.get(id) {
Ok(l) => l, Some(r) => r,
Err(_) => continue, None => continue,
};
if line.trim().is_empty() {
continue;
}
let value: serde_json::Value = match serde_json::from_str(&line) {
Ok(v) => v,
Err(_) => continue,
}; };
let record_type = value.get("type").and_then(|t| t.as_str()).unwrap_or(""); if let Some(ts) = parse_iso_timestamp(&rec.value) {
// Extract timestamp from any record
if let Some(ts) = parse_iso_timestamp(&value) {
if first_timestamp.is_none() { if first_timestamp.is_none() {
first_timestamp = Some(ts); first_timestamp = Some(ts);
} }
last_timestamp = Some(ts); last_timestamp = Some(ts);
} }
match record_type { if rec.record_type != "message" {
"session" => { continue;
if cwd.is_none() {
cwd = value
.get("cwd")
.and_then(|s| s.as_str())
.map(|s| s.to_string());
} }
}
"message" => { let role = rec
let role = value .value
.get("message") .get("message")
.and_then(|m| m.get("role")) .and_then(|m| m.get("role"))
.and_then(|r| r.as_str()) .and_then(|r| r.as_str())
@@ -178,9 +434,7 @@ impl OpenClawParser {
match role { match role {
"user" => { "user" => {
message_count += 1; message_count += 1;
if let Some(text) = extract_first_text_content(&value) { if let Some(text) = extract_first_text_content(&rec.value) {
// Extract working directory from user message
// (overrides session cwd with the latest project dir)
if let Some(wd) = extract_working_dir(&text) { if let Some(wd) = extract_working_dir(&text) {
cwd = Some(wd); cwd = Some(wd);
} }
@@ -198,34 +452,30 @@ impl OpenClawParser {
_ => {} _ => {}
} }
} }
_ => {}
}
}
let started_at = match first_timestamp { let started_at = match first_timestamp {
Some(ts) => ts, Some(ts) => ts,
None => return Ok(None), None => continue,
}; };
// Use updatedAt from sessions.json as ended_at if available // Use updatedAt from sessions.json if this is the latest branch
let ended_at = session_meta let ended_at = session_meta
.updated_at .and_then(|m| m.updated_at)
.and_then(|ms| Utc.timestamp_millis_opt(ms as i64).single()) .and_then(|ms| Utc.timestamp_millis_opt(ms as i64).single())
.or(last_timestamp); .or(last_timestamp);
// Use origin.label as title fallback
if title.is_none() { if title.is_none() {
title = session_meta title = session_meta
.origin .and_then(|m| m.origin.as_ref())
.as_ref()
.and_then(|o| o.label.clone()); .and_then(|o| o.label.clone());
} }
let conversation_id = format!("{}/{}", agent_id, session_meta.session_id); // conversation_id: agentId/sessionId/leafId
let conversation_id = format!("{}/{}/{}", agent_id, session_id, leaf_id);
let folder_path = cwd.clone(); let folder_path = cwd.clone();
let folder_name = folder_path.as_ref().map(|p| folder_name_from_path(p)); let folder_name = folder_path.as_ref().map(|p| folder_name_from_path(p));
Ok(Some(ConversationSummary { summaries.push(ConversationSummary {
id: conversation_id, id: conversation_id,
agent_type: AgentType::OpenClaw, agent_type: AgentType::OpenClaw,
folder_path, folder_path,
@@ -234,93 +484,116 @@ impl OpenClawParser {
started_at, started_at,
ended_at, ended_at,
message_count, message_count,
model: session_meta.model.clone(), model: session_meta.and_then(|m| m.model.clone()),
git_branch: None, git_branch: None,
})) });
} }
/// Parse a JSONL file to extract full conversation detail. summaries
}
/// Parse a JSONL file (tree-aware) to extract full conversation detail
/// for a specific branch identified by leaf_id.
fn parse_conversation_detail( fn parse_conversation_detail(
jsonl_path: &PathBuf, jsonl_path: &Path,
conversation_id: &str, conversation_id: &str,
leaf_id: Option<&str>,
session_meta: Option<&SessionMeta>, session_meta: Option<&SessionMeta>,
) -> Result<ConversationDetail, ParseError> { ) -> Result<ConversationDetail, ParseError> {
let file = fs::File::open(jsonl_path)?; let tree = JTree::parse(jsonl_path)?;
let reader = BufReader::new(file);
// Determine which branch to display
let branch_ids = if let Some(lid) = leaf_id {
// Specific branch: ancestor chain from leaf, starting from fork user msg
let chain = tree.ancestor_chain(lid);
if chain.is_empty() {
return Err(ParseError::ConversationNotFound(
conversation_id.to_string(),
));
}
match tree.find_branch_first_user_message(&chain) {
Some(idx) => chain[idx..].to_vec(),
None => chain,
}
} else {
// No leaf_id: use the full chain of the most recently updated leaf
// (backward compat for old conversation IDs without leaf component)
let branches = tree.conversation_branches();
if branches.is_empty() {
// Fallback: use all message records in order
tree.records
.values()
.filter(|r| r.record_type == "message")
.map(|r| r.id.clone())
.collect()
} else {
// Find branch with latest timestamp
let mut best: Option<(DateTime<Utc>, Vec<String>)> = None;
for (_, branch) in &branches {
let ts = branch
.iter()
.filter_map(|id| tree.records.get(id))
.filter_map(|r| parse_iso_timestamp(&r.value))
.next_back();
if let Some(ts) = ts {
if best.as_ref().is_none_or(|(t, _)| ts > *t) {
best = Some((ts, branch.clone()));
}
}
}
best.map(|(_, b)| b).unwrap_or_default()
}
};
let mut messages: Vec<UnifiedMessage> = Vec::new(); let mut messages: Vec<UnifiedMessage> = Vec::new();
let mut cwd: Option<String> = None; let mut cwd = tree.session_cwd.clone();
let mut model: Option<String> = None; let mut model: Option<String> = None;
let mut title: Option<String> = None; let mut title: Option<String> = None;
let mut first_timestamp: Option<DateTime<Utc>> = None; let mut first_timestamp: Option<DateTime<Utc>> = None;
let mut last_timestamp: Option<DateTime<Utc>> = None; let mut last_timestamp: Option<DateTime<Utc>> = None;
for line in reader.lines() { for id in &branch_ids {
let line = match line { let rec = match tree.records.get(id) {
Ok(l) => l, Some(r) => r,
Err(_) => continue, None => continue,
}; };
if line.trim().is_empty() {
if rec.record_type != "message" {
continue; continue;
} }
let value: serde_json::Value = match serde_json::from_str(&line) { if let Some(ts) = parse_iso_timestamp(&rec.value) {
Ok(v) => v,
Err(_) => continue,
};
let record_type = value.get("type").and_then(|t| t.as_str()).unwrap_or("");
if let Some(ts) = parse_iso_timestamp(&value) {
if first_timestamp.is_none() { if first_timestamp.is_none() {
first_timestamp = Some(ts); first_timestamp = Some(ts);
} }
last_timestamp = Some(ts); last_timestamp = Some(ts);
} }
match record_type { let role = rec
"session" => { .value
if cwd.is_none() {
cwd = value
.get("cwd")
.and_then(|s| s.as_str())
.map(|s| s.to_string());
}
}
"message" => {
let role = value
.get("message") .get("message")
.and_then(|m| m.get("role")) .and_then(|m| m.get("role"))
.and_then(|r| r.as_str()) .and_then(|r| r.as_str())
.unwrap_or(""); .unwrap_or("");
let timestamp = parse_iso_timestamp(&rec.value).unwrap_or_else(Utc::now);
let timestamp = parse_iso_timestamp(&value).unwrap_or_else(Utc::now); let msg_id = rec.id.clone();
let msg_id = value
.get("id")
.and_then(|i| i.as_str())
.unwrap_or("")
.to_string();
match role { match role {
"user" => { "user" => {
// Extract working directory from raw text before cleaning if let Some(raw_text) = extract_first_text_content(&rec.value) {
if let Some(raw_text) = extract_first_text_content(&value) {
if let Some(wd) = extract_working_dir(&raw_text) { if let Some(wd) = extract_working_dir(&raw_text) {
cwd = Some(wd); cwd = Some(wd);
} }
} }
let content = extract_user_content(&rec.value);
let content = extract_user_content(&value);
if content.is_empty() { if content.is_empty() {
continue; continue;
} }
if title.is_none() { if title.is_none() {
if let Some(ContentBlock::Text { ref text }) = content.first() { if let Some(ContentBlock::Text { ref text }) = content.first() {
title = Some(truncate_str(text, 100)); title = Some(truncate_str(text, 100));
} }
} }
messages.push(UnifiedMessage { messages.push(UnifiedMessage {
id: msg_id, id: msg_id,
role: MessageRole::User, role: MessageRole::User,
@@ -332,18 +605,17 @@ impl OpenClawParser {
}); });
} }
"assistant" => { "assistant" => {
let content = extract_assistant_content(&value); let content = extract_assistant_content(&rec.value);
let usage = extract_usage(&value); let usage = extract_usage(&rec.value);
let msg_model = value let msg_model = rec
.value
.get("message") .get("message")
.and_then(|m| m.get("model")) .and_then(|m| m.get("model"))
.and_then(|m| m.as_str()) .and_then(|m| m.as_str())
.map(|s| s.to_string()); .map(|s| s.to_string());
if model.is_none() { if model.is_none() {
model = msg_model.clone(); model = msg_model.clone();
} }
messages.push(UnifiedMessage { messages.push(UnifiedMessage {
id: msg_id, id: msg_id,
role: MessageRole::Assistant, role: MessageRole::Assistant,
@@ -355,7 +627,7 @@ impl OpenClawParser {
}); });
} }
"toolResult" => { "toolResult" => {
let content = extract_tool_result_content(&value); let content = extract_tool_result_content(&rec.value);
messages.push(UnifiedMessage { messages.push(UnifiedMessage {
id: msg_id, id: msg_id,
role: MessageRole::Tool, role: MessageRole::Tool,
@@ -369,12 +641,7 @@ impl OpenClawParser {
_ => {} _ => {}
} }
} }
// Skip thinking_level_change, custom, etc.
_ => {}
}
}
// Prefer model from sessions.json metadata
if let Some(meta) = session_meta { if let Some(meta) = session_meta {
if model.is_none() { if model.is_none() {
model = meta.model.clone(); model = meta.model.clone();
@@ -383,10 +650,8 @@ impl OpenClawParser {
let folder_path = cwd.clone(); let folder_path = cwd.clone();
let folder_name = folder_path.as_ref().map(|p| folder_name_from_path(p)); let folder_name = folder_path.as_ref().map(|p| folder_name_from_path(p));
let turns = group_into_turns(messages); let turns = group_into_turns(messages);
// Context window stats
let context_window_used_tokens = latest_turn_total_usage_tokens(&turns); let context_window_used_tokens = latest_turn_total_usage_tokens(&turns);
let context_window_max_tokens = session_meta let context_window_max_tokens = session_meta
.and_then(|m| m.context_tokens) .and_then(|m| m.context_tokens)
@@ -417,19 +682,29 @@ impl OpenClawParser {
}) })
} }
/// Resolve JSONL path and optional session metadata from a compound conversation ID. /// Resolve a conversation_id to (jsonl_path, leaf_id, session_meta).
///
/// Conversation ID formats:
/// - `agentId/sessionId/leafId` — tree-aware branch
/// - `agentId/sessionId` — legacy, uses latest branch
/// - bare UUID — ACP session ID fallback
fn resolve_session( fn resolve_session(
&self, &self,
conversation_id: &str, conversation_id: &str,
) -> Result<(PathBuf, Option<SessionMeta>), ParseError> { ) -> Result<(PathBuf, Option<String>, Option<SessionMeta>), ParseError> {
if let Some((agent_id, session_id)) = conversation_id.split_once('/') { let parts: Vec<&str> = conversation_id.splitn(3, '/').collect();
if parts.len() >= 2 {
let agent_id = parts[0];
let session_id = parts[1];
let leaf_id = parts.get(2).map(|s| s.to_string());
let agent_dir = self.base_dir.join(agent_id); let agent_dir = self.base_dir.join(agent_id);
// Try exact JSONL file
let jsonl_path = agent_dir let jsonl_path = agent_dir
.join("sessions") .join("sessions")
.join(format!("{}.jsonl", session_id)); .join(format!("{}.jsonl", session_id));
if jsonl_path.exists() { if jsonl_path.exists() {
// Try to load session metadata
let meta = Self::read_session_index(&agent_dir) let meta = Self::read_session_index(&agent_dir)
.ok() .ok()
.and_then(|index| { .and_then(|index| {
@@ -437,65 +712,71 @@ impl OpenClawParser {
.into_values() .into_values()
.find(|m| m.session_id == session_id) .find(|m| m.session_id == session_id)
}); });
return Ok((jsonl_path, meta)); return Ok((jsonl_path, leaf_id, meta));
}
// Try reset files
if let Some((path, meta)) =
Self::find_reset_file(&agent_dir, session_id)
{
return Ok((path, leaf_id, meta));
} }
} }
// Fallback: scan all agent directories // Fallback: scan all agent directories
if self.base_dir.exists() { if self.base_dir.exists() {
let session_id = conversation_id let bare_id = match parts.len() {
.split_once('/') 1 => parts[0],
.map(|(_, s)| s) 2 => parts[1],
.unwrap_or(conversation_id); _ => parts[1],
for entry in fs::read_dir(&self.base_dir)? {
let entry = match entry {
Ok(e) => e,
Err(_) => continue,
}; };
for entry in fs::read_dir(&self.base_dir)?.flatten() {
let agent_dir = entry.path(); let agent_dir = entry.path();
if !agent_dir.is_dir() { if !agent_dir.is_dir() {
continue; continue;
} }
// Try direct session ID (without agent prefix) // Try direct session ID match
let jsonl_path = agent_dir let jsonl_path = agent_dir
.join("sessions") .join("sessions")
.join(format!("{}.jsonl", session_id)); .join(format!("{}.jsonl", bare_id));
if jsonl_path.exists() { if jsonl_path.exists() {
let meta = Self::read_session_index(&agent_dir) let meta = Self::read_session_index(&agent_dir)
.ok() .ok()
.and_then(|index| { .and_then(|index| {
index index.into_values().find(|m| m.session_id == bare_id)
.into_values()
.find(|m| m.session_id == session_id)
}); });
return Ok((jsonl_path, meta)); return Ok((jsonl_path, None, meta));
} }
// Fallback: the external_id may be an ACP session ID that differs // Try reset files
// from the internal JSONL session ID. Scan sessions.json entries if let Some((path, meta)) = Self::find_reset_file(&agent_dir, bare_id) {
// and return the one whose JSONL file exists and whose updatedAt is return Ok((path, None, meta));
// closest to what the ACP connection would have created.
if let Ok(index) = Self::read_session_index(&agent_dir) {
// Find the session with the most recent updatedAt whose file exists
let mut best: Option<(PathBuf, SessionMeta, u64)> = None;
for meta in index.into_values() {
let candidate = agent_dir
.join("sessions")
.join(format!("{}.jsonl", meta.session_id));
if !candidate.exists() {
continue;
} }
let updated = meta.updated_at.unwrap_or(0);
if best.as_ref().map_or(true, |(_, _, t)| updated > *t) { // Fallback: ACP session ID doesn't match any file.
best = Some((candidate, meta, updated)); // Scan all JSONL files and search for a branch whose leaf_id
// or first user message timestamp matches. As a last resort,
// try the most recently updated session.
let jsonl_files = Self::list_jsonl_files(&agent_dir);
for (sid, path) in &jsonl_files {
if let Ok(tree) = JTree::parse(path) {
// Check if any leaf id matches the bare_id
let leaves = tree.leaf_ids();
if leaves.iter().any(|l| l == bare_id) {
let meta = Self::read_session_index(&agent_dir)
.ok()
.and_then(|index| {
index.into_values().find(|m| m.session_id == *sid)
});
return Ok((path.clone(), Some(bare_id.to_string()), meta));
} }
} }
if let Some((path, meta, _)) = best {
return Ok((path, Some(meta)));
}
} }
// No fallback: if the ID doesn't match any file or leaf,
// return ConversationNotFound to avoid showing wrong messages.
} }
} }
@@ -503,6 +784,35 @@ impl OpenClawParser {
conversation_id.to_string(), conversation_id.to_string(),
)) ))
} }
/// Find a `.jsonl.reset.*` file matching the given session_id.
fn find_reset_file(
agent_dir: &Path,
session_id: &str,
) -> Option<(PathBuf, Option<SessionMeta>)> {
let sessions_dir = agent_dir.join("sessions");
if !sessions_dir.exists() {
return None;
}
let prefix = format!("{}.jsonl.reset.", session_id);
let mut candidates: Vec<(PathBuf, String)> = Vec::new();
if let Ok(entries) = fs::read_dir(&sessions_dir) {
for entry in entries.flatten() {
let name = entry.file_name().to_string_lossy().to_string();
if name.starts_with(&prefix) {
// Extract timestamp suffix for sorting
let suffix = name[prefix.len()..].to_string();
candidates.push((entry.path(), suffix));
}
}
}
if candidates.is_empty() {
return None;
}
// Sort by timestamp suffix descending to get the latest reset file
candidates.sort_by(|a, b| b.1.cmp(&a.1));
Some((candidates.into_iter().next().unwrap().0, None))
}
} }
impl AgentParser for OpenClawParser { impl AgentParser for OpenClawParser {
@@ -513,11 +823,7 @@ impl AgentParser for OpenClawParser {
return Ok(conversations); return Ok(conversations);
} }
for entry in fs::read_dir(&self.base_dir)? { for entry in fs::read_dir(&self.base_dir)?.flatten() {
let entry = match entry {
Ok(e) => e,
Err(_) => continue,
};
let agent_dir = entry.path(); let agent_dir = entry.path();
if !agent_dir.is_dir() { if !agent_dir.is_dir() {
continue; continue;
@@ -529,24 +835,24 @@ impl AgentParser for OpenClawParser {
.to_string_lossy() .to_string_lossy()
.to_string(); .to_string();
let index = match Self::read_session_index(&agent_dir) { let index = Self::read_session_index(&agent_dir).unwrap_or_default();
Ok(idx) => idx,
// Scan all JSONL files (including reset archives)
let jsonl_files = Self::list_jsonl_files(&agent_dir);
for (session_id, path) in &jsonl_files {
let tree = match JTree::parse(path) {
Ok(t) => t,
Err(_) => continue, Err(_) => continue,
}; };
for meta in index.values() { let meta = index
let jsonl_path = agent_dir .values()
.join("sessions") .find(|m| m.session_id == *session_id)
.join(format!("{}.jsonl", meta.session_id)); .cloned();
if !jsonl_path.exists() {
continue;
}
match Self::parse_jsonl_summary(&agent_id, meta, &jsonl_path) { let summaries =
Ok(Some(summary)) => conversations.push(summary), Self::summaries_from_tree(&agent_id, session_id, &tree, meta.as_ref());
Ok(None) => continue, conversations.extend(summaries);
Err(_) => continue,
}
} }
} }
@@ -555,13 +861,36 @@ impl AgentParser for OpenClawParser {
} }
fn get_conversation(&self, conversation_id: &str) -> Result<ConversationDetail, ParseError> { fn get_conversation(&self, conversation_id: &str) -> Result<ConversationDetail, ParseError> {
let (jsonl_path, meta) = self.resolve_session(conversation_id)?; let (jsonl_path, leaf_id, meta) = self.resolve_session(conversation_id)?;
Self::parse_conversation_detail(&jsonl_path, conversation_id, meta.as_ref()) Self::parse_conversation_detail(
&jsonl_path,
conversation_id,
leaf_id.as_deref(),
meta.as_ref(),
)
} }
} }
// ── Helper functions ─────────────────────────────────────────────────── // ── Helper functions ───────────────────────────────────────────────────
/// Extract session UUID from filenames like `<uuid>.jsonl` or `<uuid>.jsonl.reset.<ts>`.
fn extract_session_id_from_filename(name: &str) -> Option<String> {
// Skip non-jsonl files
if !name.contains(".jsonl") {
return None;
}
// Skip sessions.json
if name == "sessions.json" {
return None;
}
// Extract the UUID part before .jsonl
let uuid_part = name.split(".jsonl").next()?;
if uuid_part.is_empty() {
return None;
}
Some(uuid_part.to_string())
}
fn parse_iso_timestamp(value: &serde_json::Value) -> Option<DateTime<Utc>> { fn parse_iso_timestamp(value: &serde_json::Value) -> Option<DateTime<Utc>> {
value value
.get("timestamp") .get("timestamp")
@@ -907,9 +1236,9 @@ mod tests {
} }
#[test] #[test]
fn parses_openclaw_conversation_detail() { fn parses_openclaw_conversation_detail_with_tree() {
let path = std::env::temp_dir().join(format!( let path = std::env::temp_dir().join(format!(
"codeg-openclaw-parser-{}.jsonl", "codeg-openclaw-tree-{}.jsonl",
uuid::Uuid::new_v4() uuid::Uuid::new_v4()
)); ));
let mut file = fs::File::create(&path).expect("create temp jsonl"); let mut file = fs::File::create(&path).expect("create temp jsonl");
@@ -932,7 +1261,7 @@ mod tests {
json!({"type":"message","id":"a1","parentId":"u1","timestamp":"2026-03-17T04:56:30.466Z","message":{"role":"assistant","content":[{"type":"text","text":"[[reply_to_current]] Hi there!"}],"model":"gpt-5.4","usage":{"input":100,"output":50,"cacheRead":200,"cacheWrite":0,"totalTokens":350},"stopReason":"stop","timestamp":1773723390466_i64}}) json!({"type":"message","id":"a1","parentId":"u1","timestamp":"2026-03-17T04:56:30.466Z","message":{"role":"assistant","content":[{"type":"text","text":"[[reply_to_current]] Hi there!"}],"model":"gpt-5.4","usage":{"input":100,"output":50,"cacheRead":200,"cacheWrite":0,"totalTokens":350},"stopReason":"stop","timestamp":1773723390466_i64}})
).unwrap(); ).unwrap();
let detail = OpenClawParser::parse_conversation_detail(&path, "test/test-session", None) let detail = OpenClawParser::parse_conversation_detail(&path, "test/test-session", None, None)
.expect("parse detail"); .expect("parse detail");
fs::remove_file(&path).unwrap(); fs::remove_file(&path).unwrap();
@@ -962,4 +1291,62 @@ mod tests {
let stats = detail.session_stats.unwrap(); let stats = detail.session_stats.unwrap();
assert!(stats.total_tokens.is_some()); assert!(stats.total_tokens.is_some());
} }
#[test]
fn tree_separates_branches() {
// Simulate a JSONL with a tree:
// u1 → a1 → u2 → a2 (branch 1: "Hello" conversation)
// ↘ u3 → a3 (branch 2: "Bye" conversation, forked from a1)
let path = std::env::temp_dir().join(format!(
"codeg-openclaw-branches-{}.jsonl",
uuid::Uuid::new_v4()
));
let mut file = fs::File::create(&path).expect("create temp jsonl");
writeln!(file, "{}", json!({"type":"session","version":3,"id":"s1","timestamp":"2026-03-17T01:00:00.000Z","cwd":"/tmp"})).unwrap();
// Shared prefix
writeln!(file, "{}", json!({"type":"message","id":"u1","parentId":null,"timestamp":"2026-03-17T01:00:01.000Z","message":{"role":"user","content":[{"type":"text","text":"Hello"}]}})).unwrap();
writeln!(file, "{}", json!({"type":"message","id":"a1","parentId":"u1","timestamp":"2026-03-17T01:00:02.000Z","message":{"role":"assistant","content":[{"type":"text","text":"Hi"}]}})).unwrap();
// Branch 1 continues
writeln!(file, "{}", json!({"type":"message","id":"u2","parentId":"a1","timestamp":"2026-03-17T01:00:03.000Z","message":{"role":"user","content":[{"type":"text","text":"How are you?"}]}})).unwrap();
writeln!(file, "{}", json!({"type":"message","id":"a2","parentId":"u2","timestamp":"2026-03-17T01:00:04.000Z","message":{"role":"assistant","content":[{"type":"text","text":"Good!"}]}})).unwrap();
// Branch 2 forks from a1
writeln!(file, "{}", json!({"type":"message","id":"u3","parentId":"a1","timestamp":"2026-03-17T01:00:05.000Z","message":{"role":"user","content":[{"type":"text","text":"Bye"}]}})).unwrap();
writeln!(file, "{}", json!({"type":"message","id":"a3","parentId":"u3","timestamp":"2026-03-17T01:00:06.000Z","message":{"role":"assistant","content":[{"type":"text","text":"Goodbye!"}]}})).unwrap();
let tree = JTree::parse(&path).expect("parse tree");
let branches = tree.conversation_branches();
fs::remove_file(&path).unwrap();
// Should find 2 branches
assert_eq!(branches.len(), 2);
// Branch ending at a2 should contain u2, a2 (forked at a1, u2 is the fork user msg)
let branch_a2 = branches.iter().find(|(leaf, _)| leaf == "a2").unwrap();
assert!(branch_a2.1.contains(&"u2".to_string()));
assert!(branch_a2.1.contains(&"a2".to_string()));
// Should NOT contain u3
assert!(!branch_a2.1.contains(&"u3".to_string()));
// Branch ending at a3 should contain u3, a3
let branch_a3 = branches.iter().find(|(leaf, _)| leaf == "a3").unwrap();
assert!(branch_a3.1.contains(&"u3".to_string()));
assert!(branch_a3.1.contains(&"a3".to_string()));
// Should NOT contain u2
assert!(!branch_a3.1.contains(&"u2".to_string()));
}
#[test]
fn extract_session_id_from_filename_works() {
assert_eq!(
extract_session_id_from_filename("abc-123.jsonl"),
Some("abc-123".to_string())
);
assert_eq!(
extract_session_id_from_filename("abc-123.jsonl.reset.2026-03-17T04-46-13.819Z"),
Some("abc-123".to_string())
);
assert_eq!(extract_session_id_from_filename("sessions.json"), None);
assert_eq!(extract_session_id_from_filename("readme.txt"), None);
}
} }