初步集成消息通道,支持Telegram + Lark机器人

This commit is contained in:
xintaofei
2026-03-30 22:51:49 +08:00
parent 544abbd15d
commit d18cec33bf
44 changed files with 4106 additions and 11 deletions

View File

@@ -0,0 +1,622 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use futures_util::{SinkExt, StreamExt};
use prost::Message as ProstMessage;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, Mutex, RwLock};
use tokio_tungstenite::tungstenite;
use crate::chat_channel::error::ChatChannelError;
use crate::chat_channel::traits::ChatChannelBackend;
use crate::chat_channel::types::*;
const FEISHU_BASE_URL: &str = "https://open.feishu.cn";
const TOKEN_REFRESH_MARGIN_SECS: u64 = 300;
// ── Lark WebSocket protobuf Frame (pbbp2) ──
// Source: larksuite/oapi-sdk-go ws/pbbp2.pb.go
const FRAME_METHOD_CONTROL: i32 = 0; // Ping/Pong
const FRAME_METHOD_DATA: i32 = 1; // Event/Card
#[derive(Clone, PartialEq, ProstMessage)]
struct Frame {
#[prost(uint64, tag = 1)]
seq_id: u64,
#[prost(uint64, tag = 2)]
log_id: u64,
#[prost(int32, tag = 3)]
service: i32,
#[prost(int32, tag = 4)]
method: i32,
#[prost(message, repeated, tag = 5)]
headers: Vec<FrameHeader>,
#[prost(string, tag = 6)]
payload_encoding: String,
#[prost(string, tag = 7)]
payload_type: String,
#[prost(bytes = "vec", tag = 8)]
payload: Vec<u8>,
#[prost(string, tag = 9)]
log_id_new: String,
}
#[derive(Clone, PartialEq, ProstMessage)]
struct FrameHeader {
#[prost(string, tag = 1)]
key: String,
#[prost(string, tag = 2)]
value: String,
}
impl Frame {
fn get_header(&self, key: &str) -> Option<&str> {
self.headers
.iter()
.find(|h| h.key == key)
.map(|h| h.value.as_str())
}
fn set_header(&mut self, key: &str, value: &str) {
if let Some(h) = self.headers.iter_mut().find(|h| h.key == key) {
h.value = value.to_string();
} else {
self.headers.push(FrameHeader {
key: key.to_string(),
value: value.to_string(),
});
}
}
}
// ── Lark REST API types ──
#[derive(Deserialize)]
struct TenantAccessTokenResponse {
code: i32,
msg: String,
tenant_access_token: Option<String>,
expire: Option<u64>,
}
#[derive(Serialize)]
struct SendMessageRequest {
receive_id: String,
msg_type: String,
content: String,
}
#[derive(Deserialize)]
struct SendMessageResponse {
code: i32,
msg: String,
data: Option<SendMessageData>,
}
#[derive(Deserialize)]
struct SendMessageData {
message_id: Option<String>,
}
#[derive(Deserialize)]
struct WsConnectResponse {
code: i32,
msg: String,
data: Option<WsConnectData>,
}
#[derive(Deserialize)]
struct WsConnectData {
#[serde(rename = "URL")]
url: Option<String>,
}
// ── Token cache ──
struct TokenCache {
token: String,
expires_at: Instant,
}
// ── Multi-part frame cache ──
struct PartialMessage {
parts: HashMap<i32, Vec<u8>>,
total: i32,
}
// ── LarkBackend ──
pub struct LarkBackend {
app_id: String,
app_secret: String,
chat_id: String,
channel_id: i32,
client: reqwest::Client,
token_cache: Arc<RwLock<Option<TokenCache>>>,
status: Arc<Mutex<ChannelConnectionStatus>>,
shutdown_tx: Arc<Mutex<Option<tokio::sync::watch::Sender<bool>>>>,
}
impl LarkBackend {
pub fn new(channel_id: i32, app_id: String, app_secret: String, chat_id: String) -> Self {
Self {
app_id,
app_secret,
chat_id,
channel_id,
client: reqwest::Client::new(),
token_cache: Arc::new(RwLock::new(None)),
status: Arc::new(Mutex::new(ChannelConnectionStatus::Disconnected)),
shutdown_tx: Arc::new(Mutex::new(None)),
}
}
async fn get_tenant_access_token(&self) -> Result<String, ChatChannelError> {
{
let cache = self.token_cache.read().await;
if let Some(cached) = cache.as_ref() {
if cached.expires_at > Instant::now() {
return Ok(cached.token.clone());
}
}
}
let resp = self
.client
.post(format!(
"{}/open-apis/auth/v3/tenant_access_token/internal",
FEISHU_BASE_URL
))
.json(&serde_json::json!({
"app_id": self.app_id,
"app_secret": self.app_secret,
}))
.send()
.await
.map_err(|e| ChatChannelError::AuthenticationFailed(e.to_string()))?;
let result: TenantAccessTokenResponse = resp
.json()
.await
.map_err(|e| ChatChannelError::AuthenticationFailed(e.to_string()))?;
if result.code != 0 {
return Err(ChatChannelError::AuthenticationFailed(format!(
"code={}, msg={}",
result.code, result.msg
)));
}
let token = result
.tenant_access_token
.ok_or_else(|| {
ChatChannelError::AuthenticationFailed("No token in response".into())
})?;
let expire_secs = result.expire.unwrap_or(7200);
let expires_at = Instant::now()
+ Duration::from_secs(expire_secs.saturating_sub(TOKEN_REFRESH_MARGIN_SECS));
*self.token_cache.write().await = Some(TokenCache {
token: token.clone(),
expires_at,
});
Ok(token)
}
async fn send_lark_message(
&self,
msg_type: &str,
content: &str,
) -> Result<SentMessageId, ChatChannelError> {
let token = self.get_tenant_access_token().await?;
let resp = self
.client
.post(format!(
"{}/open-apis/im/v1/messages?receive_id_type=chat_id",
FEISHU_BASE_URL
))
.header("Authorization", format!("Bearer {}", token))
.json(&SendMessageRequest {
receive_id: self.chat_id.clone(),
msg_type: msg_type.to_string(),
content: content.to_string(),
})
.send()
.await
.map_err(|e| ChatChannelError::SendFailed(e.to_string()))?;
let result: SendMessageResponse = resp
.json()
.await
.map_err(|e| ChatChannelError::SendFailed(e.to_string()))?;
if result.code != 0 {
return Err(ChatChannelError::SendFailed(format!(
"code={}, msg={}",
result.code, result.msg
)));
}
let message_id = result
.data
.and_then(|d| d.message_id)
.unwrap_or_default();
Ok(SentMessageId(message_id))
}
async fn start_ws_receiver(
&self,
command_tx: mpsc::Sender<IncomingCommand>,
) -> Result<(), ChatChannelError> {
// Verify we can get a WS URL before spawning the background task
let _ = fetch_ws_url(&self.client, &self.app_id, &self.app_secret).await?;
let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
*self.shutdown_tx.lock().await = Some(shutdown_tx);
let channel_id = self.channel_id;
let status = self.status.clone();
let app_id = self.app_id.clone();
let app_secret = self.app_secret.clone();
let client = self.client.clone();
tokio::spawn(async move {
let mut retry_count = 0u32;
loop {
if *shutdown_rx.borrow() {
break;
}
let ws_url = match fetch_ws_url(&client, &app_id, &app_secret).await {
Ok(url) => url,
Err(e) => {
eprintln!("[Lark] failed to get WS endpoint: {e}");
*status.lock().await = ChannelConnectionStatus::Error;
let delay = Duration::from_secs((2u64).pow(retry_count.min(5)));
retry_count += 1;
tokio::select! {
_ = tokio::time::sleep(delay) => continue,
_ = shutdown_rx.changed() => break,
}
}
};
let ws_result = tokio_tungstenite::connect_async(&ws_url).await;
let ws_stream = match ws_result {
Ok((stream, _)) => {
*status.lock().await = ChannelConnectionStatus::Connected;
retry_count = 0;
eprintln!("[Lark] WebSocket connected");
stream
}
Err(e) => {
eprintln!("[Lark] WebSocket connect failed: {e}");
*status.lock().await = ChannelConnectionStatus::Error;
let delay = Duration::from_secs((2u64).pow(retry_count.min(5)));
retry_count += 1;
tokio::select! {
_ = tokio::time::sleep(delay) => continue,
_ = shutdown_rx.changed() => break,
}
}
};
let (mut write, mut read) = ws_stream.split();
let mut partial_msgs: HashMap<String, PartialMessage> = HashMap::new();
loop {
tokio::select! {
msg = read.next() => {
match msg {
Some(Ok(tungstenite::Message::Binary(data))) => {
match Frame::decode(data.as_ref()) {
Ok(frame) => {
let frame_type = frame.get_header("type").unwrap_or("").to_string();
if frame.method == FRAME_METHOD_CONTROL {
// Control frame: ping → respond with pong
if frame_type == "ping" {
let mut pong = frame.clone();
// Clear type header and set to pong
pong.set_header("type", "pong");
pong.payload = Vec::new();
let mut buf = Vec::new();
if pong.encode(&mut buf).is_ok() {
let _ = write.send(tungstenite::Message::Binary(buf.into())).await;
}
}
} else if frame.method == FRAME_METHOD_DATA && frame_type == "event" {
let start = Instant::now();
// Multi-part reassembly
let msg_id = frame.get_header("message_id").unwrap_or("").to_string();
let sum: i32 = frame.get_header("sum").and_then(|s| s.parse().ok()).unwrap_or(1);
let seq: i32 = frame.get_header("seq").and_then(|s| s.parse().ok()).unwrap_or(0);
let full_payload = if sum <= 1 {
Some(frame.payload.clone())
} else {
let entry = partial_msgs.entry(msg_id.clone()).or_insert_with(|| PartialMessage {
parts: HashMap::new(),
total: sum,
});
entry.parts.insert(seq, frame.payload.clone());
if entry.parts.len() as i32 >= entry.total {
// All parts received — reassemble in order
let mut combined = Vec::new();
for i in 0..entry.total {
if let Some(part) = entry.parts.get(&i) {
combined.extend_from_slice(part);
}
}
partial_msgs.remove(&msg_id);
Some(combined)
} else {
None // Still waiting for more parts
}
};
if let Some(payload_bytes) = full_payload {
// Process event
if let Ok(payload_str) = std::str::from_utf8(&payload_bytes) {
if let Ok(event) = serde_json::from_str::<serde_json::Value>(payload_str) {
handle_lark_event(&event, channel_id, &command_tx).await;
} else {
eprintln!("[Lark] event payload is not valid JSON");
}
}
// Send acknowledgment: echo frame back with {"code":200}
let elapsed_ms = start.elapsed().as_millis();
let mut ack = frame.clone();
ack.payload = br#"{"code":200}"#.to_vec();
ack.set_header("biz_rt", &elapsed_ms.to_string());
let mut buf = Vec::new();
if ack.encode(&mut buf).is_ok() {
let _ = write.send(tungstenite::Message::Binary(buf.into())).await;
}
}
}
}
Err(e) => {
eprintln!("[Lark] protobuf decode error: {e}, len={}", data.len());
}
}
}
Some(Ok(tungstenite::Message::Ping(data))) => {
let _ = write.send(tungstenite::Message::Pong(data)).await;
}
Some(Ok(tungstenite::Message::Close(_))) | None => {
eprintln!("[Lark] WebSocket closed, will reconnect");
break;
}
Some(Err(e)) => {
eprintln!("[Lark] WebSocket error: {e}");
break;
}
_ => {}
}
}
_ = shutdown_rx.changed() => {
let _ = write.close().await;
*status.lock().await = ChannelConnectionStatus::Disconnected;
return;
}
}
}
*status.lock().await = ChannelConnectionStatus::Connecting;
let delay = Duration::from_secs(3);
tokio::select! {
_ = tokio::time::sleep(delay) => {},
_ = shutdown_rx.changed() => break,
}
}
*status.lock().await = ChannelConnectionStatus::Disconnected;
});
Ok(())
}
}
async fn handle_lark_event(
event: &serde_json::Value,
channel_id: i32,
command_tx: &mpsc::Sender<IncomingCommand>,
) {
let event_type = event
.pointer("/header/event_type")
.and_then(|v| v.as_str())
.unwrap_or("");
if event_type == "im.message.receive_v1" {
let msg_type = event
.pointer("/event/message/message_type")
.and_then(|v| v.as_str())
.unwrap_or("");
if msg_type != "text" {
return;
}
let content_str = event
.pointer("/event/message/content")
.and_then(|v| v.as_str())
.unwrap_or("");
// Content is JSON string: {"text":"actual message"}
let text = serde_json::from_str::<serde_json::Value>(content_str)
.ok()
.and_then(|v| v.get("text").and_then(|t| t.as_str()).map(String::from))
.unwrap_or_default();
if text.is_empty() {
return;
}
let sender_id = event
.pointer("/event/sender/sender_id/open_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown")
.to_string();
eprintln!("[Lark] incoming message from {}: {}", sender_id, text);
let _ = command_tx
.send(IncomingCommand {
channel_id,
sender_id,
command_text: text,
metadata: event.clone(),
})
.await;
}
}
/// Fetch a fresh WebSocket endpoint URL from Feishu.
async fn fetch_ws_url(
client: &reqwest::Client,
app_id: &str,
app_secret: &str,
) -> Result<String, ChatChannelError> {
let resp = client
.post(format!("{}/callback/ws/endpoint", FEISHU_BASE_URL))
.json(&serde_json::json!({
"AppID": app_id,
"AppSecret": app_secret,
}))
.send()
.await
.map_err(|e| ChatChannelError::ConnectionFailed(e.to_string()))?;
let ws_resp: WsConnectResponse = resp
.json()
.await
.map_err(|e| ChatChannelError::ConnectionFailed(e.to_string()))?;
if ws_resp.code != 0 {
return Err(ChatChannelError::ConnectionFailed(format!(
"WS connect failed: code={}, msg={}",
ws_resp.code, ws_resp.msg
)));
}
ws_resp
.data
.and_then(|d| d.url)
.ok_or_else(|| ChatChannelError::ConnectionFailed("No WebSocket URL returned".into()))
}
#[async_trait]
impl ChatChannelBackend for LarkBackend {
fn channel_type(&self) -> ChannelType {
ChannelType::Lark
}
async fn start(
&self,
command_tx: mpsc::Sender<IncomingCommand>,
) -> Result<(), ChatChannelError> {
*self.status.lock().await = ChannelConnectionStatus::Connecting;
self.get_tenant_access_token().await?;
*self.status.lock().await = ChannelConnectionStatus::Connected;
if let Err(e) = self.start_ws_receiver(command_tx).await {
eprintln!("[Lark] WebSocket receiver failed to start: {e}");
}
Ok(())
}
async fn stop(&self) -> Result<(), ChatChannelError> {
if let Some(tx) = self.shutdown_tx.lock().await.take() {
let _ = tx.send(true);
}
*self.status.lock().await = ChannelConnectionStatus::Disconnected;
Ok(())
}
async fn status(&self) -> ChannelConnectionStatus {
*self.status.lock().await
}
async fn send_message(&self, text: &str) -> Result<SentMessageId, ChatChannelError> {
let content = serde_json::json!({ "text": text }).to_string();
self.send_lark_message("text", &content).await
}
async fn send_rich_message(
&self,
message: &RichMessage,
) -> Result<SentMessageId, ChatChannelError> {
let card = build_lark_card(message);
let content = serde_json::to_string(&card)
.map_err(|e| ChatChannelError::SendFailed(e.to_string()))?;
self.send_lark_message("interactive", &content).await
}
async fn test_connection(&self) -> Result<(), ChatChannelError> {
self.get_tenant_access_token().await?;
Ok(())
}
}
fn build_lark_card(msg: &RichMessage) -> serde_json::Value {
let header_color = match msg.level {
MessageLevel::Info => "blue",
MessageLevel::Warning => "orange",
MessageLevel::Error => "red",
};
let title = msg.title.as_deref().unwrap_or("Codeg");
let mut elements: Vec<serde_json::Value> = Vec::new();
if !msg.body.is_empty() {
elements.push(serde_json::json!({
"tag": "markdown",
"content": msg.body,
}));
}
if !msg.fields.is_empty() {
let field_elements: Vec<serde_json::Value> = msg
.fields
.iter()
.map(|(k, v)| {
serde_json::json!({
"is_short": true,
"text": {
"tag": "lark_md",
"content": format!("**{}**\n{}", k, v),
}
})
})
.collect();
elements.push(serde_json::json!({
"tag": "div",
"fields": field_elements,
}));
}
serde_json::json!({
"config": { "wide_screen_mode": true },
"header": {
"title": {
"tag": "plain_text",
"content": title,
},
"template": header_color,
},
"elements": elements,
})
}

View File

@@ -0,0 +1,2 @@
pub mod lark;
pub mod telegram;

View File

@@ -0,0 +1,255 @@
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::{mpsc, Mutex};
use crate::chat_channel::error::ChatChannelError;
use crate::chat_channel::traits::ChatChannelBackend;
use crate::chat_channel::types::*;
pub struct TelegramBackend {
bot_token: String,
chat_id: String,
client: reqwest::Client,
status: Arc<Mutex<ChannelConnectionStatus>>,
channel_id: i32,
shutdown_tx: Arc<Mutex<Option<tokio::sync::watch::Sender<bool>>>>,
}
impl TelegramBackend {
pub fn new(channel_id: i32, bot_token: String, chat_id: String) -> Self {
Self {
bot_token,
chat_id,
client: reqwest::Client::new(),
status: Arc::new(Mutex::new(ChannelConnectionStatus::Disconnected)),
channel_id,
shutdown_tx: Arc::new(Mutex::new(None)),
}
}
fn api_url(&self, method: &str) -> String {
format!(
"https://api.telegram.org/bot{}/{}",
self.bot_token, method
)
}
}
#[async_trait]
impl ChatChannelBackend for TelegramBackend {
fn channel_type(&self) -> ChannelType {
ChannelType::Telegram
}
async fn start(
&self,
command_tx: mpsc::Sender<IncomingCommand>,
) -> Result<(), ChatChannelError> {
*self.status.lock().await = ChannelConnectionStatus::Connecting;
// Verify bot token by calling getMe
let resp = self
.client
.get(&self.api_url("getMe"))
.send()
.await
.map_err(|e| ChatChannelError::ConnectionFailed(e.to_string()))?;
if !resp.status().is_success() {
*self.status.lock().await = ChannelConnectionStatus::Error;
return Err(ChatChannelError::AuthenticationFailed(
"Invalid bot token".to_string(),
));
}
*self.status.lock().await = ChannelConnectionStatus::Connected;
// Start long-polling loop
let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
*self.shutdown_tx.lock().await = Some(shutdown_tx);
let client = self.client.clone();
let bot_token = self.bot_token.clone();
let channel_id = self.channel_id;
let status = self.status.clone();
tokio::spawn(async move {
let mut offset: i64 = 0;
loop {
if *shutdown_rx.borrow() {
break;
}
let url = format!(
"https://api.telegram.org/bot{}/getUpdates?timeout=30&offset={}",
bot_token, offset
);
let result = tokio::select! {
r = client.get(&url).send() => r,
_ = shutdown_rx.changed() => break,
};
match result {
Ok(resp) => {
if let Ok(body) = resp.json::<serde_json::Value>().await {
if let Some(updates) = body.get("result").and_then(|r| r.as_array()) {
for update in updates {
if let Some(uid) =
update.get("update_id").and_then(|u| u.as_i64())
{
offset = uid + 1;
}
if let Some(text) = update
.pointer("/message/text")
.and_then(|t| t.as_str())
{
let sender_id = update
.pointer("/message/from/id")
.and_then(|i| i.as_i64())
.map(|i| i.to_string())
.unwrap_or_default();
let _ = command_tx
.send(IncomingCommand {
channel_id,
sender_id,
command_text: text.to_string(),
metadata: update.clone(),
})
.await;
}
}
}
}
}
Err(e) => {
eprintln!("[Telegram] polling error: {e}");
*status.lock().await = ChannelConnectionStatus::Error;
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
*status.lock().await = ChannelConnectionStatus::Connected;
}
}
}
*status.lock().await = ChannelConnectionStatus::Disconnected;
});
Ok(())
}
async fn stop(&self) -> Result<(), ChatChannelError> {
if let Some(tx) = self.shutdown_tx.lock().await.take() {
let _ = tx.send(true);
}
*self.status.lock().await = ChannelConnectionStatus::Disconnected;
Ok(())
}
async fn status(&self) -> ChannelConnectionStatus {
*self.status.lock().await
}
async fn send_message(&self, text: &str) -> Result<SentMessageId, ChatChannelError> {
let body = serde_json::json!({
"chat_id": self.chat_id,
"text": text,
"parse_mode": "Markdown",
});
let resp = self
.client
.post(&self.api_url("sendMessage"))
.json(&body)
.send()
.await
.map_err(|e| ChatChannelError::SendFailed(e.to_string()))?;
let result: serde_json::Value = resp
.json()
.await
.map_err(|e| ChatChannelError::SendFailed(e.to_string()))?;
let message_id = result
.pointer("/result/message_id")
.and_then(|v| v.as_i64())
.map(|id| id.to_string())
.unwrap_or_default();
Ok(SentMessageId(message_id))
}
async fn send_rich_message(
&self,
message: &RichMessage,
) -> Result<SentMessageId, ChatChannelError> {
let text = format_telegram_markdown(message);
self.send_message(&text).await
}
async fn test_connection(&self) -> Result<(), ChatChannelError> {
let resp = self
.client
.get(&self.api_url("getMe"))
.send()
.await
.map_err(|e| ChatChannelError::ConnectionFailed(e.to_string()))?;
if resp.status().is_success() {
Ok(())
} else {
Err(ChatChannelError::AuthenticationFailed(
"Invalid bot token".to_string(),
))
}
}
}
fn format_telegram_markdown(msg: &RichMessage) -> String {
let mut text = String::new();
let level_emoji = match msg.level {
MessageLevel::Info => "",
MessageLevel::Warning => "⚠️",
MessageLevel::Error => "",
};
if let Some(title) = &msg.title {
text.push_str(&format!("{} *{}*\n", level_emoji, escape_markdown(title)));
}
text.push_str(&escape_markdown(&msg.body));
if !msg.fields.is_empty() {
text.push('\n');
for (key, value) in &msg.fields {
text.push_str(&format!(
"\n*{}*: {}",
escape_markdown(key),
escape_markdown(value)
));
}
}
text
}
fn escape_markdown(text: &str) -> String {
text.replace('_', "\\_")
.replace('*', "\\*")
.replace('[', "\\[")
.replace(']', "\\]")
.replace('(', "\\(")
.replace(')', "\\)")
.replace('~', "\\~")
.replace('`', "\\`")
.replace('>', "\\>")
.replace('#', "\\#")
.replace('+', "\\+")
.replace('-', "\\-")
.replace('=', "\\=")
.replace('|', "\\|")
.replace('{', "\\{")
.replace('}', "\\}")
.replace('.', "\\.")
.replace('!', "\\!")
}

View File

@@ -0,0 +1,97 @@
use sea_orm::DatabaseConnection;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use super::command_handlers;
use super::manager::ChatChannelManager;
use super::types::IncomingCommand;
use crate::db::service::chat_channel_message_log_service;
pub fn spawn_command_dispatcher(
mut command_rx: mpsc::Receiver<IncomingCommand>,
manager: ChatChannelManager,
db_conn: DatabaseConnection,
) -> JoinHandle<()> {
tokio::spawn(async move {
while let Some(cmd) = command_rx.recv().await {
let text = cmd.command_text.trim();
// Log inbound command
let _ = chat_channel_message_log_service::create_log(
&db_conn,
cmd.channel_id,
"inbound",
"command_query",
text,
"sent",
None,
)
.await;
let response = dispatch_command(text, &db_conn, &manager).await;
// Send response back via the same channel
let send_result = manager.send_to_channel(cmd.channel_id, &response).await;
let (status, error_detail) = match &send_result {
Ok(_) => ("sent", None),
Err(e) => ("failed", Some(e.to_string())),
};
let _ = chat_channel_message_log_service::create_log(
&db_conn,
cmd.channel_id,
"outbound",
"command_response",
&response.to_plain_text(),
status,
error_detail,
)
.await;
}
})
}
async fn dispatch_command(
text: &str,
db: &DatabaseConnection,
manager: &ChatChannelManager,
) -> super::types::RichMessage {
let parts: Vec<&str> = text.splitn(2, ' ').collect();
let command = parts[0].to_lowercase();
let args = parts.get(1).map(|s| s.trim()).unwrap_or("");
match command.as_str() {
"/recent" => command_handlers::handle_recent(db).await,
"/search" => {
if args.is_empty() {
super::types::RichMessage::info("用法: /search <关键词>")
.with_title("参数错误")
} else {
command_handlers::handle_search(db, args).await
}
}
"/detail" => {
if let Ok(id) = args.parse::<i32>() {
command_handlers::handle_detail(db, id).await
} else {
super::types::RichMessage::info("用法: /detail <会话ID>")
.with_title("参数错误")
}
}
"/today" => command_handlers::handle_today(db).await,
"/status" => command_handlers::handle_status(manager).await,
"/help" | "/start" => command_handlers::handle_help(),
_ => {
if text.starts_with('/') {
super::types::RichMessage::info(format!(
"未知命令: {}\n输入 /help 查看可用命令",
command
))
.with_title("未知命令")
} else {
// Non-command messages are ignored
return command_handlers::handle_help();
}
}
}
}

View File

@@ -0,0 +1,218 @@
use chrono::Utc;
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder};
use super::manager::ChatChannelManager;
use super::types::{MessageLevel, RichMessage};
use crate::db::entities::conversation;
pub async fn handle_recent(db: &DatabaseConnection) -> RichMessage {
let rows = match conversation::Entity::find()
.filter(conversation::Column::DeletedAt.is_null())
.order_by_desc(conversation::Column::CreatedAt)
.all(db)
.await
{
Ok(rows) => rows,
Err(e) => {
return RichMessage {
title: Some("查询失败".to_string()),
body: e.to_string(),
fields: Vec::new(),
level: MessageLevel::Error,
};
}
};
let recent: Vec<_> = rows.into_iter().take(5).collect();
if recent.is_empty() {
return RichMessage::info("暂无会话记录").with_title("最近会话");
}
let mut body = String::new();
for (i, conv) in recent.iter().enumerate() {
let title = conv.title.as_deref().unwrap_or("(无标题)");
let agent = &conv.agent_type;
let time = conv.created_at.format("%m-%d %H:%M");
body.push_str(&format!(
"{}. [{}] {} ({})\n",
i + 1,
agent,
title,
time
));
}
RichMessage::info(body.trim_end()).with_title("最近 5 条会话")
}
pub async fn handle_search(db: &DatabaseConnection, keyword: &str) -> RichMessage {
let rows = match conversation::Entity::find()
.filter(conversation::Column::DeletedAt.is_null())
.order_by_desc(conversation::Column::CreatedAt)
.all(db)
.await
{
Ok(rows) => rows,
Err(e) => {
return RichMessage {
title: Some("查询失败".to_string()),
body: e.to_string(),
fields: Vec::new(),
level: MessageLevel::Error,
};
}
};
let keyword_lower = keyword.to_lowercase();
let matched: Vec<_> = rows
.into_iter()
.filter(|c| {
c.title
.as_deref()
.map(|t| t.to_lowercase().contains(&keyword_lower))
.unwrap_or(false)
})
.take(10)
.collect();
if matched.is_empty() {
return RichMessage::info(format!("未找到包含 \"{keyword}\" 的会话"))
.with_title("搜索结果");
}
let mut body = String::new();
for (i, conv) in matched.iter().enumerate() {
let title = conv.title.as_deref().unwrap_or("(无标题)");
let agent = &conv.agent_type;
body.push_str(&format!("{}. [{}] {} (ID:{})\n", i + 1, agent, title, conv.id));
}
RichMessage::info(body.trim_end())
.with_title(&format!("搜索 \"{}\" - {} 条结果", keyword, matched.len()))
}
pub async fn handle_detail(db: &DatabaseConnection, conversation_id: i32) -> RichMessage {
let conv = match conversation::Entity::find_by_id(conversation_id)
.filter(conversation::Column::DeletedAt.is_null())
.one(db)
.await
{
Ok(Some(c)) => c,
Ok(None) => {
return RichMessage::info(format!("会话 {conversation_id} 不存在"))
.with_title("未找到");
}
Err(e) => {
return RichMessage {
title: Some("查询失败".to_string()),
body: e.to_string(),
fields: Vec::new(),
level: MessageLevel::Error,
};
}
};
let title = conv.title.as_deref().unwrap_or("(无标题)");
RichMessage::info(title)
.with_title(&format!("会话详情 #{}", conv.id))
.with_field("代理", &conv.agent_type)
.with_field("状态", format!("{:?}", conv.status))
.with_field("消息数", &conv.message_count.to_string())
.with_field("创建时间", &conv.created_at.format("%Y-%m-%d %H:%M").to_string())
}
pub async fn handle_today(db: &DatabaseConnection) -> RichMessage {
let now = Utc::now();
let today_start = now
.date_naive()
.and_hms_opt(0, 0, 0)
.unwrap()
.and_utc();
let rows = match conversation::Entity::find()
.filter(conversation::Column::DeletedAt.is_null())
.filter(conversation::Column::CreatedAt.gte(today_start))
.order_by_desc(conversation::Column::CreatedAt)
.all(db)
.await
{
Ok(rows) => rows,
Err(e) => {
return RichMessage {
title: Some("查询失败".to_string()),
body: e.to_string(),
fields: Vec::new(),
level: MessageLevel::Error,
};
}
};
if rows.is_empty() {
return RichMessage::info("今日暂无编码活动").with_title("今日活动");
}
// Group by agent_type
let mut by_agent: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
let mut titles: Vec<String> = Vec::new();
for conv in &rows {
*by_agent.entry(conv.agent_type.clone()).or_insert(0) += 1;
if let Some(t) = &conv.title {
if titles.len() < 5 {
titles.push(t.clone());
}
}
}
let mut body = format!("会话总数: {}", rows.len());
body.push_str("\n\n按代理:");
for (agent, count) in &by_agent {
body.push_str(&format!("\n {agent} - {count}"));
}
if !titles.is_empty() {
body.push_str("\n\n最近活动:");
for t in &titles {
body.push_str(&format!("\n{t}"));
}
}
RichMessage::info(body).with_title(&format!(
"今日活动 ({})",
now.format("%Y-%m-%d")
))
}
pub async fn handle_status(manager: &ChatChannelManager) -> RichMessage {
let statuses = manager.get_status().await;
if statuses.is_empty() {
return RichMessage::info("暂无活跃渠道").with_title("渠道状态");
}
let mut body = String::new();
for s in &statuses {
let icon = match s.status.as_str() {
"connected" => "",
"connecting" => "",
"error" => "",
_ => "",
};
body.push_str(&format!(
"{} {} [{}] - {}\n",
icon, s.name, s.channel_type, s.status
));
}
RichMessage::info(body.trim_end()).with_title("渠道状态")
}
pub fn handle_help() -> RichMessage {
RichMessage::info(
"/recent - 最近 5 条会话\n\
/search <关键词> - 搜索会话\n\
/detail <ID> - 会话详情\n\
/today - 今日活动汇总\n\
/status - 渠道连接状态\n\
/help - 显示帮助",
)
.with_title("Codeg Bot 帮助")
}

View File

@@ -0,0 +1,39 @@
use crate::app_error::AppCommandError;
#[derive(Debug, thiserror::Error)]
pub enum ChatChannelError {
#[error("connection failed: {0}")]
ConnectionFailed(String),
#[error("send failed: {0}")]
SendFailed(String),
#[error("authentication failed: {0}")]
AuthenticationFailed(String),
#[error("configuration invalid: {0}")]
ConfigurationInvalid(String),
#[error("not connected")]
NotConnected,
#[error("already connected")]
AlreadyConnected,
#[error("channel not found: {0}")]
NotFound(i32),
#[error("{0}")]
Other(String),
}
impl From<ChatChannelError> for AppCommandError {
fn from(err: ChatChannelError) -> Self {
match &err {
ChatChannelError::NotFound(_) => AppCommandError::not_found(err.to_string()),
ChatChannelError::AuthenticationFailed(_) => {
AppCommandError::authentication_failed(err.to_string())
}
ChatChannelError::ConfigurationInvalid(_) => {
AppCommandError::configuration_invalid(err.to_string())
}
ChatChannelError::ConnectionFailed(_) | ChatChannelError::SendFailed(_) => {
AppCommandError::network(err.to_string())
}
_ => AppCommandError::task_execution_failed(err.to_string()),
}
}
}

View File

@@ -0,0 +1,217 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use sea_orm::DatabaseConnection;
use tokio::task::JoinHandle;
use super::manager::ChatChannelManager;
use super::message_formatter;
use super::types::RichMessage;
use crate::db::service::{chat_channel_message_log_service, chat_channel_service};
use crate::web::event_bridge::WebEventBroadcaster;
/// Minimum interval between pushes for the same event type per channel (debounce).
const DEBOUNCE_SECS: u64 = 5;
pub fn spawn_event_subscriber(
broadcaster: Arc<WebEventBroadcaster>,
manager: ChatChannelManager,
db_conn: DatabaseConnection,
) -> JoinHandle<()> {
tokio::spawn(async move {
let mut rx = broadcaster.subscribe();
let mut last_push: HashMap<(i32, String), Instant> = HashMap::new();
loop {
let event = match rx.recv().await {
Ok(e) => e,
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
eprintln!("[ChatChannel] event subscriber lagged by {n} messages");
continue;
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
eprintln!("[ChatChannel] event broadcaster closed, stopping subscriber");
break;
}
};
let message = match parse_event(&event.channel, &event.payload) {
Some((event_type, msg)) => {
// Check enabled channels and forward
let channels = match chat_channel_service::list_enabled(&db_conn).await {
Ok(c) => c,
Err(e) => {
eprintln!("[ChatChannel] failed to list channels: {e}");
continue;
}
};
for ch in &channels {
// Check event filter
if let Some(filter_json) = &ch.event_filter_json {
if let Ok(filter) =
serde_json::from_str::<Vec<String>>(filter_json)
{
if !filter.contains(&event_type) {
continue;
}
}
}
// Debounce
let key = (ch.id, event_type.clone());
let now = Instant::now();
if let Some(last) = last_push.get(&key) {
if now.duration_since(*last) < Duration::from_secs(DEBOUNCE_SECS) {
continue;
}
}
last_push.insert(key, now);
// Send
let send_result = manager.send_to_channel(ch.id, &msg).await;
let (status, error_detail) = match &send_result {
Ok(_) => ("sent", None),
Err(e) => ("failed", Some(e.to_string())),
};
let _ = chat_channel_message_log_service::create_log(
&db_conn,
ch.id,
"outbound",
"event_push",
&msg.to_plain_text(),
status,
error_detail,
)
.await;
}
Some(msg)
}
None => None,
};
drop(message);
}
})
}
fn parse_event(channel: &str, payload: &serde_json::Value) -> Option<(String, RichMessage)> {
match channel {
"acp://event" => parse_acp_event(payload),
"folder://git-push-succeeded" => parse_git_push(payload),
"folder://git-commit-succeeded" => parse_git_commit(payload),
_ => None,
}
}
fn parse_acp_event(payload: &serde_json::Value) -> Option<(String, RichMessage)> {
let event_type = payload.get("type")?.as_str()?;
let connection_id = payload
.get("connection_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
match event_type {
"session_started" => {
let agent_type = payload
.pointer("/data/agent_type")
.and_then(|v| v.as_str())
.unwrap_or("Unknown Agent");
let folder = payload
.pointer("/data/folder_name")
.and_then(|v| v.as_str())
.unwrap_or(connection_id);
Some((
"session_started".to_string(),
message_formatter::format_session_started(agent_type, folder),
))
}
"turn_complete" => {
let stop_reason = payload
.pointer("/data/stop_reason")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
// Only push for end_turn, not for intermediate completions
if stop_reason != "end_turn" {
return None;
}
let agent_type = payload
.pointer("/data/agent_type")
.and_then(|v| v.as_str())
.unwrap_or("Unknown Agent");
Some((
"turn_complete".to_string(),
message_formatter::format_turn_complete(agent_type, stop_reason),
))
}
"error" => {
let agent_type = payload
.pointer("/data/agent_type")
.and_then(|v| v.as_str())
.unwrap_or("Unknown Agent");
let message = payload
.pointer("/data/message")
.and_then(|v| v.as_str())
.unwrap_or("Unknown error");
Some((
"error".to_string(),
message_formatter::format_agent_error(agent_type, message),
))
}
"status_changed" => {
let status = payload
.pointer("/data/status")
.and_then(|v| v.as_str())?;
if status != "disconnected" {
return None;
}
let agent_type = payload
.pointer("/data/agent_type")
.and_then(|v| v.as_str())
.unwrap_or("Unknown Agent");
Some((
"status_disconnected".to_string(),
message_formatter::format_agent_disconnected(agent_type),
))
}
// Phase 2: "permission_request" will be handled here
_ => None,
}
}
fn parse_git_push(payload: &serde_json::Value) -> Option<(String, RichMessage)> {
let folder_name = payload
.get("folder_name")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let branch = payload
.get("branch")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let commits = payload
.get("pushed_commits")
.and_then(|v| v.as_u64())
.unwrap_or(0) as u32;
Some((
"git_push".to_string(),
message_formatter::format_git_push(folder_name, branch, commits),
))
}
fn parse_git_commit(payload: &serde_json::Value) -> Option<(String, RichMessage)> {
let folder_name = payload
.get("folder_name")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let files = payload
.get("committed_files")
.and_then(|v| v.as_u64())
.unwrap_or(0) as u32;
Some((
"git_commit".to_string(),
message_formatter::format_git_commit(folder_name, files),
))
}

View File

@@ -0,0 +1,255 @@
use std::collections::HashMap;
use std::sync::Arc;
use sea_orm::DatabaseConnection;
use tokio::sync::{mpsc, Mutex};
use super::error::ChatChannelError;
use super::traits::ChatChannelBackend;
use super::types::*;
use crate::web::event_bridge::WebEventBroadcaster;
struct ActiveChannel {
id: i32,
name: String,
channel_type: ChannelType,
backend: Box<dyn ChatChannelBackend>,
}
/// Inner state shared across clones.
struct Inner {
channels: Mutex<HashMap<i32, ActiveChannel>>,
command_tx: mpsc::Sender<IncomingCommand>,
command_rx: Mutex<Option<mpsc::Receiver<IncomingCommand>>>,
}
pub struct ChatChannelManager {
inner: Arc<Inner>,
}
impl ChatChannelManager {
pub fn new() -> Self {
let (command_tx, command_rx) = mpsc::channel(256);
Self {
inner: Arc::new(Inner {
channels: Mutex::new(HashMap::new()),
command_tx,
command_rx: Mutex::new(Some(command_rx)),
}),
}
}
/// Shallow clone sharing the same state (like ConnectionManager::clone_ref).
pub fn clone_ref(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
pub fn command_sender(&self) -> mpsc::Sender<IncomingCommand> {
self.inner.command_tx.clone()
}
/// Take the command receiver (can only be called once, at startup).
pub async fn take_command_receiver(&self) -> Option<mpsc::Receiver<IncomingCommand>> {
self.inner.command_rx.lock().await.take()
}
pub async fn add_channel(
&self,
id: i32,
name: String,
channel_type: ChannelType,
backend: Box<dyn ChatChannelBackend>,
) -> Result<(), ChatChannelError> {
let command_tx = self.inner.command_tx.clone();
backend.start(command_tx).await?;
let channel = ActiveChannel {
id,
name,
channel_type,
backend,
};
self.inner.channels.lock().await.insert(id, channel);
Ok(())
}
pub async fn remove_channel(&self, id: i32) -> Result<(), ChatChannelError> {
let mut channels = self.inner.channels.lock().await;
if let Some(channel) = channels.remove(&id) {
channel.backend.stop().await?;
}
Ok(())
}
pub async fn stop_all(&self) {
let mut channels = self.inner.channels.lock().await;
for (_, channel) in channels.drain() {
let _ = channel.backend.stop().await;
}
}
pub async fn send_to_channel(
&self,
channel_id: i32,
message: &RichMessage,
) -> Result<SentMessageId, ChatChannelError> {
let channels = self.inner.channels.lock().await;
let channel = channels
.get(&channel_id)
.ok_or(ChatChannelError::NotFound(channel_id))?;
channel.backend.send_rich_message(message).await
}
pub async fn send_to_all(&self, message: &RichMessage) {
let channels = self.inner.channels.lock().await;
for (_, channel) in channels.iter() {
let _ = channel.backend.send_rich_message(message).await;
}
}
pub async fn get_status(&self) -> Vec<crate::models::ChannelStatusInfo> {
let channels = self.inner.channels.lock().await;
let mut result = Vec::new();
for (_, ch) in channels.iter() {
let status = ch.backend.status().await;
result.push(crate::models::ChannelStatusInfo {
channel_id: ch.id,
name: ch.name.clone(),
channel_type: ch.channel_type.to_string(),
status: serde_json::to_value(status)
.ok()
.and_then(|v| v.as_str().map(String::from))
.unwrap_or_else(|| "unknown".to_string()),
});
}
result
}
pub async fn test_channel(&self, id: i32) -> Result<(), ChatChannelError> {
let channels = self.inner.channels.lock().await;
let channel = channels
.get(&id)
.ok_or(ChatChannelError::NotFound(id))?;
channel.backend.test_connection().await
}
pub async fn is_connected(&self, id: i32) -> bool {
let channels = self.inner.channels.lock().await;
if let Some(ch) = channels.get(&id) {
ch.backend.status().await == ChannelConnectionStatus::Connected
} else {
false
}
}
/// Start background tasks (event subscriber + command dispatcher) and
/// auto-connect all enabled channels from DB.
pub async fn start_background(
&self,
broadcaster: Arc<WebEventBroadcaster>,
db_conn: DatabaseConnection,
) {
let db_conn2 = db_conn.clone();
// Spawn event subscriber
let manager_for_events = self.clone_ref();
super::event_subscriber::spawn_event_subscriber(
broadcaster,
manager_for_events,
db_conn.clone(),
);
// Spawn command dispatcher
if let Some(command_rx) = self.take_command_receiver().await {
let manager_for_cmds = self.clone_ref();
super::command_dispatcher::spawn_command_dispatcher(
command_rx,
manager_for_cmds,
db_conn.clone(),
);
}
// Spawn daily report scheduler
let manager_for_scheduler = self.clone_ref();
super::scheduler::spawn_daily_report_scheduler(manager_for_scheduler, db_conn.clone());
// Auto-connect enabled channels
self.auto_connect_channels(&db_conn2).await;
}
async fn auto_connect_channels(&self, db_conn: &DatabaseConnection) {
let channels =
match crate::db::service::chat_channel_service::list_enabled(db_conn).await {
Ok(c) => c,
Err(e) => {
eprintln!("[ChatChannel] failed to load enabled channels: {e}");
return;
}
};
for ch in channels {
let channel_type: ChannelType = match serde_json::from_value(
serde_json::Value::String(ch.channel_type.clone()),
) {
Ok(t) => t,
Err(_) => continue,
};
let config: serde_json::Value = match serde_json::from_str(&ch.config_json) {
Ok(v) => v,
Err(_) => continue,
};
let token = match crate::keyring_store::get_channel_token(ch.id) {
Some(t) => t,
None => continue,
};
let backend: Box<dyn ChatChannelBackend> = match channel_type {
ChannelType::Telegram => {
let chat_id = config
.get("chat_id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
if chat_id.is_empty() {
continue;
}
Box::new(super::backends::telegram::TelegramBackend::new(
ch.id, token, chat_id,
))
}
ChannelType::Lark => {
let app_id = config
.get("app_id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let chat_id = config
.get("chat_id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
if app_id.is_empty() || chat_id.is_empty() {
continue;
}
Box::new(super::backends::lark::LarkBackend::new(
ch.id, app_id, token, chat_id,
))
}
};
if let Err(e) = self.add_channel(ch.id, ch.name.clone(), channel_type, backend).await {
eprintln!(
"[ChatChannel] failed to auto-connect '{}' (id={}): {e}",
ch.name, ch.id
);
} else {
eprintln!("[ChatChannel] auto-connected '{}' (id={})", ch.name, ch.id);
}
}
}
}

View File

@@ -0,0 +1,86 @@
use super::types::{MessageLevel, RichMessage};
pub fn format_session_started(agent_type: &str, folder_name: &str) -> RichMessage {
RichMessage::info(format!("{agent_type}{folder_name} 开始了新会话"))
.with_title("新会话")
}
pub fn format_turn_complete(agent_type: &str, stop_reason: &str) -> RichMessage {
let reason = match stop_reason {
"end_turn" => "正常结束",
"cancelled" => "已取消",
_ => stop_reason,
};
RichMessage::info(format!("{agent_type} 会话已完成"))
.with_title("会话完成")
.with_field("结束原因", reason)
}
pub fn format_agent_error(agent_type: &str, message: &str) -> RichMessage {
RichMessage {
title: Some("代理错误".to_string()),
body: format!("{agent_type} 发生错误"),
fields: vec![("错误信息".to_string(), message.to_string())],
level: MessageLevel::Error,
}
}
pub fn format_agent_disconnected(agent_type: &str) -> RichMessage {
RichMessage {
title: Some("代理断开".to_string()),
body: format!("{agent_type} 已断开连接"),
fields: Vec::new(),
level: MessageLevel::Warning,
}
}
pub fn format_git_push(folder_name: &str, branch: &str, commits: u32) -> RichMessage {
RichMessage::info(format!(
"Git Push 成功: {commits} 个提交推送到 {branch}"
))
.with_title("Git Push")
.with_field("项目", folder_name)
}
pub fn format_git_commit(folder_name: &str, files: u32) -> RichMessage {
RichMessage::info(format!("Git Commit: {files} 个文件已提交"))
.with_title("Git Commit")
.with_field("项目", folder_name)
}
pub struct DailyReportData {
pub date: String,
pub conversations_by_agent: Vec<(String, u32)>,
pub total_conversations: u32,
pub projects_involved: Vec<String>,
pub key_activities: Vec<String>,
}
pub fn format_daily_report(report: &DailyReportData) -> RichMessage {
let mut body = format!("今日编码活动汇总 ({})", report.date);
body.push_str(&format!("\n\n会话总数: {}", report.total_conversations));
if !report.conversations_by_agent.is_empty() {
body.push_str("\n\n按代理分布:");
for (agent, count) in &report.conversations_by_agent {
body.push_str(&format!("\n {} - {} 个会话", agent, count));
}
}
if !report.projects_involved.is_empty() {
body.push_str(&format!(
"\n\n涉及项目: {}",
report.projects_involved.join(", ")
));
}
if !report.key_activities.is_empty() {
body.push_str("\n\n主要活动:");
for activity in &report.key_activities {
body.push_str(&format!("\n{}", activity));
}
}
RichMessage::info(body).with_title("每日编码报告")
}

View File

@@ -0,0 +1,10 @@
pub mod backends;
pub mod command_dispatcher;
pub mod command_handlers;
pub mod error;
pub mod event_subscriber;
pub mod manager;
pub mod message_formatter;
pub mod scheduler;
pub mod traits;
pub mod types;

View File

@@ -0,0 +1,130 @@
use std::collections::HashSet;
use chrono::{Local, NaiveDate, Timelike, Utc};
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder};
use tokio::task::JoinHandle;
use super::manager::ChatChannelManager;
use super::message_formatter::{self, DailyReportData};
use crate::db::entities::conversation;
use crate::db::service::{chat_channel_message_log_service, chat_channel_service};
pub fn spawn_daily_report_scheduler(
manager: ChatChannelManager,
db_conn: DatabaseConnection,
) -> JoinHandle<()> {
tokio::spawn(async move {
let mut sent_today: HashSet<(i32, NaiveDate)> = HashSet::new();
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
let now = Local::now();
let today = now.date_naive();
let current_time = format!("{:02}:{:02}", now.hour(), now.minute());
// Clean up old entries from sent_today
sent_today.retain(|(_, date)| *date == today);
let channels = match chat_channel_service::list_enabled(&db_conn).await {
Ok(c) => c,
Err(e) => {
eprintln!("[ChatChannel] scheduler: failed to list channels: {e}");
continue;
}
};
for ch in &channels {
if !ch.daily_report_enabled {
continue;
}
let report_time = ch
.daily_report_time
.as_deref()
.unwrap_or("18:00");
if current_time != report_time {
continue;
}
let key = (ch.id, today);
if sent_today.contains(&key) {
continue;
}
// Generate and send report
let report = generate_daily_report(&db_conn).await;
let message = message_formatter::format_daily_report(&report);
let send_result = manager.send_to_channel(ch.id, &message).await;
let (status, error_detail) = match &send_result {
Ok(_) => ("sent", None),
Err(e) => ("failed", Some(e.to_string())),
};
let _ = chat_channel_message_log_service::create_log(
&db_conn,
ch.id,
"outbound",
"daily_report",
&message.to_plain_text(),
status,
error_detail,
)
.await;
sent_today.insert(key);
}
}
})
}
async fn generate_daily_report(db: &DatabaseConnection) -> DailyReportData {
let now = Utc::now();
let today_start = now.date_naive().and_hms_opt(0, 0, 0).unwrap().and_utc();
let rows = conversation::Entity::find()
.filter(conversation::Column::DeletedAt.is_null())
.filter(conversation::Column::CreatedAt.gte(today_start))
.order_by_desc(conversation::Column::CreatedAt)
.all(db)
.await
.unwrap_or_default();
let mut by_agent: std::collections::HashMap<String, u32> =
std::collections::HashMap::new();
let mut folder_ids: HashSet<i32> = HashSet::new();
let mut activities: Vec<String> = Vec::new();
for conv in &rows {
*by_agent.entry(conv.agent_type.clone()).or_insert(0) += 1;
folder_ids.insert(conv.folder_id);
if let Some(title) = &conv.title {
if activities.len() < 10 {
activities.push(title.clone());
}
}
}
// Resolve folder names
let mut project_names: Vec<String> = Vec::new();
for fid in &folder_ids {
if let Ok(Some(folder)) = crate::db::entities::folder::Entity::find_by_id(*fid)
.one(db)
.await
{
project_names.push(folder.name);
}
}
let conversations_by_agent: Vec<(String, u32)> = by_agent.into_iter().collect();
DailyReportData {
date: now.format("%Y-%m-%d").to_string(),
total_conversations: rows.len() as u32,
conversations_by_agent,
projects_involved: project_names,
key_activities: activities,
}
}

View File

@@ -0,0 +1,53 @@
use async_trait::async_trait;
use tokio::sync::mpsc;
use super::error::ChatChannelError;
use super::types::*;
#[async_trait]
pub trait ChatChannelBackend: Send + Sync + 'static {
fn channel_type(&self) -> ChannelType;
/// Start the receiving loop. `command_tx` forwards incoming IM messages
/// to the central command dispatcher.
async fn start(
&self,
command_tx: mpsc::Sender<IncomingCommand>,
) -> Result<(), ChatChannelError>;
/// Stop the backend connection gracefully.
async fn stop(&self) -> Result<(), ChatChannelError>;
/// Current connection status.
async fn status(&self) -> ChannelConnectionStatus;
/// Send a plain text message.
async fn send_message(&self, text: &str) -> Result<SentMessageId, ChatChannelError>;
/// Send a rich/structured message (Telegram Markdown / Lark Card).
async fn send_rich_message(
&self,
message: &RichMessage,
) -> Result<SentMessageId, ChatChannelError>;
/// [Phase 2] Send an interactive message with action buttons.
/// Default implementation degrades to send_rich_message.
async fn send_interactive_message(
&self,
message: &InteractiveMessage,
) -> Result<SentMessageId, ChatChannelError> {
self.send_rich_message(&message.to_rich_fallback()).await
}
/// [Phase 2] Update an already-sent message (e.g., permission status change).
async fn update_message(
&self,
_message_id: &SentMessageId,
_message: &RichMessage,
) -> Result<(), ChatChannelError> {
Ok(())
}
/// Test the connection (used by "Test Connection" button in UI).
async fn test_connection(&self) -> Result<(), ChatChannelError>;
}

View File

@@ -0,0 +1,125 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ChannelType {
Lark,
Telegram,
}
impl std::fmt::Display for ChannelType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ChannelType::Lark => write!(f, "lark"),
ChannelType::Telegram => write!(f, "telegram"),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ChannelConnectionStatus {
Connected,
Connecting,
Disconnected,
Error,
}
#[derive(Debug, Clone)]
pub struct SentMessageId(pub String);
pub struct IncomingCommand {
pub channel_id: i32,
pub sender_id: String,
pub command_text: String,
pub metadata: serde_json::Value,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MessageLevel {
Info,
Warning,
Error,
}
#[derive(Debug, Clone)]
pub struct RichMessage {
pub title: Option<String>,
pub body: String,
pub fields: Vec<(String, String)>,
pub level: MessageLevel,
}
impl RichMessage {
pub fn info(body: impl Into<String>) -> Self {
Self {
title: None,
body: body.into(),
fields: Vec::new(),
level: MessageLevel::Info,
}
}
pub fn with_title(mut self, title: impl Into<String>) -> Self {
self.title = Some(title.into());
self
}
pub fn with_field(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.fields.push((key.into(), value.into()));
self
}
pub fn to_plain_text(&self) -> String {
let mut text = String::new();
if let Some(title) = &self.title {
text.push_str(title);
text.push('\n');
}
text.push_str(&self.body);
for (key, value) in &self.fields {
text.push_str(&format!("\n{}: {}", key, value));
}
text
}
}
// ── Phase 2 forward-compatible types ──
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ButtonStyle {
Primary,
Danger,
Default,
}
#[derive(Debug, Clone)]
pub struct MessageButton {
pub id: String,
pub label: String,
pub style: ButtonStyle,
}
#[derive(Debug, Clone)]
pub struct InteractiveMessage {
pub base: RichMessage,
pub buttons: Vec<MessageButton>,
pub callback_context: serde_json::Value,
}
impl InteractiveMessage {
pub fn to_rich_fallback(&self) -> RichMessage {
let mut msg = self.base.clone();
if !self.buttons.is_empty() {
let button_text: Vec<String> = self
.buttons
.iter()
.map(|b| format!("[{}]", b.label))
.collect();
msg.body.push_str(&format!("\n\n{}", button_text.join(" ")));
}
msg
}
}