optimize: WeChat channel message handling and error resilience

- Filter messages by message_type=1 to skip bot echo and prevent loops
- Add voice message support (type=3) with voice-to-text extraction
- Check resend results and re-buffer failed messages to prevent loss
- Handle session expiry (ret=-14) with 30s pause in polling loop
- Use exponential backoff (5s–30s) for network errors instead of fixed 5s

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
xintaofei
2026-04-02 00:29:01 +08:00
parent 8050e30a55
commit 0ef36ee918

View File

@@ -411,6 +411,7 @@ impl ChatChannelBackend for WeixinBackend {
tokio::spawn(async move { tokio::spawn(async move {
let mut cursor = initial_cursor; let mut cursor = initial_cursor;
let mut consecutive_errors: u32 = 0;
loop { loop {
if *shutdown_rx.borrow() { if *shutdown_rx.borrow() {
@@ -434,6 +435,7 @@ impl ChatChannelBackend for WeixinBackend {
match result { match result {
Ok(resp) => { Ok(resp) => {
// Recover from error state after successful poll // Recover from error state after successful poll
consecutive_errors = 0;
{ {
let mut s = status.lock().await; let mut s = status.lock().await;
if *s == ChannelConnectionStatus::Error { if *s == ChannelConnectionStatus::Error {
@@ -458,6 +460,13 @@ impl ChatChannelBackend for WeixinBackend {
if r != 0 { if r != 0 {
eprintln!("[Weixin] getupdates ret={r}"); eprintln!("[Weixin] getupdates ret={r}");
} }
// Session expired — pause and wait for re-auth
if r == -14 {
eprintln!("[Weixin] session expired (ret=-14), pausing 30s");
*status.lock().await = ChannelConnectionStatus::Error;
tokio::time::sleep(Duration::from_secs(30)).await;
continue;
}
} }
// Process messages // Process messages
@@ -466,7 +475,15 @@ impl ChatChannelBackend for WeixinBackend {
eprintln!("[Weixin] got {} message(s)", msgs.len()); eprintln!("[Weixin] got {} message(s)", msgs.len());
} }
for msg in msgs { for msg in msgs {
// Only handle text messages (type 1 in item_list) // Only handle user messages (message_type=1),
// skip bot echo (message_type=2)
let msg_type =
msg.get("message_type").and_then(|v| v.as_i64());
if msg_type != Some(1) {
continue;
}
// Extract text from type=1 (text) or type=3 (voice-to-text)
let text = msg let text = msg
.get("item_list") .get("item_list")
.and_then(|v| v.as_array()) .and_then(|v| v.as_array())
@@ -474,11 +491,14 @@ impl ChatChannelBackend for WeixinBackend {
items.iter().find_map(|item| { items.iter().find_map(|item| {
let t = let t =
item.get("type").and_then(|v| v.as_i64())?; item.get("type").and_then(|v| v.as_i64())?;
if t == 1 { match t {
item.pointer("/text_item/text") 1 => item
.and_then(|v| v.as_str()) .pointer("/text_item/text")
} else { .and_then(|v| v.as_str()),
None 3 => item
.pointer("/voice_item/text")
.and_then(|v| v.as_str()),
_ => None,
} }
}) })
}); });
@@ -542,7 +562,7 @@ impl ChatChannelBackend for WeixinBackend {
}, },
"base_info": { "channel_version": "1.0.2" } "base_info": { "channel_version": "1.0.2" }
}); });
let _ = client let send_ok = match client
.post(format!( .post(format!(
"{base_url}/ilink/bot/sendmessage" "{base_url}/ilink/bot/sendmessage"
)) ))
@@ -551,7 +571,26 @@ impl ChatChannelBackend for WeixinBackend {
)) ))
.json(&send_body) .json(&send_body)
.send() .send()
.await; .await
{
Ok(r) => {
let ok = r.status().is_success();
if !ok {
eprintln!("[Weixin] resend failed: HTTP {}", r.status());
}
ok
}
Err(e) => {
eprintln!("[Weixin] resend error: {e}");
false
}
};
if !send_ok {
// Re-buffer remaining messages
pending_messages.lock().await.push(
pending_text.clone(),
);
}
} }
} }
} }
@@ -576,9 +615,17 @@ impl ChatChannelBackend for WeixinBackend {
} }
} }
Err(e) => { Err(e) => {
eprintln!("[Weixin] polling error: {e}"); consecutive_errors += 1;
eprintln!(
"[Weixin] polling error ({consecutive_errors}): {e}"
);
*status.lock().await = ChannelConnectionStatus::Error; *status.lock().await = ChannelConnectionStatus::Error;
tokio::time::sleep(Duration::from_secs(5)).await; // Exponential backoff: 5s, 10s, 20s, capped at 30s
let delay = std::cmp::min(
5 * 2u64.saturating_pow(consecutive_errors - 1),
30,
);
tokio::time::sleep(Duration::from_secs(delay)).await;
} }
} }
} }