From a9f6ce9105b58ffc81b52ef1e3e4865f38a48062 Mon Sep 17 00:00:00 2001 From: xintaofei Date: Tue, 31 Mar 2026 15:26:29 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=B6=88=E6=81=AF=E6=B8=A0?= =?UTF-8?q?=E9=81=93=E7=9A=84=E5=AE=9E=E7=8E=B0=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src-tauri/src/chat_channel/backends/lark.rs | 63 +++++- src-tauri/src/chat_channel/backends/mod.rs | 47 +++++ .../src/chat_channel/backends/telegram.rs | 86 +++++++- .../src/chat_channel/command_dispatcher.rs | 68 ++++--- .../src/chat_channel/command_handlers.rs | 22 +-- .../src/chat_channel/event_subscriber.rs | 176 +++++++++++------ src-tauri/src/chat_channel/manager.rs | 187 ++++++++++++------ src-tauri/src/chat_channel/scheduler.rs | 18 ++ src-tauri/src/chat_channel/types.rs | 13 ++ src-tauri/src/commands/chat_channel.rs | 134 ++++--------- src-tauri/src/web/handlers/chat_channel.rs | 3 +- src/components/settings/channel-list-tab.tsx | 28 +++ 12 files changed, 571 insertions(+), 274 deletions(-) diff --git a/src-tauri/src/chat_channel/backends/lark.rs b/src-tauri/src/chat_channel/backends/lark.rs index fb3f8a0..310823d 100644 --- a/src-tauri/src/chat_channel/backends/lark.rs +++ b/src-tauri/src/chat_channel/backends/lark.rs @@ -126,8 +126,13 @@ struct TokenCache { struct PartialMessage { parts: HashMap>, total: i32, + created_at: Instant, } +/// TTL for partial message reassembly entries. Prevents unbounded memory growth +/// if a multi-part message never completes (network issue, Lark SDK bug, etc). +const PARTIAL_MSG_TTL_SECS: u64 = 60; + // ── LarkBackend ── pub struct LarkBackend { @@ -148,7 +153,11 @@ impl LarkBackend { app_secret, chat_id, channel_id, - client: reqwest::Client::new(), + client: reqwest::Client::builder() + .connect_timeout(Duration::from_secs(10)) + .timeout(Duration::from_secs(30)) + .build() + .unwrap_or_default(), token_cache: Arc::new(RwLock::new(None)), status: Arc::new(Mutex::new(ChannelConnectionStatus::Disconnected)), shutdown_tx: Arc::new(Mutex::new(None)), @@ -310,6 +319,7 @@ impl LarkBackend { let (mut write, mut read) = ws_stream.split(); let mut partial_msgs: HashMap = HashMap::new(); + let mut last_partial_cleanup = Instant::now(); loop { tokio::select! { @@ -340,12 +350,19 @@ impl LarkBackend { let sum: i32 = frame.get_header("sum").and_then(|s| s.parse().ok()).unwrap_or(1); let seq: i32 = frame.get_header("seq").and_then(|s| s.parse().ok()).unwrap_or(0); - let full_payload = if sum <= 1 { + // Evict stale partial messages to prevent unbounded memory growth + if last_partial_cleanup.elapsed() > Duration::from_secs(PARTIAL_MSG_TTL_SECS) { + partial_msgs.retain(|_, pm| pm.created_at.elapsed() < Duration::from_secs(PARTIAL_MSG_TTL_SECS)); + last_partial_cleanup = Instant::now(); + } + + let full_payload = if sum <= 1 { Some(frame.payload.clone()) } else { let entry = partial_msgs.entry(msg_id.clone()).or_insert_with(|| PartialMessage { parts: HashMap::new(), total: sum, + created_at: Instant::now(), }); entry.parts.insert(seq, frame.payload.clone()); if entry.parts.len() as i32 >= entry.total { @@ -447,6 +464,21 @@ async fn handle_lark_event( return; } + // Group chat filtering: only process if bot is mentioned + let chat_type = event + .pointer("/event/message/chat_type") + .and_then(|v| v.as_str()) + .unwrap_or("p2p"); + + if chat_type == "group" { + let mentions = event + .pointer("/event/message/mentions") + .and_then(|v| v.as_array()); + if mentions.is_none() || mentions.unwrap().is_empty() { + return; // No mentions in group chat, ignore + } + } + let content_str = event .pointer("/event/message/content") .and_then(|v| v.as_str()) @@ -462,25 +494,48 @@ async fn handle_lark_event( return; } + // Strip mention placeholders (e.g. "@_user_1") from text + let clean_text = strip_lark_mentions(&text, event); + + if clean_text.is_empty() { + return; + } + let sender_id = event .pointer("/event/sender/sender_id/open_id") .and_then(|v| v.as_str()) .unwrap_or("unknown") .to_string(); - eprintln!("[Lark] incoming message from {}: {}", sender_id, text); + eprintln!("[Lark] incoming message from {}: {}", sender_id, clean_text); let _ = command_tx .send(IncomingCommand { channel_id, sender_id, - command_text: text, + command_text: clean_text, metadata: event.clone(), }) .await; } } +/// Strip Lark mention placeholders (e.g. `@_user_1`) from the message text. +fn strip_lark_mentions(text: &str, event: &serde_json::Value) -> String { + let mut result = text.to_string(); + if let Some(mentions) = event + .pointer("/event/message/mentions") + .and_then(|v| v.as_array()) + { + for mention in mentions { + if let Some(key) = mention.get("key").and_then(|v| v.as_str()) { + result = result.replace(key, ""); + } + } + } + result.trim().to_string() +} + /// Fetch a fresh WebSocket endpoint URL from Feishu. async fn fetch_ws_url( client: &reqwest::Client, diff --git a/src-tauri/src/chat_channel/backends/mod.rs b/src-tauri/src/chat_channel/backends/mod.rs index 84a386d..bf23602 100644 --- a/src-tauri/src/chat_channel/backends/mod.rs +++ b/src-tauri/src/chat_channel/backends/mod.rs @@ -1,2 +1,49 @@ pub mod lark; pub mod telegram; + +use super::error::ChatChannelError; +use super::traits::ChatChannelBackend; +use super::types::*; + +/// Factory function to create a backend instance from channel type, config, and token. +/// Eliminates duplicated match blocks across connect, test, and auto-connect paths. +pub fn create_backend( + channel_id: i32, + channel_type: ChannelType, + config: &serde_json::Value, + token: String, +) -> Result, ChatChannelError> { + match channel_type { + ChannelType::Telegram => { + let cfg: TelegramConfig = serde_json::from_value(config.clone()).map_err(|e| { + ChatChannelError::ConfigurationInvalid(format!("Invalid Telegram config: {e}")) + })?; + if cfg.chat_id.is_empty() { + return Err(ChatChannelError::ConfigurationInvalid( + "chat_id is required".into(), + )); + } + Ok(Box::new(telegram::TelegramBackend::new( + channel_id, + token, + cfg.chat_id, + ))) + } + ChannelType::Lark => { + let cfg: LarkConfig = serde_json::from_value(config.clone()).map_err(|e| { + ChatChannelError::ConfigurationInvalid(format!("Invalid Lark config: {e}")) + })?; + if cfg.app_id.is_empty() || cfg.chat_id.is_empty() { + return Err(ChatChannelError::ConfigurationInvalid( + "app_id and chat_id are required".into(), + )); + } + Ok(Box::new(lark::LarkBackend::new( + channel_id, + cfg.app_id, + token, + cfg.chat_id, + ))) + } + } +} diff --git a/src-tauri/src/chat_channel/backends/telegram.rs b/src-tauri/src/chat_channel/backends/telegram.rs index 9daea81..8362e1f 100644 --- a/src-tauri/src/chat_channel/backends/telegram.rs +++ b/src-tauri/src/chat_channel/backends/telegram.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::time::Duration; use async_trait::async_trait; use tokio::sync::{mpsc, Mutex}; @@ -21,7 +22,11 @@ impl TelegramBackend { Self { bot_token, chat_id, - client: reqwest::Client::new(), + client: reqwest::Client::builder() + .connect_timeout(Duration::from_secs(10)) + .timeout(Duration::from_secs(60)) + .build() + .unwrap_or_default(), status: Arc::new(Mutex::new(ChannelConnectionStatus::Disconnected)), channel_id, shutdown_tx: Arc::new(Mutex::new(None)), @@ -91,7 +96,7 @@ impl ChatChannelBackend for TelegramBackend { ) -> Result<(), ChatChannelError> { *self.status.lock().await = ChannelConnectionStatus::Connecting; - // Verify bot token by calling getMe + // Verify bot token and extract bot username for group @mention filtering let resp = self .client .get(self.api_url("getMe")) @@ -99,13 +104,24 @@ impl ChatChannelBackend for TelegramBackend { .await .map_err(|e| ChatChannelError::ConnectionFailed(e.to_string()))?; - if !resp.status().is_success() { + let me_body: serde_json::Value = resp + .json() + .await + .map_err(|e| ChatChannelError::ConnectionFailed(e.to_string()))?; + + if me_body.get("ok").and_then(|v| v.as_bool()) != Some(true) { *self.status.lock().await = ChannelConnectionStatus::Error; return Err(ChatChannelError::AuthenticationFailed( "Invalid bot token".to_string(), )); } + let bot_username = me_body + .pointer("/result/username") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_lowercase(); + *self.status.lock().await = ChannelConnectionStatus::Connected; // Start long-polling loop @@ -136,6 +152,14 @@ impl ChatChannelBackend for TelegramBackend { match result { Ok(resp) => { + // Recover from error state after successful poll + { + let mut s = status.lock().await; + if *s == ChannelConnectionStatus::Error { + *s = ChannelConnectionStatus::Connected; + } + } + if let Ok(body) = resp.json::().await { if let Some(updates) = body.get("result").and_then(|r| r.as_array()) { for update in updates { @@ -148,6 +172,25 @@ impl ChatChannelBackend for TelegramBackend { .pointer("/message/text") .and_then(|t| t.as_str()) { + // Group chat filtering: only process if @bot is mentioned + let chat_type = update + .pointer("/message/chat/type") + .and_then(|v| v.as_str()) + .unwrap_or("private"); + + if (chat_type == "group" || chat_type == "supergroup") + && !bot_username.is_empty() + { + let at_bot = + format!("@{}", bot_username); + if !text.to_lowercase().contains(&at_bot) { + continue; + } + } + + // Strip @bot_username from command text (case-insensitive) + let clean_text = strip_bot_mention(text, &bot_username); + let sender_id = update .pointer("/message/from/id") .and_then(|i| i.as_i64()) @@ -157,7 +200,7 @@ impl ChatChannelBackend for TelegramBackend { .send(IncomingCommand { channel_id, sender_id, - command_text: text.to_string(), + command_text: clean_text, metadata: update.clone(), }) .await; @@ -170,7 +213,6 @@ impl ChatChannelBackend for TelegramBackend { eprintln!("[Telegram] polling error: {e}"); *status.lock().await = ChannelConnectionStatus::Error; tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - *status.lock().await = ChannelConnectionStatus::Connected; } } } @@ -222,16 +264,42 @@ impl ChatChannelBackend for TelegramBackend { .await .map_err(|e| ChatChannelError::ConnectionFailed(e.to_string()))?; - if resp.status().is_success() { + let body: serde_json::Value = resp + .json() + .await + .map_err(|e| ChatChannelError::ConnectionFailed(e.to_string()))?; + + if body.get("ok").and_then(|v| v.as_bool()) == Some(true) { Ok(()) } else { - Err(ChatChannelError::AuthenticationFailed( - "Invalid bot token".to_string(), - )) + let desc = body + .get("description") + .and_then(|v| v.as_str()) + .unwrap_or("Invalid bot token"); + Err(ChatChannelError::AuthenticationFailed(desc.to_string())) } } } +/// Strip `@bot_username` from text (case-insensitive). +/// Handles Telegram convention: `/command@botname args` → `/command args` +fn strip_bot_mention(text: &str, bot_username: &str) -> String { + if bot_username.is_empty() { + return text.to_string(); + } + let at_bot = format!("@{}", bot_username); + let text_lower = text.to_lowercase(); + let at_bot_lower = at_bot.to_lowercase(); + if let Some(pos) = text_lower.find(&at_bot_lower) { + let mut result = String::with_capacity(text.len()); + result.push_str(&text[..pos]); + result.push_str(&text[pos + at_bot.len()..]); + result.trim().to_string() + } else { + text.to_string() + } +} + fn format_telegram_markdown(msg: &RichMessage) -> String { let mut text = String::new(); diff --git a/src-tauri/src/chat_channel/command_dispatcher.rs b/src-tauri/src/chat_channel/command_dispatcher.rs index c8c6cba..e7b3559 100644 --- a/src-tauri/src/chat_channel/command_dispatcher.rs +++ b/src-tauri/src/chat_channel/command_dispatcher.rs @@ -1,3 +1,5 @@ +use std::time::{Duration, Instant}; + use sea_orm::DatabaseConnection; use tokio::sync::mpsc; use tokio::task::JoinHandle; @@ -11,6 +13,40 @@ use crate::db::service::{app_metadata_service, chat_channel_message_log_service} const COMMAND_PREFIX_KEY: &str = "chat_command_prefix"; const DEFAULT_COMMAND_PREFIX: &str = "/"; const MESSAGE_LANGUAGE_KEY: &str = "chat_message_language"; +/// How often to refresh cached config from DB. +const CONFIG_CACHE_TTL_SECS: u64 = 30; + +struct CommandConfigCache { + prefix: String, + lang: Lang, + last_refresh: Instant, +} + +impl CommandConfigCache { + fn new() -> Self { + Self { + prefix: DEFAULT_COMMAND_PREFIX.to_string(), + lang: Lang::default(), + // Force refresh on first use + last_refresh: Instant::now() - Duration::from_secs(CONFIG_CACHE_TTL_SECS + 1), + } + } + + async fn refresh_if_needed(&mut self, db: &DatabaseConnection) { + if self.last_refresh.elapsed() < Duration::from_secs(CONFIG_CACHE_TTL_SECS) { + return; + } + + if let Ok(Some(val)) = app_metadata_service::get_value(db, COMMAND_PREFIX_KEY).await { + self.prefix = val; + } + if let Ok(Some(val)) = app_metadata_service::get_value(db, MESSAGE_LANGUAGE_KEY).await { + self.lang = Lang::from_str_lossy(&val); + } + + self.last_refresh = Instant::now(); + } +} pub fn spawn_command_dispatcher( mut command_rx: mpsc::Receiver, @@ -18,6 +54,8 @@ pub fn spawn_command_dispatcher( db_conn: DatabaseConnection, ) -> JoinHandle<()> { tokio::spawn(async move { + let mut config = CommandConfigCache::new(); + while let Some(cmd) = command_rx.recv().await { let text = cmd.command_text.trim(); @@ -33,15 +71,9 @@ pub fn spawn_command_dispatcher( ) .await; - let prefix = app_metadata_service::get_value(&db_conn, COMMAND_PREFIX_KEY) - .await - .ok() - .flatten() - .unwrap_or_else(|| DEFAULT_COMMAND_PREFIX.to_string()); + config.refresh_if_needed(&db_conn).await; - let lang = load_lang(&db_conn).await; - - let response = dispatch_command(text, &prefix, &db_conn, &manager, lang).await; + let response = dispatch_command(text, &config.prefix, &db_conn, &manager, config.lang).await; // Send response back via the same channel let send_result = manager.send_to_channel(cmd.channel_id, &response).await; @@ -70,15 +102,6 @@ pub fn spawn_command_dispatcher( }) } -async fn load_lang(db: &DatabaseConnection) -> Lang { - app_metadata_service::get_value(db, MESSAGE_LANGUAGE_KEY) - .await - .ok() - .flatten() - .map(|v| Lang::from_str_lossy(&v)) - .unwrap_or_default() -} - async fn dispatch_command( text: &str, prefix: &str, @@ -86,13 +109,12 @@ async fn dispatch_command( manager: &ChatChannelManager, lang: Lang, ) -> super::types::RichMessage { - // Check if text starts with the configured prefix - if !text.starts_with(prefix) { - return command_handlers::handle_help(prefix, lang); - } + // Strip prefix; if text doesn't start with it, show help + let without_prefix = match text.strip_prefix(prefix) { + Some(rest) => rest, + None => return command_handlers::handle_help(prefix, lang), + }; - // Strip prefix and parse command + args - let without_prefix = &text[prefix.len()..]; let parts: Vec<&str> = without_prefix.splitn(2, ' ').collect(); let command = parts[0].to_lowercase(); let args = parts.get(1).map(|s| s.trim()).unwrap_or(""); diff --git a/src-tauri/src/chat_channel/command_handlers.rs b/src-tauri/src/chat_channel/command_handlers.rs index 956238d..70d4a6f 100644 --- a/src-tauri/src/chat_channel/command_handlers.rs +++ b/src-tauri/src/chat_channel/command_handlers.rs @@ -1,5 +1,5 @@ use chrono::Utc; -use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder}; +use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder, QuerySelect}; use super::i18n::{self, Lang}; use super::manager::ChatChannelManager; @@ -7,9 +7,10 @@ use super::types::{MessageLevel, RichMessage}; use crate::db::entities::conversation; pub async fn handle_recent(db: &DatabaseConnection, lang: Lang) -> RichMessage { - let rows = match conversation::Entity::find() + let recent = match conversation::Entity::find() .filter(conversation::Column::DeletedAt.is_null()) .order_by_desc(conversation::Column::CreatedAt) + .limit(5) .all(db) .await { @@ -24,7 +25,6 @@ pub async fn handle_recent(db: &DatabaseConnection, lang: Lang) -> RichMessage { } }; - let recent: Vec<_> = rows.into_iter().take(5).collect(); if recent.is_empty() { return RichMessage::info(i18n::no_conversations(lang)) .with_title(i18n::recent_conversations_title(lang)); @@ -53,9 +53,11 @@ pub async fn handle_search( keyword: &str, lang: Lang, ) -> RichMessage { - let rows = match conversation::Entity::find() + let matched = match conversation::Entity::find() .filter(conversation::Column::DeletedAt.is_null()) + .filter(conversation::Column::Title.contains(keyword)) .order_by_desc(conversation::Column::CreatedAt) + .limit(10) .all(db) .await { @@ -70,18 +72,6 @@ pub async fn handle_search( } }; - let keyword_lower = keyword.to_lowercase(); - let matched: Vec<_> = rows - .into_iter() - .filter(|c| { - c.title - .as_deref() - .map(|t| t.to_lowercase().contains(&keyword_lower)) - .unwrap_or(false) - }) - .take(10) - .collect(); - if matched.is_empty() { return RichMessage::info(i18n::search_no_results(lang, keyword)) .with_title(i18n::search_results_title(lang)); diff --git a/src-tauri/src/chat_channel/event_subscriber.rs b/src-tauri/src/chat_channel/event_subscriber.rs index 12e697e..16b1dc1 100644 --- a/src-tauri/src/chat_channel/event_subscriber.rs +++ b/src-tauri/src/chat_channel/event_subscriber.rs @@ -14,7 +14,68 @@ use crate::web::event_bridge::WebEventBroadcaster; /// Minimum interval between pushes for the same event type per channel (debounce). const DEBOUNCE_SECS: u64 = 5; +/// How often to refresh cached config from DB. +const CONFIG_CACHE_TTL_SECS: u64 = 30; + const MESSAGE_LANGUAGE_KEY: &str = "chat_message_language"; +const EVENT_FILTER_KEY: &str = "chat_event_filter"; + +struct CachedChannel { + id: i32, + event_filter_json: Option, +} + +struct EventConfigCache { + lang: Lang, + global_filter: Option>, + enabled_channels: Vec, + last_refresh: Instant, +} + +impl EventConfigCache { + fn new() -> Self { + Self { + lang: Lang::default(), + global_filter: None, + enabled_channels: Vec::new(), + // Force refresh on first use + last_refresh: Instant::now() - Duration::from_secs(CONFIG_CACHE_TTL_SECS + 1), + } + } + + async fn refresh_if_needed(&mut self, db: &DatabaseConnection) { + if self.last_refresh.elapsed() < Duration::from_secs(CONFIG_CACHE_TTL_SECS) { + return; + } + + if let Ok(Some(val)) = app_metadata_service::get_value(db, MESSAGE_LANGUAGE_KEY).await { + self.lang = Lang::from_str_lossy(&val); + } + + // Parse as Option> so JSON "null" → None (intentional, not accidental) + self.global_filter = app_metadata_service::get_value(db, EVENT_FILTER_KEY) + .await + .ok() + .flatten() + .and_then(|json| { + serde_json::from_str::>>(&json) + .ok() + .flatten() + }); + + if let Ok(channels) = chat_channel_service::list_enabled(db).await { + self.enabled_channels = channels + .into_iter() + .map(|ch| CachedChannel { + id: ch.id, + event_filter_json: ch.event_filter_json, + }) + .collect(); + } + + self.last_refresh = Instant::now(); + } +} pub fn spawn_event_subscriber( broadcaster: Arc, @@ -24,6 +85,7 @@ pub fn spawn_event_subscriber( tokio::spawn(async move { let mut rx = broadcaster.subscribe(); let mut last_push: HashMap<(i32, String), Instant> = HashMap::new(); + let mut config = EventConfigCache::new(); loop { let event = match rx.recv().await { @@ -38,82 +100,72 @@ pub fn spawn_event_subscriber( } }; - let lang = load_lang(&db_conn).await; + config.refresh_if_needed(&db_conn).await; - let message = match parse_event(&event.channel, &event.payload, lang) { - Some((event_type, msg)) => { - // Global event filter check - let global_filter = app_metadata_service::get_value(&db_conn, "chat_event_filter") - .await - .ok() - .flatten() - .and_then(|json| serde_json::from_str::>(&json).ok()); + // Prune stale debounce entries + last_push.retain(|_, t| t.elapsed() < Duration::from_secs(DEBOUNCE_SECS * 2)); - if let Some(filter) = &global_filter { - if !filter.contains(&event_type) { - continue; - } + if let Some((event_type, msg)) = + parse_event(&event.channel, &event.payload, config.lang) + { + // Global event filter check + if let Some(filter) = &config.global_filter { + if !filter.contains(&event_type) { + continue; } + } - // Check enabled channels and forward - let channels = match chat_channel_service::list_enabled(&db_conn).await { - Ok(c) => c, - Err(e) => { - eprintln!("[ChatChannel] failed to list channels: {e}"); - continue; - } - }; - - for ch in &channels { - // Debounce - let key = (ch.id, event_type.clone()); - let now = Instant::now(); - if let Some(last) = last_push.get(&key) { - if now.duration_since(*last) < Duration::from_secs(DEBOUNCE_SECS) { + for ch in &config.enabled_channels { + // Per-channel event filter + if let Some(filter_json) = &ch.event_filter_json { + if let Ok(filter) = serde_json::from_str::>(filter_json) { + if !filter.contains(&event_type) { continue; } } - last_push.insert(key, now); - - // Send - let send_result = manager.send_to_channel(ch.id, &msg).await; - let (status, error_detail) = match &send_result { - Ok(_) => ("sent", None), - Err(e) => ("failed", Some(e.to_string())), - }; - - let _ = chat_channel_message_log_service::create_log( - &db_conn, - ch.id, - "outbound", - "event_push", - &msg.to_plain_text(), - status, - error_detail, - ) - .await; } - Some(msg) - } - None => None, - }; + // Debounce: skip if same event type was pushed to this channel recently + let key = (ch.id, event_type.clone()); + let now = Instant::now(); + if let Some(last) = last_push.get(&key) { + if now.duration_since(*last) < Duration::from_secs(DEBOUNCE_SECS) { + continue; + } + } - drop(message); + // Send + let send_result = manager.send_to_channel(ch.id, &msg).await; + let (status, error_detail) = match &send_result { + Ok(_) => { + // Only update debounce timestamp on success + last_push.insert(key, now); + ("sent", None) + } + Err(e) => ("failed", Some(e.to_string())), + }; + + let _ = chat_channel_message_log_service::create_log( + &db_conn, + ch.id, + "outbound", + "event_push", + &msg.to_plain_text(), + status, + error_detail, + ) + .await; + } + } } }) } -async fn load_lang(db: &DatabaseConnection) -> Lang { - app_metadata_service::get_value(db, MESSAGE_LANGUAGE_KEY) - .await - .ok() - .flatten() - .map(|v| Lang::from_str_lossy(&v)) - .unwrap_or_default() -} - -fn parse_event(channel: &str, payload: &serde_json::Value, lang: Lang) -> Option<(String, RichMessage)> { +fn parse_event( + channel: &str, + payload: &serde_json::Value, + lang: Lang, +) -> Option<(String, RichMessage)> { match channel { "acp://event" => parse_acp_event(payload, lang), _ => None, diff --git a/src-tauri/src/chat_channel/manager.rs b/src-tauri/src/chat_channel/manager.rs index 85630b6..974f836 100644 --- a/src-tauri/src/chat_channel/manager.rs +++ b/src-tauri/src/chat_channel/manager.rs @@ -13,7 +13,7 @@ struct ActiveChannel { id: i32, name: String, channel_type: ChannelType, - backend: Box, + backend: Arc, } /// Inner state shared across clones. @@ -21,6 +21,7 @@ struct Inner { channels: Mutex>, command_tx: mpsc::Sender, command_rx: Mutex>>, + broadcaster: Mutex>>, } pub struct ChatChannelManager { @@ -41,6 +42,7 @@ impl ChatChannelManager { channels: Mutex::new(HashMap::new()), command_tx, command_rx: Mutex::new(Some(command_rx)), + broadcaster: Mutex::new(None), }), } } @@ -61,6 +63,19 @@ impl ChatChannelManager { self.inner.command_rx.lock().await.take() } + /// Emit a status change event to the frontend via broadcaster. + async fn emit_status_event(&self, channel_id: i32, status: &str) { + if let Some(broadcaster) = self.inner.broadcaster.lock().await.as_ref() { + broadcaster.send( + "chat-channel://status", + &serde_json::json!({ + "channel_id": channel_id, + "status": status, + }), + ); + } + } + pub async fn add_channel( &self, id: i32, @@ -68,6 +83,14 @@ impl ChatChannelManager { channel_type: ChannelType, backend: Box, ) -> Result<(), ChatChannelError> { + let backend: Arc = Arc::from(backend); + + // Stop existing channel if present (prevents task leak on duplicate connect) + let old = self.inner.channels.lock().await.remove(&id); + if let Some(existing) = old { + let _ = existing.backend.stop().await; + } + let command_tx = self.inner.command_tx.clone(); backend.start(command_tx).await?; @@ -79,20 +102,25 @@ impl ChatChannelManager { }; self.inner.channels.lock().await.insert(id, channel); + self.emit_status_event(id, "connected").await; Ok(()) } pub async fn remove_channel(&self, id: i32) -> Result<(), ChatChannelError> { - let mut channels = self.inner.channels.lock().await; - if let Some(channel) = channels.remove(&id) { + let removed = self.inner.channels.lock().await.remove(&id); + if let Some(channel) = removed { channel.backend.stop().await?; + self.emit_status_event(id, "disconnected").await; } Ok(()) } pub async fn stop_all(&self) { - let mut channels = self.inner.channels.lock().await; - for (_, channel) in channels.drain() { + let drained: Vec = { + let mut channels = self.inner.channels.lock().await; + channels.drain().map(|(_, ch)| ch).collect() + }; + for channel in drained { let _ = channel.backend.stop().await; } } @@ -102,29 +130,49 @@ impl ChatChannelManager { channel_id: i32, message: &RichMessage, ) -> Result { - let channels = self.inner.channels.lock().await; - let channel = channels - .get(&channel_id) - .ok_or(ChatChannelError::NotFound(channel_id))?; - channel.backend.send_rich_message(message).await + let backend = { + let channels = self.inner.channels.lock().await; + channels + .get(&channel_id) + .ok_or(ChatChannelError::NotFound(channel_id))? + .backend + .clone() + }; + backend.send_rich_message(message).await } pub async fn send_to_all(&self, message: &RichMessage) { - let channels = self.inner.channels.lock().await; - for (_, channel) in channels.iter() { - let _ = channel.backend.send_rich_message(message).await; + let backends: Vec> = { + let channels = self.inner.channels.lock().await; + channels.values().map(|ch| ch.backend.clone()).collect() + }; + for backend in backends { + let _ = backend.send_rich_message(message).await; } } pub async fn get_status(&self) -> Vec { - let channels = self.inner.channels.lock().await; - let mut result = Vec::new(); - for (_, ch) in channels.iter() { - let status = ch.backend.status().await; + let entries: Vec<(i32, String, String, Arc)> = { + let channels = self.inner.channels.lock().await; + channels + .values() + .map(|ch| { + ( + ch.id, + ch.name.clone(), + ch.channel_type.to_string(), + ch.backend.clone(), + ) + }) + .collect() + }; + let mut result = Vec::with_capacity(entries.len()); + for (id, name, ct, backend) in entries { + let status = backend.status().await; result.push(crate::models::ChannelStatusInfo { - channel_id: ch.id, - name: ch.name.clone(), - channel_type: ch.channel_type.to_string(), + channel_id: id, + name, + channel_type: ct, status: serde_json::to_value(status) .ok() .and_then(|v| v.as_str().map(String::from)) @@ -135,17 +183,24 @@ impl ChatChannelManager { } pub async fn test_channel(&self, id: i32) -> Result<(), ChatChannelError> { - let channels = self.inner.channels.lock().await; - let channel = channels - .get(&id) - .ok_or(ChatChannelError::NotFound(id))?; - channel.backend.test_connection().await + let backend = { + let channels = self.inner.channels.lock().await; + channels + .get(&id) + .ok_or(ChatChannelError::NotFound(id))? + .backend + .clone() + }; + backend.test_connection().await } pub async fn is_connected(&self, id: i32) -> bool { - let channels = self.inner.channels.lock().await; - if let Some(ch) = channels.get(&id) { - ch.backend.status().await == ChannelConnectionStatus::Connected + let backend = { + let channels = self.inner.channels.lock().await; + channels.get(&id).map(|ch| ch.backend.clone()) + }; + if let Some(b) = backend { + b.status().await == ChannelConnectionStatus::Connected } else { false } @@ -158,6 +213,9 @@ impl ChatChannelManager { broadcaster: Arc, db_conn: DatabaseConnection, ) { + // Store broadcaster for status event emission + *self.inner.broadcaster.lock().await = Some(broadcaster.clone()); + let db_conn2 = db_conn.clone(); // Spawn event subscriber @@ -201,54 +259,53 @@ impl ChatChannelManager { serde_json::Value::String(ch.channel_type.clone()), ) { Ok(t) => t, - Err(_) => continue, + Err(_) => { + eprintln!( + "[ChatChannel] unknown channel type '{}' for '{}' (id={}), skipping", + ch.channel_type, ch.name, ch.id + ); + continue; + } }; let config: serde_json::Value = match serde_json::from_str(&ch.config_json) { Ok(v) => v, - Err(_) => continue, + Err(e) => { + eprintln!( + "[ChatChannel] invalid config for '{}' (id={}): {e}, skipping", + ch.name, ch.id + ); + continue; + } }; let token = match crate::keyring_store::get_channel_token(ch.id) { Some(t) => t, - None => continue, - }; - - let backend: Box = match channel_type { - ChannelType::Telegram => { - let chat_id = config - .get("chat_id") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); - if chat_id.is_empty() { - continue; - } - Box::new(super::backends::telegram::TelegramBackend::new( - ch.id, token, chat_id, - )) - } - ChannelType::Lark => { - let app_id = config - .get("app_id") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); - let chat_id = config - .get("chat_id") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); - if app_id.is_empty() || chat_id.is_empty() { - continue; - } - Box::new(super::backends::lark::LarkBackend::new( - ch.id, app_id, token, chat_id, - )) + None => { + eprintln!( + "[ChatChannel] no token found for '{}' (id={}), skipping auto-connect", + ch.name, ch.id + ); + continue; } }; - if let Err(e) = self.add_channel(ch.id, ch.name.clone(), channel_type, backend).await { + let backend = + match super::backends::create_backend(ch.id, channel_type, &config, token) { + Ok(b) => b, + Err(e) => { + eprintln!( + "[ChatChannel] failed to create backend for '{}' (id={}): {e}", + ch.name, ch.id + ); + continue; + } + }; + + if let Err(e) = self + .add_channel(ch.id, ch.name.clone(), channel_type, backend) + .await + { eprintln!( "[ChatChannel] failed to auto-connect '{}' (id={}): {e}", ch.name, ch.id diff --git a/src-tauri/src/chat_channel/scheduler.rs b/src-tauri/src/chat_channel/scheduler.rs index 8d3f64c..b5a3aec 100644 --- a/src-tauri/src/chat_channel/scheduler.rs +++ b/src-tauri/src/chat_channel/scheduler.rs @@ -11,6 +11,8 @@ use crate::db::entities::conversation; use crate::db::service::{app_metadata_service, chat_channel_message_log_service, chat_channel_service}; const MESSAGE_LANGUAGE_KEY: &str = "chat_message_language"; +/// Days to retain message logs before cleanup. +const LOG_RETENTION_DAYS: i64 = 30; pub fn spawn_daily_report_scheduler( manager: ChatChannelManager, @@ -18,6 +20,7 @@ pub fn spawn_daily_report_scheduler( ) -> JoinHandle<()> { tokio::spawn(async move { let mut sent_today: HashSet<(i32, NaiveDate)> = HashSet::new(); + let mut last_cleanup_date: Option = None; loop { tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; @@ -29,6 +32,21 @@ pub fn spawn_daily_report_scheduler( // Clean up old entries from sent_today sent_today.retain(|(_, date)| *date == today); + // Periodic log cleanup: once per day + if last_cleanup_date != Some(today) { + last_cleanup_date = Some(today); + let cutoff = Utc::now() - chrono::Duration::days(LOG_RETENTION_DAYS); + match chat_channel_message_log_service::cleanup_old_logs(&db_conn, cutoff).await { + Ok(n) if n > 0 => { + eprintln!("[ChatChannel] cleaned up {n} old message logs"); + } + Err(e) => { + eprintln!("[ChatChannel] log cleanup failed: {e}"); + } + _ => {} + } + } + let channels = match chat_channel_service::list_enabled(&db_conn).await { Ok(c) => c, Err(e) => { diff --git a/src-tauri/src/chat_channel/types.rs b/src-tauri/src/chat_channel/types.rs index 263468f..fe8533c 100644 --- a/src-tauri/src/chat_channel/types.rs +++ b/src-tauri/src/chat_channel/types.rs @@ -7,6 +7,19 @@ pub enum ChannelType { Telegram, } +// ── Per-channel strong typed configs ── + +#[derive(Debug, Clone, Deserialize)] +pub struct TelegramConfig { + pub chat_id: String, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct LarkConfig { + pub app_id: String, + pub chat_id: String, +} + impl std::fmt::Display for ChannelType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { diff --git a/src-tauri/src/commands/chat_channel.rs b/src-tauri/src/commands/chat_channel.rs index 093be25..b74147a 100644 --- a/src-tauri/src/commands/chat_channel.rs +++ b/src-tauri/src/commands/chat_channel.rs @@ -1,8 +1,5 @@ use crate::app_error::AppCommandError; -use crate::chat_channel::backends::lark::LarkBackend; -use crate::chat_channel::backends::telegram::TelegramBackend; use crate::chat_channel::manager::ChatChannelManager; -use crate::chat_channel::traits::ChatChannelBackend; use crate::chat_channel::types::ChannelType; use crate::db::service::{chat_channel_message_log_service, chat_channel_service}; use crate::db::AppDatabase; @@ -76,7 +73,13 @@ pub async fn update_chat_channel_core( Ok(ChatChannelInfo::from(model)) } -pub async fn delete_chat_channel_core(db: &AppDatabase, id: i32) -> Result<(), AppCommandError> { +pub async fn delete_chat_channel_core( + db: &AppDatabase, + manager: &ChatChannelManager, + id: i32, +) -> Result<(), AppCommandError> { + // Disconnect running backend before deleting from DB (prevents orphaned task) + let _ = manager.remove_channel(id).await; chat_channel_service::delete(&db.conn, id) .await .map_err(AppCommandError::from)?; @@ -103,45 +106,17 @@ pub async fn connect_chat_channel_core( )) })?; - let backend: Box = match channel_type { - ChannelType::Telegram => { - let config: serde_json::Value = - serde_json::from_str(&model.config_json).map_err(|e| { - AppCommandError::configuration_invalid("Invalid config JSON") - .with_detail(e.to_string()) - })?; - let chat_id = config - .get("chat_id") - .and_then(|v| v.as_str()) - .ok_or_else(|| AppCommandError::configuration_missing("chat_id is required"))? - .to_string(); - let bot_token = crate::keyring_store::get_channel_token(id).ok_or_else(|| { - AppCommandError::configuration_missing("Bot token not set") - })?; - Box::new(TelegramBackend::new(id, bot_token, chat_id)) - } - ChannelType::Lark => { - let config: serde_json::Value = - serde_json::from_str(&model.config_json).map_err(|e| { - AppCommandError::configuration_invalid("Invalid config JSON") - .with_detail(e.to_string()) - })?; - let app_id = config - .get("app_id") - .and_then(|v| v.as_str()) - .ok_or_else(|| AppCommandError::configuration_missing("app_id is required"))? - .to_string(); - let chat_id = config - .get("chat_id") - .and_then(|v| v.as_str()) - .ok_or_else(|| AppCommandError::configuration_missing("chat_id is required"))? - .to_string(); - let app_secret = crate::keyring_store::get_channel_token(id).ok_or_else(|| { - AppCommandError::configuration_missing("App Secret not set") - })?; - Box::new(LarkBackend::new(id, app_id, app_secret, chat_id)) - } - }; + let config: serde_json::Value = + serde_json::from_str(&model.config_json).map_err(|e| { + AppCommandError::configuration_invalid("Invalid config JSON") + .with_detail(e.to_string()) + })?; + + let token = crate::keyring_store::get_channel_token(id) + .ok_or_else(|| AppCommandError::configuration_missing("Token not set"))?; + + let backend = crate::chat_channel::backends::create_backend(id, channel_type, &config, token) + .map_err(AppCommandError::from)?; manager .add_channel(id, model.name, channel_type, backend) @@ -169,53 +144,22 @@ pub async fn test_chat_channel_core( )) })?; - match channel_type { - ChannelType::Telegram => { - let config: serde_json::Value = - serde_json::from_str(&model.config_json).map_err(|e| { - AppCommandError::configuration_invalid("Invalid config JSON") - .with_detail(e.to_string()) - })?; - let chat_id = config - .get("chat_id") - .and_then(|v| v.as_str()) - .ok_or_else(|| AppCommandError::configuration_missing("chat_id is required"))? - .to_string(); - let bot_token = crate::keyring_store::get_channel_token(id).ok_or_else(|| { - AppCommandError::configuration_missing("Bot token not set") - })?; - let backend = TelegramBackend::new(id, bot_token, chat_id); - backend - .test_connection() - .await - .map_err(AppCommandError::from)?; - } - ChannelType::Lark => { - let config: serde_json::Value = - serde_json::from_str(&model.config_json).map_err(|e| { - AppCommandError::configuration_invalid("Invalid config JSON") - .with_detail(e.to_string()) - })?; - let app_id = config - .get("app_id") - .and_then(|v| v.as_str()) - .ok_or_else(|| AppCommandError::configuration_missing("app_id is required"))? - .to_string(); - let chat_id = config - .get("chat_id") - .and_then(|v| v.as_str()) - .ok_or_else(|| AppCommandError::configuration_missing("chat_id is required"))? - .to_string(); - let app_secret = crate::keyring_store::get_channel_token(id).ok_or_else(|| { - AppCommandError::configuration_missing("App Secret not set") - })?; - let backend = LarkBackend::new(id, app_id, app_secret, chat_id); - backend - .test_connection() - .await - .map_err(AppCommandError::from)?; - } - } + let config: serde_json::Value = + serde_json::from_str(&model.config_json).map_err(|e| { + AppCommandError::configuration_invalid("Invalid config JSON") + .with_detail(e.to_string()) + })?; + + let token = crate::keyring_store::get_channel_token(id) + .ok_or_else(|| AppCommandError::configuration_missing("Token not set"))?; + + let backend = crate::chat_channel::backends::create_backend(id, channel_type, &config, token) + .map_err(AppCommandError::from)?; + + backend + .test_connection() + .await + .map_err(AppCommandError::from)?; Ok(()) } @@ -342,9 +286,10 @@ pub async fn get_chat_event_filter_core( .map_err(AppCommandError::from)?; match val { Some(json) => { - let arr: Vec = - serde_json::from_str(&json).map_err(|e| AppCommandError::invalid_input(e.to_string()))?; - Ok(Some(arr)) + // Parse as Option> to correctly handle stored "null" + let filter: Option> = serde_json::from_str(&json) + .map_err(|e| AppCommandError::invalid_input(e.to_string()))?; + Ok(filter) } None => Ok(None), } @@ -426,9 +371,10 @@ pub async fn update_chat_channel( #[tauri::command] pub async fn delete_chat_channel( db: tauri::State<'_, AppDatabase>, + manager: tauri::State<'_, ChatChannelManager>, id: i32, ) -> Result<(), AppCommandError> { - delete_chat_channel_core(&db, id).await + delete_chat_channel_core(&db, &manager, id).await } #[cfg(feature = "tauri-runtime")] diff --git a/src-tauri/src/web/handlers/chat_channel.rs b/src-tauri/src/web/handlers/chat_channel.rs index 3c106c9..b248bb1 100644 --- a/src-tauri/src/web/handlers/chat_channel.rs +++ b/src-tauri/src/web/handlers/chat_channel.rs @@ -112,7 +112,8 @@ pub async fn delete_chat_channel( Extension(state): Extension>, Json(params): Json, ) -> Result, AppCommandError> { - cc_commands::delete_chat_channel_core(&state.db, params.id).await?; + cc_commands::delete_chat_channel_core(&state.db, &state.chat_channel_manager, params.id) + .await?; Ok(Json(())) } diff --git a/src/components/settings/channel-list-tab.tsx b/src/components/settings/channel-list-tab.tsx index fc186c6..e61fbd2 100644 --- a/src/components/settings/channel-list-tab.tsx +++ b/src/components/settings/channel-list-tab.tsx @@ -36,6 +36,7 @@ import { updateChatChannel, getChatChannelStatus, } from "@/lib/api" +import { subscribe } from "@/lib/platform" import type { ChatChannelInfo, ChannelStatusInfo } from "@/lib/types" import { AddChatChannelDialog } from "./add-chat-channel-dialog" import { EditChatChannelDialog } from "./edit-chat-channel-dialog" @@ -69,6 +70,33 @@ export function ChannelListTab() { loadChannels().catch(console.error) }, [loadChannels]) + // Subscribe to real-time status change events from backend + useEffect(() => { + let cancelled = false + let unsub: (() => void) | undefined + subscribe<{ + channel_id: number + status: ChannelStatusInfo["status"] + }>("chat-channel://status", (payload) => { + setStatuses((prev) => { + const idx = prev.findIndex((s) => s.channel_id === payload.channel_id) + if (idx >= 0) { + const updated = [...prev] + updated[idx] = { ...updated[idx], status: payload.status } + return updated + } + return prev + }) + }).then((fn) => { + if (cancelled) fn() + else unsub = fn + }) + return () => { + cancelled = true + unsub?.() + } + }, []) + const handleToggleEnabled = useCallback( async (ch: ChatChannelInfo, connected: boolean) => { try {