diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index b278870..ebeff2d 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -46,7 +46,7 @@ sea-orm-migration = { version = "1.1", features = ["sqlx-sqlite", "runtime-tokio toml = "0.8" notify = "6" base64 = "0.22" -agent-client-protocol-schema = { version = "0.10", features = ["unstable_session_usage"] } +agent-client-protocol-schema = { version = "0.10", features = ["unstable_session_usage", "unstable_session_fork"] } kill_tree = { version = "0.2", features = ["tokio"] } [target.'cfg(not(any(target_os = "android", target_os = "ios")))'.dependencies] diff --git a/src-tauri/src/acp/connection.rs b/src-tauri/src/acp/connection.rs index e3243a8..4e679a5 100644 --- a/src-tauri/src/acp/connection.rs +++ b/src-tauri/src/acp/connection.rs @@ -86,6 +86,9 @@ pub enum ConnectionCommand { request_id: String, option_id: String, }, + Fork { + reply: tokio::sync::oneshot::Sender>, + }, Disconnect, } @@ -494,6 +497,7 @@ async fn run_connection( let pending_perms: PendingPermissions = Arc::new(tokio::sync::Mutex::new(HashMap::new())); let terminal_runtime = Arc::new(TerminalRuntime::new()); let cwd = resolve_working_dir(working_dir.as_deref()); + let cwd_string = cwd.to_string_lossy().to_string(); let file_system_runtime = Arc::new(FileSystemRuntime::new(cwd.clone())); let conn_id = connection_id.clone(); @@ -617,6 +621,16 @@ async fn run_connection( &init_resp.agent_capabilities.prompt_capabilities, ); + let supports_fork = init_resp + .agent_capabilities + .session_capabilities + .fork + .is_some(); + eprintln!( + "[ACP] Agent capabilities: load_session={}, fork={}", + init_resp.agent_capabilities.load_session, supports_fork + ); + // Emit connected status let _ = handle.emit( "acp://event", @@ -689,26 +703,93 @@ async fn run_connection( &perms, &mut cmd_rx, terminal_runtime.clone(), + &cwd_string, + supports_fork, ) .await; terminal_runtime.release_all_for_session(&sid).await; - loop_result + drop(session); + handle_fork_or_exit( + loop_result, + &conn_id, + &handle, + &perms, + &mut cmd_rx, + terminal_runtime.clone(), + &cwd, + &cwd_string, + ) + .await } Err(e) => { + // session/load failed (e.g. ephemeral forked session). + // Fall back to session/new so the tab still works. + eprintln!( + "[ACP] session/load failed ({}), falling back to session/new", + e + ); let _ = handle.emit( "acp://event", AcpEvent::Error { connection_id: conn_id.clone(), - message: format!("Failed to load session: {e}"), + message: format!("Failed to load session, starting new: {e}"), }, ); - Err(e) + let new_resp = cx + .send_request_to(Agent, NewSessionRequest::new(cwd.clone())) + .block_task() + .await?; + let fallback_sid = new_resp.session_id.0.to_string(); + let initial_config_options = new_resp.config_options.clone(); + let mut session = + cx.attach_session(new_resp, Default::default())?; + let _ = handle.emit( + "acp://event", + AcpEvent::SessionStarted { + connection_id: conn_id.clone(), + session_id: fallback_sid.clone(), + }, + ); + emit_session_modes(&conn_id, &handle, session.modes()); + emit_session_config_options( + &conn_id, + &handle, + &initial_config_options, + ); + emit_selectors_ready(&conn_id, &handle); + + let loop_result = run_conversation_loop( + &mut session, + &conn_id, + &handle, + &perms, + &mut cmd_rx, + terminal_runtime.clone(), + &cwd_string, + supports_fork, + ) + .await; + terminal_runtime + .release_all_for_session(&fallback_sid) + .await; + drop(session); + handle_fork_or_exit( + loop_result, + &conn_id, + &handle, + &perms, + &mut cmd_rx, + terminal_runtime.clone(), + &cwd, + &cwd_string, + ) + .await } } } else { // Create new session let new_resp = cx - .send_request_to(Agent, NewSessionRequest::new(cwd)) + .send_request_to(Agent, NewSessionRequest::new(cwd.clone())) .block_task() .await?; let sid = new_resp.session_id.0.to_string(); @@ -732,10 +813,23 @@ async fn run_connection( &perms, &mut cmd_rx, terminal_runtime.clone(), + &cwd_string, + supports_fork, ) .await; terminal_runtime.release_all_for_session(&sid).await; - loop_result + drop(session); + handle_fork_or_exit( + loop_result, + &conn_id, + &handle, + &perms, + &mut cmd_rx, + terminal_runtime.clone(), + &cwd, + &cwd_string, + ) + .await } }) .await @@ -1197,7 +1291,96 @@ fn map_prompt_blocks(blocks: Vec) -> Vec { .collect() } +/// Result when the conversation loop exits due to a fork request. +struct ForkExitInfo { + fork_response: sacp::schema::ForkSessionResponse, + original_session_id: String, + reply: tokio::sync::oneshot::Sender>, + connection: ConnectionTo, +} + +/// After `run_conversation_loop` returns, handle normal exit or fork transition. +/// +/// When fork is requested, the original session has already been dropped by the +/// caller. We attach to the forked session (S2) directly using the +/// `ForkSessionResponse` — no separate `session/load` is needed because S2 was +/// just created in-memory by the agent on this connection. +async fn handle_fork_or_exit( + loop_result: Result, sacp::Error>, + conn_id: &str, + handle: &tauri::AppHandle, + perms: &PendingPermissions, + cmd_rx: &mut mpsc::Receiver, + terminal_runtime: Arc, + _cwd: &std::path::Path, + cwd_string: &str, +) -> Result<(), sacp::Error> { + let fork_info = match loop_result { + Ok(Some(info)) => info, + Ok(None) => return Ok(()), + Err(e) => return Err(e), + }; + + let cx = fork_info.connection; + let fork_resp = fork_info.fork_response; + let new_sid = fork_resp.session_id.0.to_string(); + + eprintln!( + "[ACP] Fork transition: attaching to forked session {} (original: {})", + new_sid, fork_info.original_session_id + ); + + // Reply success to the frontend + let _ = fork_info.reply.send(Ok(crate::acp::types::ForkResultInfo { + forked_session_id: new_sid.clone(), + original_session_id: fork_info.original_session_id, + })); + + // Build a NewSessionResponse from the ForkSessionResponse so we can + // attach directly — the forked session is already live on this process. + let initial_config_options = fork_resp.config_options.clone(); + let new_resp = NewSessionResponse::new(fork_resp.session_id) + .modes(fork_resp.modes) + .config_options(fork_resp.config_options) + .meta(fork_resp.meta); + let mut session = cx.attach_session(new_resp, Default::default())?; + + let _ = handle.emit( + "acp://event", + AcpEvent::SessionStarted { + connection_id: conn_id.to_string(), + session_id: new_sid.clone(), + }, + ); + emit_session_modes(conn_id, handle, session.modes()); + emit_session_config_options(conn_id, handle, &initial_config_options); + emit_selectors_ready(conn_id, handle); + + let loop_result = run_conversation_loop( + &mut session, + conn_id, + handle, + perms, + cmd_rx, + terminal_runtime.clone(), + cwd_string, + true, // fork already succeeded on this process + ) + .await; + terminal_runtime.release_all_for_session(&new_sid).await; + drop(session); + + // Recursively handle nested forks + Box::pin(handle_fork_or_exit( + loop_result, conn_id, handle, perms, cmd_rx, terminal_runtime, _cwd, cwd_string, + )) + .await +} + /// Main conversation command loop: wait for frontend commands and process them. +/// +/// Returns `Ok(None)` on normal exit (disconnect / channel closed) or +/// `Ok(Some(ForkExitInfo))` when the loop should be restarted on a forked session. async fn run_conversation_loop<'a>( session: &mut sacp::ActiveSession<'a, Agent>, conn_id: &str, @@ -1205,7 +1388,9 @@ async fn run_conversation_loop<'a>( perms: &PendingPermissions, cmd_rx: &mut mpsc::Receiver, terminal_runtime: Arc, -) -> Result<(), sacp::Error> { + cwd: &str, + supports_fork: bool, +) -> Result, sacp::Error> { loop { // Wait for either a user command or a session update (e.g. available_commands_update) let cmd = loop { @@ -1565,12 +1750,45 @@ async fn run_conversation_loop<'a>( )); } } + Some(ConnectionCommand::Fork { reply }) => { + if !supports_fork { + let _ = reply.send(Err(AcpError::protocol( + "This agent does not support session/fork".to_string(), + ))); + continue; + } + let cx = session.connection(); + let sid = session.session_id().clone(); + eprintln!( + "[ACP] Sending session/fork for session_id={} cwd={}", + sid.0, cwd + ); + let result = crate::acp::fork::fork_session(&cx, &sid, cwd).await; + match result { + Ok(fork_response) => { + eprintln!( + "[ACP] Fork succeeded: new_session_id={}", + fork_response.session_id.0 + ); + return Ok(Some(ForkExitInfo { + fork_response, + original_session_id: sid.0.to_string(), + reply, + connection: cx, + })); + } + Err(e) => { + eprintln!("[ACP] Fork failed: {e}"); + let _ = reply.send(Err(e)); + } + } + } Some(ConnectionCommand::Disconnect) | None => { break; } } } - Ok(()) + Ok(None) } /// Serialize a Vec into a human-readable text string. diff --git a/src-tauri/src/acp/fork.rs b/src-tauri/src/acp/fork.rs new file mode 100644 index 0000000..4d393ac --- /dev/null +++ b/src-tauri/src/acp/fork.rs @@ -0,0 +1,35 @@ +//! ACP `session/fork` support via raw JSON-RPC messages. +//! +//! The `sacp` crate does not yet provide typed request/response types for +//! `session/fork`, so we use `UntypedMessage` (the same pattern used for +//! `session/set_config_option` in connection.rs). + +use sacp::schema::{ForkSessionRequest, ForkSessionResponse, SessionId}; +use sacp::{Agent, ConnectionTo, UntypedMessage}; + +use crate::acp::error::AcpError; + +/// Send a `session/fork` request over an existing ACP connection. +/// +/// Returns the full `ForkSessionResponse` so the caller can attach directly +/// without a separate `session/load` round-trip. +pub async fn fork_session( + cx: &ConnectionTo, + session_id: &SessionId, + cwd: &str, +) -> Result { + let req = ForkSessionRequest::new(session_id.clone(), cwd); + let untyped_req = UntypedMessage::new("session/fork", &req) + .map_err(|e| AcpError::protocol(format!("Failed to build fork request: {e}")))?; + + let raw_response: serde_json::Value = cx + .send_request_to(Agent, untyped_req) + .block_task() + .await + .map_err(|e| AcpError::protocol(format!("session/fork failed: {e}")))?; + + let response: ForkSessionResponse = serde_json::from_value(raw_response) + .map_err(|e| AcpError::protocol(format!("Failed to parse fork response: {e}")))?; + + Ok(response) +} diff --git a/src-tauri/src/acp/manager.rs b/src-tauri/src/acp/manager.rs index 5125476..e49dd8c 100644 --- a/src-tauri/src/acp/manager.rs +++ b/src-tauri/src/acp/manager.rs @@ -5,7 +5,7 @@ use tokio::sync::Mutex; use crate::acp::connection::{spawn_agent_connection, AgentConnection, ConnectionCommand}; use crate::acp::error::AcpError; -use crate::acp::types::{ConnectionInfo, PromptInputBlock}; +use crate::acp::types::{ConnectionInfo, ForkResultInfo, PromptInputBlock}; use crate::models::agent::AgentType; pub struct ConnectionManager { @@ -143,6 +143,24 @@ impl ConnectionManager { .map_err(|_| AcpError::ProcessExited) } + pub async fn fork_session(&self, conn_id: &str) -> Result { + let cmd_tx = { + let connections = self.connections.lock().await; + let conn = connections + .get(conn_id) + .ok_or_else(|| AcpError::ConnectionNotFound(conn_id.into()))?; + conn.cmd_tx.clone() + }; + let (reply_tx, reply_rx) = tokio::sync::oneshot::channel(); + cmd_tx + .send(ConnectionCommand::Fork { reply: reply_tx }) + .await + .map_err(|_| AcpError::ProcessExited)?; + reply_rx + .await + .map_err(|_| AcpError::protocol("Fork reply channel closed".to_string()))? + } + pub async fn disconnect(&self, conn_id: &str) -> Result<(), AcpError> { let cmd_tx = { let mut connections = self.connections.lock().await; diff --git a/src-tauri/src/acp/mod.rs b/src-tauri/src/acp/mod.rs index 9770714..415c358 100644 --- a/src-tauri/src/acp/mod.rs +++ b/src-tauri/src/acp/mod.rs @@ -1,6 +1,7 @@ pub mod binary_cache; pub mod connection; pub mod error; +pub mod fork; pub mod file_system_runtime; pub mod manager; pub mod preflight; diff --git a/src-tauri/src/acp/types.rs b/src-tauri/src/acp/types.rs index 6a49295..7027301 100644 --- a/src-tauri/src/acp/types.rs +++ b/src-tauri/src/acp/types.rs @@ -289,3 +289,10 @@ pub struct AvailableCommandInfo { pub description: String, pub input_hint: Option, } + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ForkResultInfo { + pub forked_session_id: String, + pub original_session_id: String, +} diff --git a/src-tauri/src/commands/acp.rs b/src-tauri/src/commands/acp.rs index 89cc3aa..ae4b57e 100644 --- a/src-tauri/src/commands/acp.rs +++ b/src-tauri/src/commands/acp.rs @@ -12,7 +12,7 @@ use crate::acp::preflight::{self, PreflightResult}; use crate::acp::registry; use crate::acp::types::{ AcpAgentInfo, AgentSkillContent, AgentSkillItem, AgentSkillLayout, AgentSkillLocation, - AgentSkillScope, AgentSkillsListResult, ConnectionInfo, PromptInputBlock, + AgentSkillScope, AgentSkillsListResult, ConnectionInfo, ForkResultInfo, PromptInputBlock, }; use crate::db::service::agent_setting_service; use crate::db::AppDatabase; @@ -1367,6 +1367,14 @@ pub async fn acp_cancel( manager.cancel(&connection_id).await } +#[tauri::command] +pub async fn acp_fork( + connection_id: String, + manager: State<'_, ConnectionManager>, +) -> Result { + manager.fork_session(&connection_id).await +} + #[tauri::command] pub async fn acp_respond_permission( connection_id: String, diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 0601401..9953b34 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -264,6 +264,7 @@ pub fn run() { acp_commands::acp_set_mode, acp_commands::acp_set_config_option, acp_commands::acp_cancel, + acp_commands::acp_fork, acp_commands::acp_respond_permission, acp_commands::acp_disconnect, acp_commands::acp_list_connections, diff --git a/src/components/chat/chat-input.tsx b/src/components/chat/chat-input.tsx index d0fd62a..e5454e3 100644 --- a/src/components/chat/chat-input.tsx +++ b/src/components/chat/chat-input.tsx @@ -41,6 +41,7 @@ interface ChatInputProps { isEditingQueueItem?: boolean onSaveQueueEdit?: (draft: PromptDraft) => void onCancelQueueEdit?: () => void + onForkSend?: (draft: PromptDraft, modeId?: string | null) => void } export function ChatInput({ @@ -71,6 +72,7 @@ export function ChatInput({ isEditingQueueItem, onSaveQueueEdit, onCancelQueueEdit, + onForkSend, }: ChatInputProps) { const t = useTranslations("Folder.chat.chatInput") const isConnected = status === "connected" @@ -116,6 +118,7 @@ export function ChatInput({ isEditingQueueItem={isEditingQueueItem} onSaveQueueEdit={onSaveQueueEdit} onCancelQueueEdit={onCancelQueueEdit} + onForkSend={onForkSend} placeholder={ isConnecting ? t("connecting") diff --git a/src/components/chat/conversation-shell.tsx b/src/components/chat/conversation-shell.tsx index 10bf60f..9b0e917 100644 --- a/src/components/chat/conversation-shell.tsx +++ b/src/components/chat/conversation-shell.tsx @@ -51,6 +51,7 @@ interface ConversationShellProps { isEditingQueueItem?: boolean onSaveQueueEdit?: (draft: PromptDraft) => void onCancelQueueEdit?: () => void + onForkSend?: (draft: PromptDraft, modeId?: string | null) => void } export function ConversationShell({ @@ -88,6 +89,7 @@ export function ConversationShell({ isEditingQueueItem, onSaveQueueEdit, onCancelQueueEdit, + onForkSend, }: ConversationShellProps) { return (
@@ -129,6 +131,7 @@ export function ConversationShell({ isEditingQueueItem={isEditingQueueItem} onSaveQueueEdit={onSaveQueueEdit} onCancelQueueEdit={onCancelQueueEdit} + onForkSend={onForkSend} /> )} diff --git a/src/components/chat/message-input.tsx b/src/components/chat/message-input.tsx index 01540b9..d954cc4 100644 --- a/src/components/chat/message-input.tsx +++ b/src/components/chat/message-input.tsx @@ -15,14 +15,22 @@ import { import { Textarea } from "@/components/ui/textarea" import { Check, + ChevronUp, Ellipsis, FileSearch, + GitFork, ListPlus, Plus, Send, Square, X, } from "lucide-react" +import { + DropdownMenu, + DropdownMenuContent, + DropdownMenuItem, + DropdownMenuTrigger, +} from "@/components/ui/dropdown-menu" import { cn } from "@/lib/utils" import { matchShortcutEvent } from "@/lib/keyboard-shortcuts" import { useShortcutSettings } from "@/hooks/use-shortcut-settings" @@ -76,6 +84,7 @@ interface MessageInputProps { isEditingQueueItem?: boolean onSaveQueueEdit?: (draft: PromptDraft) => void onCancelQueueEdit?: () => void + onForkSend?: (draft: PromptDraft, modeId?: string | null) => void } interface ResourceInputAttachment { @@ -280,6 +289,7 @@ export function MessageInput({ isEditingQueueItem = false, onSaveQueueEdit, onCancelQueueEdit, + onForkSend, }: MessageInputProps) { const t = useTranslations("Folder.chat.messageInput") const tQueue = useTranslations("Folder.chat.messageQueue") @@ -960,6 +970,24 @@ export function MessageInput({ effectiveDraftStorageKey, ]) + const handleForkSendClick = useCallback(() => { + if (!onForkSend) return + const draft = buildDraft() + if (!draft) return + onForkSend(draft, showModeSelector ? effectiveModeId : null) + if (effectiveDraftStorageKey) { + clearMessageInputDraft(effectiveDraftStorageKey) + } + setText("") + setAttachments([]) + }, [ + onForkSend, + buildDraft, + effectiveModeId, + showModeSelector, + effectiveDraftStorageKey, + ]) + const handleKeyDown = useCallback( (e: React.KeyboardEvent) => { if ( @@ -1288,6 +1316,35 @@ export function MessageInput({
+ ) : onForkSend ? ( +
+ + + + + + + + + {t("forkAndSend")} + + + +
) : (