支持在历史会话中分叉出新会话

This commit is contained in:
xintaofei
2026-03-15 11:44:01 +08:00
parent a85ac9dcfe
commit f50484f08c
23 changed files with 503 additions and 31 deletions

View File

@@ -46,7 +46,7 @@ sea-orm-migration = { version = "1.1", features = ["sqlx-sqlite", "runtime-tokio
toml = "0.8"
notify = "6"
base64 = "0.22"
agent-client-protocol-schema = { version = "0.10", features = ["unstable_session_usage"] }
agent-client-protocol-schema = { version = "0.10", features = ["unstable_session_usage", "unstable_session_fork"] }
kill_tree = { version = "0.2", features = ["tokio"] }
[target.'cfg(not(any(target_os = "android", target_os = "ios")))'.dependencies]

View File

@@ -86,6 +86,9 @@ pub enum ConnectionCommand {
request_id: String,
option_id: String,
},
Fork {
reply: tokio::sync::oneshot::Sender<Result<crate::acp::types::ForkResultInfo, AcpError>>,
},
Disconnect,
}
@@ -494,6 +497,7 @@ async fn run_connection(
let pending_perms: PendingPermissions = Arc::new(tokio::sync::Mutex::new(HashMap::new()));
let terminal_runtime = Arc::new(TerminalRuntime::new());
let cwd = resolve_working_dir(working_dir.as_deref());
let cwd_string = cwd.to_string_lossy().to_string();
let file_system_runtime = Arc::new(FileSystemRuntime::new(cwd.clone()));
let conn_id = connection_id.clone();
@@ -617,6 +621,16 @@ async fn run_connection(
&init_resp.agent_capabilities.prompt_capabilities,
);
let supports_fork = init_resp
.agent_capabilities
.session_capabilities
.fork
.is_some();
eprintln!(
"[ACP] Agent capabilities: load_session={}, fork={}",
init_resp.agent_capabilities.load_session, supports_fork
);
// Emit connected status
let _ = handle.emit(
"acp://event",
@@ -689,26 +703,93 @@ async fn run_connection(
&perms,
&mut cmd_rx,
terminal_runtime.clone(),
&cwd_string,
supports_fork,
)
.await;
terminal_runtime.release_all_for_session(&sid).await;
loop_result
drop(session);
handle_fork_or_exit(
loop_result,
&conn_id,
&handle,
&perms,
&mut cmd_rx,
terminal_runtime.clone(),
&cwd,
&cwd_string,
)
.await
}
Err(e) => {
// session/load failed (e.g. ephemeral forked session).
// Fall back to session/new so the tab still works.
eprintln!(
"[ACP] session/load failed ({}), falling back to session/new",
e
);
let _ = handle.emit(
"acp://event",
AcpEvent::Error {
connection_id: conn_id.clone(),
message: format!("Failed to load session: {e}"),
message: format!("Failed to load session, starting new: {e}"),
},
);
Err(e)
let new_resp = cx
.send_request_to(Agent, NewSessionRequest::new(cwd.clone()))
.block_task()
.await?;
let fallback_sid = new_resp.session_id.0.to_string();
let initial_config_options = new_resp.config_options.clone();
let mut session =
cx.attach_session(new_resp, Default::default())?;
let _ = handle.emit(
"acp://event",
AcpEvent::SessionStarted {
connection_id: conn_id.clone(),
session_id: fallback_sid.clone(),
},
);
emit_session_modes(&conn_id, &handle, session.modes());
emit_session_config_options(
&conn_id,
&handle,
&initial_config_options,
);
emit_selectors_ready(&conn_id, &handle);
let loop_result = run_conversation_loop(
&mut session,
&conn_id,
&handle,
&perms,
&mut cmd_rx,
terminal_runtime.clone(),
&cwd_string,
supports_fork,
)
.await;
terminal_runtime
.release_all_for_session(&fallback_sid)
.await;
drop(session);
handle_fork_or_exit(
loop_result,
&conn_id,
&handle,
&perms,
&mut cmd_rx,
terminal_runtime.clone(),
&cwd,
&cwd_string,
)
.await
}
}
} else {
// Create new session
let new_resp = cx
.send_request_to(Agent, NewSessionRequest::new(cwd))
.send_request_to(Agent, NewSessionRequest::new(cwd.clone()))
.block_task()
.await?;
let sid = new_resp.session_id.0.to_string();
@@ -732,10 +813,23 @@ async fn run_connection(
&perms,
&mut cmd_rx,
terminal_runtime.clone(),
&cwd_string,
supports_fork,
)
.await;
terminal_runtime.release_all_for_session(&sid).await;
loop_result
drop(session);
handle_fork_or_exit(
loop_result,
&conn_id,
&handle,
&perms,
&mut cmd_rx,
terminal_runtime.clone(),
&cwd,
&cwd_string,
)
.await
}
})
.await
@@ -1197,7 +1291,96 @@ fn map_prompt_blocks(blocks: Vec<PromptInputBlock>) -> Vec<ContentBlock> {
.collect()
}
/// Result when the conversation loop exits due to a fork request.
struct ForkExitInfo {
fork_response: sacp::schema::ForkSessionResponse,
original_session_id: String,
reply: tokio::sync::oneshot::Sender<Result<crate::acp::types::ForkResultInfo, AcpError>>,
connection: ConnectionTo<Agent>,
}
/// After `run_conversation_loop` returns, handle normal exit or fork transition.
///
/// When fork is requested, the original session has already been dropped by the
/// caller. We attach to the forked session (S2) directly using the
/// `ForkSessionResponse` — no separate `session/load` is needed because S2 was
/// just created in-memory by the agent on this connection.
async fn handle_fork_or_exit(
loop_result: Result<Option<ForkExitInfo>, sacp::Error>,
conn_id: &str,
handle: &tauri::AppHandle,
perms: &PendingPermissions,
cmd_rx: &mut mpsc::Receiver<ConnectionCommand>,
terminal_runtime: Arc<TerminalRuntime>,
_cwd: &std::path::Path,
cwd_string: &str,
) -> Result<(), sacp::Error> {
let fork_info = match loop_result {
Ok(Some(info)) => info,
Ok(None) => return Ok(()),
Err(e) => return Err(e),
};
let cx = fork_info.connection;
let fork_resp = fork_info.fork_response;
let new_sid = fork_resp.session_id.0.to_string();
eprintln!(
"[ACP] Fork transition: attaching to forked session {} (original: {})",
new_sid, fork_info.original_session_id
);
// Reply success to the frontend
let _ = fork_info.reply.send(Ok(crate::acp::types::ForkResultInfo {
forked_session_id: new_sid.clone(),
original_session_id: fork_info.original_session_id,
}));
// Build a NewSessionResponse from the ForkSessionResponse so we can
// attach directly — the forked session is already live on this process.
let initial_config_options = fork_resp.config_options.clone();
let new_resp = NewSessionResponse::new(fork_resp.session_id)
.modes(fork_resp.modes)
.config_options(fork_resp.config_options)
.meta(fork_resp.meta);
let mut session = cx.attach_session(new_resp, Default::default())?;
let _ = handle.emit(
"acp://event",
AcpEvent::SessionStarted {
connection_id: conn_id.to_string(),
session_id: new_sid.clone(),
},
);
emit_session_modes(conn_id, handle, session.modes());
emit_session_config_options(conn_id, handle, &initial_config_options);
emit_selectors_ready(conn_id, handle);
let loop_result = run_conversation_loop(
&mut session,
conn_id,
handle,
perms,
cmd_rx,
terminal_runtime.clone(),
cwd_string,
true, // fork already succeeded on this process
)
.await;
terminal_runtime.release_all_for_session(&new_sid).await;
drop(session);
// Recursively handle nested forks
Box::pin(handle_fork_or_exit(
loop_result, conn_id, handle, perms, cmd_rx, terminal_runtime, _cwd, cwd_string,
))
.await
}
/// Main conversation command loop: wait for frontend commands and process them.
///
/// Returns `Ok(None)` on normal exit (disconnect / channel closed) or
/// `Ok(Some(ForkExitInfo))` when the loop should be restarted on a forked session.
async fn run_conversation_loop<'a>(
session: &mut sacp::ActiveSession<'a, Agent>,
conn_id: &str,
@@ -1205,7 +1388,9 @@ async fn run_conversation_loop<'a>(
perms: &PendingPermissions,
cmd_rx: &mut mpsc::Receiver<ConnectionCommand>,
terminal_runtime: Arc<TerminalRuntime>,
) -> Result<(), sacp::Error> {
cwd: &str,
supports_fork: bool,
) -> Result<Option<ForkExitInfo>, sacp::Error> {
loop {
// Wait for either a user command or a session update (e.g. available_commands_update)
let cmd = loop {
@@ -1565,12 +1750,45 @@ async fn run_conversation_loop<'a>(
));
}
}
Some(ConnectionCommand::Fork { reply }) => {
if !supports_fork {
let _ = reply.send(Err(AcpError::protocol(
"This agent does not support session/fork".to_string(),
)));
continue;
}
let cx = session.connection();
let sid = session.session_id().clone();
eprintln!(
"[ACP] Sending session/fork for session_id={} cwd={}",
sid.0, cwd
);
let result = crate::acp::fork::fork_session(&cx, &sid, cwd).await;
match result {
Ok(fork_response) => {
eprintln!(
"[ACP] Fork succeeded: new_session_id={}",
fork_response.session_id.0
);
return Ok(Some(ForkExitInfo {
fork_response,
original_session_id: sid.0.to_string(),
reply,
connection: cx,
}));
}
Err(e) => {
eprintln!("[ACP] Fork failed: {e}");
let _ = reply.send(Err(e));
}
}
}
Some(ConnectionCommand::Disconnect) | None => {
break;
}
}
}
Ok(())
Ok(None)
}
/// Serialize a Vec<ToolCallContent> into a human-readable text string.

35
src-tauri/src/acp/fork.rs Normal file
View File

@@ -0,0 +1,35 @@
//! ACP `session/fork` support via raw JSON-RPC messages.
//!
//! The `sacp` crate does not yet provide typed request/response types for
//! `session/fork`, so we use `UntypedMessage` (the same pattern used for
//! `session/set_config_option` in connection.rs).
use sacp::schema::{ForkSessionRequest, ForkSessionResponse, SessionId};
use sacp::{Agent, ConnectionTo, UntypedMessage};
use crate::acp::error::AcpError;
/// Send a `session/fork` request over an existing ACP connection.
///
/// Returns the full `ForkSessionResponse` so the caller can attach directly
/// without a separate `session/load` round-trip.
pub async fn fork_session(
cx: &ConnectionTo<Agent>,
session_id: &SessionId,
cwd: &str,
) -> Result<ForkSessionResponse, AcpError> {
let req = ForkSessionRequest::new(session_id.clone(), cwd);
let untyped_req = UntypedMessage::new("session/fork", &req)
.map_err(|e| AcpError::protocol(format!("Failed to build fork request: {e}")))?;
let raw_response: serde_json::Value = cx
.send_request_to(Agent, untyped_req)
.block_task()
.await
.map_err(|e| AcpError::protocol(format!("session/fork failed: {e}")))?;
let response: ForkSessionResponse = serde_json::from_value(raw_response)
.map_err(|e| AcpError::protocol(format!("Failed to parse fork response: {e}")))?;
Ok(response)
}

View File

@@ -5,7 +5,7 @@ use tokio::sync::Mutex;
use crate::acp::connection::{spawn_agent_connection, AgentConnection, ConnectionCommand};
use crate::acp::error::AcpError;
use crate::acp::types::{ConnectionInfo, PromptInputBlock};
use crate::acp::types::{ConnectionInfo, ForkResultInfo, PromptInputBlock};
use crate::models::agent::AgentType;
pub struct ConnectionManager {
@@ -143,6 +143,24 @@ impl ConnectionManager {
.map_err(|_| AcpError::ProcessExited)
}
pub async fn fork_session(&self, conn_id: &str) -> Result<ForkResultInfo, AcpError> {
let cmd_tx = {
let connections = self.connections.lock().await;
let conn = connections
.get(conn_id)
.ok_or_else(|| AcpError::ConnectionNotFound(conn_id.into()))?;
conn.cmd_tx.clone()
};
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
cmd_tx
.send(ConnectionCommand::Fork { reply: reply_tx })
.await
.map_err(|_| AcpError::ProcessExited)?;
reply_rx
.await
.map_err(|_| AcpError::protocol("Fork reply channel closed".to_string()))?
}
pub async fn disconnect(&self, conn_id: &str) -> Result<(), AcpError> {
let cmd_tx = {
let mut connections = self.connections.lock().await;

View File

@@ -1,6 +1,7 @@
pub mod binary_cache;
pub mod connection;
pub mod error;
pub mod fork;
pub mod file_system_runtime;
pub mod manager;
pub mod preflight;

View File

@@ -289,3 +289,10 @@ pub struct AvailableCommandInfo {
pub description: String,
pub input_hint: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ForkResultInfo {
pub forked_session_id: String,
pub original_session_id: String,
}

View File

@@ -12,7 +12,7 @@ use crate::acp::preflight::{self, PreflightResult};
use crate::acp::registry;
use crate::acp::types::{
AcpAgentInfo, AgentSkillContent, AgentSkillItem, AgentSkillLayout, AgentSkillLocation,
AgentSkillScope, AgentSkillsListResult, ConnectionInfo, PromptInputBlock,
AgentSkillScope, AgentSkillsListResult, ConnectionInfo, ForkResultInfo, PromptInputBlock,
};
use crate::db::service::agent_setting_service;
use crate::db::AppDatabase;
@@ -1367,6 +1367,14 @@ pub async fn acp_cancel(
manager.cancel(&connection_id).await
}
#[tauri::command]
pub async fn acp_fork(
connection_id: String,
manager: State<'_, ConnectionManager>,
) -> Result<ForkResultInfo, AcpError> {
manager.fork_session(&connection_id).await
}
#[tauri::command]
pub async fn acp_respond_permission(
connection_id: String,

View File

@@ -264,6 +264,7 @@ pub fn run() {
acp_commands::acp_set_mode,
acp_commands::acp_set_config_option,
acp_commands::acp_cancel,
acp_commands::acp_fork,
acp_commands::acp_respond_permission,
acp_commands::acp_disconnect,
acp_commands::acp_list_connections,