feat(acp): surface Claude API retry state in chat input

Enable raw Claude SDK forwarding for ACP sessions and emit only system/api_retry events to the frontend.

Show a localized single-line retry banner with loading under the conversation input, including error details and retry progress.
This commit is contained in:
xintaofei
2026-04-14 14:59:32 +08:00
parent 77e46d80f8
commit f9923df1fe
17 changed files with 492 additions and 17 deletions

View File

@@ -1,5 +1,5 @@
use std::collections::{BTreeMap, HashMap, HashSet};
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use sacp::schema::McpServerStdio;
@@ -706,6 +706,47 @@ fn resolve_working_dir(working_dir: Option<&str>) -> PathBuf {
}
}
fn claude_raw_sdk_session_meta(
agent_type: AgentType,
) -> Option<serde_json::Map<String, serde_json::Value>> {
if agent_type != AgentType::ClaudeCode {
return None;
}
let mut claude_code = serde_json::Map::new();
claude_code.insert(
"emitRawSDKMessages".to_string(),
serde_json::Value::Bool(true),
);
let mut meta = serde_json::Map::new();
meta.insert(
"claudeCode".to_string(),
serde_json::Value::Object(claude_code),
);
Some(meta)
}
fn build_new_session_request(agent_type: AgentType, cwd: &Path) -> NewSessionRequest {
let mut req = NewSessionRequest::new(cwd.to_path_buf());
if let Some(meta) = claude_raw_sdk_session_meta(agent_type) {
req = req.meta(meta);
}
req
}
fn build_load_session_request(
agent_type: AgentType,
session_id: SessionId,
cwd: &Path,
) -> LoadSessionRequest {
let mut req = LoadSessionRequest::new(session_id, cwd.to_path_buf());
if let Some(meta) = claude_raw_sdk_session_meta(agent_type) {
req = req.meta(meta);
}
req
}
/// The main ACP connection loop.
async fn run_connection(
agent: AcpAgent,
@@ -926,7 +967,8 @@ async fn run_connection(
if let Some(sid) = session_id {
// Load existing session via session/load
let load_req = LoadSessionRequest::new(SessionId::new(sid.clone()), &cwd);
let load_req =
build_load_session_request(agent_type, SessionId::new(sid.clone()), &cwd);
let load_result = cx.send_request_to(Agent, load_req).block_task().await;
match load_result {
@@ -963,7 +1005,11 @@ async fn run_connection(
Ok(())
})
.await
.otherwise_ignore();
.otherwise(async |dispatch| {
maybe_emit_claude_sdk_ext_notification(&cid, &h, dispatch);
Ok(())
})
.await;
}
}
if drained > 0 {
@@ -1042,7 +1088,7 @@ async fn run_connection(
);
}
let new_resp = cx
.send_request_to(Agent, NewSessionRequest::new(cwd.clone()))
.send_request_to(Agent, build_new_session_request(agent_type, &cwd))
.block_task()
.await?;
let fallback_sid = new_resp.session_id.0.to_string();
@@ -1099,7 +1145,7 @@ async fn run_connection(
} else {
// Create new session
let new_resp = cx
.send_request_to(Agent, NewSessionRequest::new(cwd.clone()))
.send_request_to(Agent, build_new_session_request(agent_type, &cwd))
.block_task()
.await?;
let sid = new_resp.session_id.0.to_string();
@@ -1778,7 +1824,11 @@ async fn run_conversation_loop<'a>(
},
)
.await
.otherwise_ignore();
.otherwise(async |dispatch| {
maybe_emit_claude_sdk_ext_notification(&cid, &h, dispatch);
Ok(())
})
.await;
}
Ok(_) => {}
Err(e) => {
@@ -1879,7 +1929,11 @@ async fn run_conversation_loop<'a>(
},
)
.await
.otherwise_ignore()
.otherwise(async |dispatch| {
maybe_emit_claude_sdk_ext_notification(&cid, &h, dispatch);
Ok(())
})
.await
{
eprintln!("[ACP] Ignoring dispatch parse error: {e}");
}
@@ -2347,6 +2401,58 @@ fn map_plan_entries(plan: &Plan) -> Vec<PlanEntryInfo> {
.collect()
}
fn parse_claude_sdk_message_params(
params: &serde_json::Value,
) -> Option<(String, serde_json::Value)> {
let obj = params.as_object()?;
let session_id = obj.get("sessionId")?.as_str()?.to_string();
let message = obj.get("message")?.clone();
Some((session_id, message))
}
fn is_claude_api_retry_message(message: &serde_json::Value) -> bool {
let obj = match message.as_object() {
Some(obj) => obj,
None => return false,
};
let message_type = obj.get("type").and_then(|v| v.as_str());
let message_subtype = obj.get("subtype").and_then(|v| v.as_str());
matches!(message_type, Some("system")) && matches!(message_subtype, Some("api_retry"))
}
fn map_claude_sdk_ext_notification(
connection_id: &str,
notification: &UntypedMessage,
) -> Option<AcpEvent> {
if notification.method() != "_claude/sdkMessage" {
return None;
}
let (session_id, message) = parse_claude_sdk_message_params(notification.params())?;
if !is_claude_api_retry_message(&message) {
return None;
}
Some(AcpEvent::ClaudeSdkMessage {
connection_id: connection_id.to_string(),
session_id,
message,
})
}
fn maybe_emit_claude_sdk_ext_notification(
connection_id: &str,
emitter: &EventEmitter,
dispatch: Dispatch,
) {
let Dispatch::Notification(notification) = dispatch else {
return;
};
if let Some(event) = map_claude_sdk_ext_notification(connection_id, &notification) {
crate::web::event_bridge::emit_event(emitter, "acp://event", event);
}
}
/// Fix null fields in `usage_update` notifications that would otherwise fail deserialization.
///
/// Some ACP agents send `"used": null` in usage_update notifications, but the
@@ -2524,3 +2630,110 @@ fn emit_conversation_update(
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn claude_raw_sdk_meta_enabled_only_for_claude() {
let claude_meta = claude_raw_sdk_session_meta(AgentType::ClaudeCode)
.expect("Claude must have raw SDK meta");
assert_eq!(
claude_meta
.get("claudeCode")
.and_then(|v| v.get("emitRawSDKMessages"))
.and_then(|v| v.as_bool()),
Some(true)
);
assert!(claude_raw_sdk_session_meta(AgentType::Codex).is_none());
}
#[test]
fn map_claude_sdk_ext_notification_maps_valid_payload() {
let raw = UntypedMessage::new(
"_claude/sdkMessage",
serde_json::json!({
"sessionId": "session-123",
"message": {
"type": "system",
"subtype": "api_retry",
"attempt": 3,
"max_retries": 10
}
}),
)
.unwrap();
let event = map_claude_sdk_ext_notification("conn-1", &raw)
.expect("valid sdk payload should map");
match event {
AcpEvent::ClaudeSdkMessage {
connection_id,
session_id,
message,
} => {
assert_eq!(connection_id, "conn-1");
assert_eq!(session_id, "session-123");
assert_eq!(message.get("type").and_then(|v| v.as_str()), Some("system"));
}
_ => panic!("expected ClaudeSdkMessage"),
}
}
#[test]
fn map_claude_sdk_ext_notification_rejects_non_api_retry() {
let non_retry = UntypedMessage::new(
"_claude/sdkMessage",
serde_json::json!({
"sessionId": "session-123",
"message": {"type": "system", "subtype": "status"}
}),
)
.unwrap();
assert!(map_claude_sdk_ext_notification("conn-1", &non_retry).is_none());
}
#[test]
fn map_claude_sdk_ext_notification_rejects_invalid_payload() {
let wrong_method = UntypedMessage::new(
"_other/method",
serde_json::json!({"sessionId": "s", "message": {}}),
)
.unwrap();
assert!(map_claude_sdk_ext_notification("conn-1", &wrong_method).is_none());
let missing_fields = UntypedMessage::new(
"_claude/sdkMessage",
serde_json::json!({"sessionId": 1}),
)
.unwrap();
assert!(map_claude_sdk_ext_notification("conn-1", &missing_fields).is_none());
}
#[test]
fn build_new_session_request_sets_claude_raw_meta() {
let cwd = std::path::PathBuf::from("/tmp/codeg");
let req = build_new_session_request(AgentType::ClaudeCode, &cwd);
assert_eq!(
req.meta
.as_ref()
.and_then(|m| m.get("claudeCode"))
.and_then(|v| v.get("emitRawSDKMessages"))
.and_then(|v| v.as_bool()),
Some(true)
);
}
#[test]
fn build_load_session_request_skips_meta_for_non_claude() {
let cwd = std::path::PathBuf::from("/tmp/codeg");
let req =
build_load_session_request(AgentType::Codex, SessionId::new("abc".to_string()), &cwd);
assert!(req.meta.is_none());
}
}