Files
tg_source_bot/bot.py
2025-11-04 09:39:45 +08:00

320 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import hashlib
import logging
import mimetypes
import os
import shutil
from datetime import datetime

from aiogram import Bot, Dispatcher, types, F
from aiogram.filters import Command
from aiogram.types import FSInputFile
from aiogram.utils.keyboard import InlineKeyboardBuilder
from aiogram.client.session.aiohttp import AiohttpSession
from aiogram.client.telegram import TelegramAPIServer
# === CONFIGURATION ===
# All settings are read from environment variables so that no secret is
# hard-coded in the source.
BOT_TOKEN = os.environ.get("BOT_TOKEN")

# Telegram user ID of the single account allowed to use the bot.
# An unset variable and an empty/blank one (e.g. an undefined substitution in
# a compose file) are both mapped to 0, which the startup check below rejects
# with the "not set" message instead of an opaque int() parsing error.
_raw_authorized_user_id = os.environ.get("AUTHORIZED_USER_ID", "").strip()
try:
    AUTHORIZED_USER_ID = int(_raw_authorized_user_id) if _raw_authorized_user_id else 0
except ValueError:
    raise ValueError(
        f"错误:AUTHORIZED_USER_ID 必须是整数,当前值为 {_raw_authorized_user_id!r}。"
    ) from None

# The storage directory can be overridden through the environment and
# defaults to "local_storage"; an empty value falls back to the default too.
# Note: in docker-compose this value is set to /var/lib/telegram-bot-api.
STORAGE_DIR = os.environ.get("STORAGE_DIR") or "local_storage"

# === STARTUP CHECKS ===
# Fail fast at import time when a mandatory setting is missing.
if not BOT_TOKEN:
    raise ValueError("错误:未设置 BOT_TOKEN 环境变量。")
if AUTHORIZED_USER_ID == 0:
    raise ValueError("错误:未设置 AUTHORIZED_USER_ID 环境变量。")
# === LOGGING ===
# Configure the root logger at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# === INITIAL SETUP ===
# Send every Bot API request to the server at http://bot-api:8081 rather than
# the library's default endpoint.
# NOTE(review): presumably a self-hosted telegram-bot-api container reachable
# under the hostname "bot-api" — confirm against the deployment's compose file.
local_api_server = TelegramAPIServer.from_base("http://bot-api:8081")
session = AiohttpSession(api=local_api_server)
bot = Bot(token=BOT_TOKEN, session=session)
# Dispatcher on which the handlers in this file register via @dp.* decorators.
dp = Dispatcher()
def ensure_storage():
    """Ensure the local storage directory exists.

    Creates ``STORAGE_DIR`` (including missing parent directories) when it is
    absent and logs whether it was created or was already present.
    """
    try:
        # Create first and react to "already exists" instead of checking
        # beforehand: a path that appears between a check and the creation
        # call can then no longer raise.
        os.makedirs(STORAGE_DIR)
    except FileExistsError:
        logger.info(f"Storage folder exists at: {STORAGE_DIR}")
    else:
        logger.info(f"Storage folder created at: {STORAGE_DIR}")
def is_authorized(user_id: int) -> bool:
    """Return True only when *user_id* equals the configured authorized user ID."""
    allowed_id = AUTHORIZED_USER_ID
    return user_id == allowed_id
def search_files(keyword: str) -> list[str]:
    """Find stored files whose name contains *keyword*, ignoring case.

    Args:
        keyword: Substring to look for in each file name (not in the
            directory part of the path).

    Returns:
        Paths (``STORAGE_DIR`` joined with the relative location) of every
        matching file found by walking the storage directory recursively.
    """
    # Lower-case the keyword once instead of once per visited file.
    needle = keyword.lower()
    return [
        os.path.join(root, name)
        for root, _, names in os.walk(STORAGE_DIR)
        for name in names
        if needle in name.lower()
    ]
def get_storage_summary():
    """Return the total size and per-file details of everything in storage.

    Returns:
        A ``(total_size, files_info)`` tuple. ``total_size`` is the combined
        size in bytes of all files that could be inspected; ``files_info`` is
        a list of ``(file_name, size_in_bytes, modified_at)`` tuples, where
        ``modified_at`` is a naive ``datetime`` built from the file's mtime.
    """
    total_size = 0
    files_info = []
    for root, _, files in os.walk(STORAGE_DIR):
        for name in files:
            path = os.path.join(root, name)
            try:
                # One stat call yields both the size and the mtime.
                stat_result = os.stat(path)
            except FileNotFoundError:
                # The file vanished between the directory listing and the stat.
                logger.warning(f"File not found during summary: {path}")
                continue
            except OSError as e:
                # E.g. a permission problem: skip this file rather than
                # aborting the whole summary.
                logger.warning(f"Could not inspect file during summary: {path} ({e})")
                continue
            total_size += stat_result.st_size
            files_info.append(
                (name, stat_result.st_size, datetime.fromtimestamp(stat_result.st_mtime))
            )
    return total_size, files_info
def get_extension_from_mime(mime_type: str) -> str:
    """Map a MIME type to a file extension (with leading dot).

    Returns an empty string when *mime_type* is empty/None or when the
    ``mimetypes`` module knows no extension for it.
    """
    return mimetypes.guess_extension(mime_type or "") or ""
# === COMMAND HANDLERS ===
@dp.message(Command("start"))
async def cmd_start(message: types.Message):
    """Handle /start: prepare storage for the authorized user, refuse anyone else."""
    if is_authorized(message.from_user.id):
        ensure_storage()
        await message.answer("✅ Bot initialized! Local storage ready.")
    else:
        await message.answer("🚫 Access denied.")
def _split_overview_lines(lines, limit=3500):
    """Group *lines* into newline-joined chunks of at most *limit* characters.

    A single line longer than *limit* is emitted as its own chunk.
    """
    chunks = []
    current = []
    current_len = 0
    for line in lines:
        # +1 accounts for the newline joining this line to the previous one.
        added = len(line) + (1 if current else 0)
        if current and current_len + added > limit:
            chunks.append("\n".join(current))
            current = []
            current_len = 0
            added = len(line)
        current.append(line)
        current_len += added
    if current:
        chunks.append("\n".join(current))
    return chunks


@dp.message(Command("overview"))
async def cmd_overview(message: types.Message):
    """Handle /overview: list every stored file (newest first) and the total size.

    The listing is split over several messages when it would not fit into a
    single Telegram message.
    """
    if not is_authorized(message.from_user.id):
        await message.answer("🚫 Access denied.")
        return
    total_size, files_info = get_storage_summary()
    if not files_info:
        await message.answer("📁 Storage is empty.")
        return
    # Sort by modification time, newest first.
    files_info.sort(key=lambda info: info[2], reverse=True)
    lines = ["📦 Files:"]
    lines.extend(
        f"{name} — {size/1024:.1f} KB — {mtime.strftime('%Y-%m-%d %H:%M')}"
        for name, size, mtime in files_info
    )
    # The leading newline keeps a blank line between the list and the total.
    lines.append(f"\nTotal: {total_size/1024:.1f} KB")
    # Telegram rejects over-long messages, so send the report in chunks that
    # stay well below the limit.
    for chunk in _split_overview_lines(lines):
        await message.answer(chunk.lstrip("\n"))
def _storage_file_token(path: str) -> str:
    """Return a short, stable identifier for a stored file.

    The token is derived from the path relative to ``STORAGE_DIR`` and is
    32 hex characters long, so ``"get|" + token`` always fits into Telegram's
    64-byte ``callback_data`` limit regardless of the file name length.
    """
    relative = os.path.relpath(path, STORAGE_DIR)
    # "surrogateescape" lets names with undecodable bytes be hashed as well.
    digest = hashlib.sha256(relative.encode("utf-8", "surrogateescape"))
    return digest.hexdigest()[:32]


def _is_inside_storage(path: str) -> bool:
    """Return True when *path* resolves to a location inside ``STORAGE_DIR``."""
    storage_root = os.path.realpath(STORAGE_DIR)
    try:
        return os.path.commonpath([storage_root, os.path.realpath(path)]) == storage_root
    except ValueError:
        # Raised e.g. for paths on different drives.
        return False


def _resolve_callback_payload(payload: str):
    """Translate the payload of a ``get|...`` button into a file path.

    Returns the path of the matching stored file, or ``None`` when no such
    file exists (any more).
    """
    # Buttons created before tokens were introduced carry the full path.
    # Honour them, but only for files that really live inside the storage.
    if os.path.isfile(payload) and _is_inside_storage(payload):
        return payload
    for root, _, names in os.walk(STORAGE_DIR):
        for name in names:
            candidate = os.path.join(root, name)
            if _storage_file_token(candidate) == payload:
                return candidate
    return None


@dp.message(Command("search"))
async def cmd_search(message: types.Message):
    """Handle /search <keyword>: reply with one button per matching file."""
    if not is_authorized(message.from_user.id):
        await message.answer("🚫 Access denied.")
        return
    # The command may arrive as the caption of a media message, in which
    # case message.text is None.
    args = (message.text or message.caption or "").split(maxsplit=1)
    if len(args) < 2:
        await message.answer("Usage: /search <keyword>")
        return
    keyword = args[1]
    results = search_files(keyword)
    if not results:
        await message.answer("🔍 No matching files found.")
        return
    builder = InlineKeyboardBuilder()
    # Cap the number of buttons so the keyboard does not grow too large.
    shown = results[:50]
    for index, path in enumerate(shown, start=1):
        builder.button(
            text=f"{index}. {os.path.basename(path)}",
            # A token instead of the raw path keeps callback_data within
            # Telegram's 64-byte limit.
            callback_data=f"get|{_storage_file_token(path)}",
        )
    builder.adjust(1)  # One button per row.
    header = f"🔍 Found {len(results)} file(s):"
    if len(results) > len(shown):
        header = f"🔍 Found {len(results)} file(s) (showing first {len(shown)}):"
    await message.answer(header, reply_markup=builder.as_markup())


@dp.callback_query(F.data.startswith("get|"))
async def cb_get_file(callback: types.CallbackQuery):
    """Send the file that belongs to the pressed search-result button."""
    if not is_authorized(callback.from_user.id):
        await callback.message.answer("🚫 Access denied.")
        # Answer the query so the client stops showing a loading indicator.
        await callback.answer()
        return
    payload = callback.data.split("|", 1)[1]
    path = _resolve_callback_payload(payload)
    if path is None:
        await callback.message.answer("File not found (it may have been deleted).")
        await callback.answer()
        return
    # Callback answers accept at most 200 characters of text.
    await callback.answer(f"Sending {os.path.basename(path)}..."[:200])
    try:
        await callback.message.answer_document(FSInputFile(path))
    except Exception as e:
        logger.error(f"Failed to send file {path}: {e}")
        await callback.message.answer(f"Error sending file: {e}")
@dp.message(Command("delete"))
async def cmd_delete(message: types.Message):
    """Handle /delete <filename_keyword>: remove every file matching the keyword.

    All files whose name contains the keyword (case-insensitively) are
    deleted. The reply lists the deleted files and, separately, those that
    could not be removed.
    """
    if not is_authorized(message.from_user.id):
        await message.answer("🚫 Access denied.")
        return
    # The command may arrive as the caption of a media message, in which
    # case message.text is None.
    args = (message.text or message.caption or "").split(maxsplit=1)
    if len(args) < 2:
        await message.answer("Usage: /delete <filename_keyword>")
        return
    keyword = args[1]
    matches = search_files(keyword)
    if not matches:
        await message.answer("File not found.")
        return
    deleted_files = []
    failed_files = []
    for path in matches:
        try:
            os.remove(path)
        except OSError as e:
            logger.error(f"Failed to delete {path}: {e}")
            failed_files.append(os.path.basename(path))
        else:
            deleted_files.append(os.path.basename(path))
    if not deleted_files:
        await message.answer("Could not delete matching files.")
        return
    report = f"🗑️ Deleted {len(deleted_files)} file(s):\n" + "\n".join(deleted_files)
    if failed_files:
        # Tell the user about partial failures instead of only logging them.
        report += (
            f"\n⚠️ Could not delete {len(failed_files)} file(s):\n"
            + "\n".join(failed_files)
        )
    # Telegram rejects over-long messages; cut the listing at a line boundary
    # well below the limit.
    max_length = 3500
    if len(report) > max_length:
        report = report[:max_length].rsplit("\n", 1)[0] + "\n…"
    await message.answer(report)
# === MESSAGE HANDLER FOR FILES/TEXT ===
# MIME types that are filed under the "zip" subdirectory.
_ARCHIVE_MIME_TYPES = (
    "application/zip",
    "application/x-zip-compressed",
    "application/x-rar-compressed",
    "application/gzip",
    "application/x-7z-compressed",
)
# MIME types that are filed under the "documents" subdirectory.
_DOCUMENT_MIME_TYPES = (
    "application/pdf",
    "application/msword",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "text/plain",
)


def _path_is_under_storage(path: str) -> bool:
    """Return True when *path* resolves to a location inside ``STORAGE_DIR``."""
    storage_root = os.path.realpath(STORAGE_DIR)
    try:
        # Compare whole path components; a plain string-prefix test would also
        # accept sibling directories such as "<storage>-other".
        return os.path.commonpath([storage_root, os.path.realpath(path)]) == storage_root
    except ValueError:
        return False


def _unused_path(directory: str, filename: str) -> str:
    """Return a path for *filename* in *directory* that does not exist yet.

    When the plain name is taken, ``_1``, ``_2``, ... is appended to the stem.
    """
    stem, ext = os.path.splitext(filename)
    candidate = os.path.join(directory, filename)
    counter = 1
    while os.path.exists(candidate):
        candidate = os.path.join(directory, f"{stem}_{counter}{ext}")
        counter += 1
    return candidate


def _build_final_name(file_name: str, mime_type, timestamp: str) -> str:
    """Build the sanitized, timestamp-prefixed name under which a file is stored."""
    base_name, ext = os.path.splitext(file_name)
    # Restrict the extension to harmless characters; anything else could
    # break the Markdown confirmation message.
    ext = "".join(c for c in ext if c.isalnum() or c in ('.', '_', '-'))
    if ext in ("", "."):
        # No usable extension in the name: derive one from the MIME type,
        # falling back to ".bin".
        ext = get_extension_from_mime(mime_type) or ".bin"
    safe_base_name = "".join(
        c for c in base_name if c.isalnum() or c in (' ', '.', '_', '-')
    ).rstrip()
    return f"{timestamp}_{safe_base_name}{ext}"


def _subdir_for_mime(mime_type) -> str:
    """Choose the storage subdirectory for a MIME type ("others" by default)."""
    if not mime_type:
        return "others"
    if mime_type.startswith("video/"):
        return "video"
    if mime_type.startswith("image/"):
        return "pictures"
    if mime_type in _ARCHIVE_MIME_TYPES:
        return "zip"
    if mime_type.startswith("audio/"):
        return "audio"
    if mime_type in _DOCUMENT_MIME_TYPES:
        return "documents"
    return "others"


async def _save_text_message(message: types.Message, timestamp: str) -> None:
    """Store the text of *message* as a .txt file in the "documents" subdirectory."""
    subdir = "documents"
    target_dir = os.path.join(STORAGE_DIR, subdir)
    os.makedirs(target_dir, exist_ok=True)
    # Two texts received within the same second must not overwrite each other.
    path = _unused_path(target_dir, f"{timestamp}.txt")
    with open(path, "w", encoding="utf-8") as f:
        f.write(message.text)
    filename = os.path.basename(path)
    await message.answer(
        f"💾 Text saved as `{filename}` (in /{subdir})", parse_mode="Markdown"
    )


@dp.message(F.photo | F.video | F.document | F.text)
async def handle_incoming(message: types.Message):
    """Store an incoming text, photo, video or document in the local storage.

    Plain text is written to a timestamped .txt file. For media, the file is
    looked up through the Bot API and then either taken from the local disk
    (when the API server already placed it inside ``STORAGE_DIR``) or
    downloaded. A confirmation or an error message is sent back to the user.
    """
    if not is_authorized(message.from_user.id):
        await message.answer("🚫 Access denied.")
        return
    ensure_storage()
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    # --- Handle text ---
    if message.text and not (message.photo or message.video or message.document):
        await _save_text_message(message, timestamp)
        return

    # --- Handle media/documents ---
    file_obj = None
    file_name = None
    mime_type = None
    is_photo = False
    if message.document:
        file_obj = message.document
        file_name = message.document.file_name
        mime_type = message.document.mime_type
    elif message.photo:
        file_obj = message.photo[-1]  # Highest resolution
        file_name = f"{timestamp}.jpg"
        mime_type = "image/jpeg"
        is_photo = True
    elif message.video:
        file_obj = message.video
        file_name = message.video.file_name
        mime_type = message.video.mime_type
    if file_obj is None:
        return
    if not file_name:
        # Documents as well as videos may arrive without a file name.
        file_name = timestamp

    logger.info(
        f"Received file. Original file_name variable: '{file_name}' "
        f"(MIME: {mime_type}) [IsPhoto: {is_photo}]"
    )
    final_name = _build_final_name(file_name, mime_type, timestamp)
    subdir = _subdir_for_mime(mime_type)
    target_dir = os.path.join(STORAGE_DIR, subdir)
    try:
        os.makedirs(target_dir, exist_ok=True)
    except OSError as e:
        logger.error(f"Failed to create subdirectory {target_dir}: {e}")
        # Fall back to the storage root and report that location to the user.
        target_dir = STORAGE_DIR
        subdir = ""
    destination_path = os.path.join(target_dir, final_name)
    logger.info(f"Target file name set to: '{final_name}' in subdir: '{subdir}'")

    try:
        # Ask the API where the file lives; download_file() expects this
        # file_path, not a file_id.
        file_info = await bot.get_file(file_obj.file_id)
        source_path = file_info.file_path
        if not source_path:
            raise RuntimeError("Bot API returned no file_path for this file")
        # Pick the final name only now, right before writing, so files that
        # arrive within the same second (e.g. a photo album) cannot clobber
        # each other.
        destination_path = _unused_path(target_dir, final_name)
        on_local_disk = (
            os.path.isabs(source_path)
            and os.path.isfile(source_path)
            and _path_is_under_storage(source_path)
        )
        if on_local_disk and is_photo:
            # NOTE(review): the original code stated that photos cannot be
            # renamed out of the API server's directory, so the server's copy
            # is left in place and the photo is copied instead — confirm.
            shutil.copy2(source_path, destination_path)
            logger.info(f"Copied photo from {source_path} to {destination_path}")
        elif on_local_disk:
            # The file is already on disk: move it instead of downloading.
            # shutil.move also works across filesystems, unlike os.rename.
            shutil.move(source_path, destination_path)
            logger.info(f"Moved file from {source_path} to {destination_path}")
        else:
            if os.path.isabs(source_path):
                logger.error(
                    f"FATAL: File path {source_path} is outside STORAGE_DIR "
                    f"{STORAGE_DIR}. Check docker-compose command."
                )
            # Fallback: fetch the file through the API (this may be slow and
            # can fail with a 404).
            await bot.download_file(source_path, destination_path)
            logger.info(f"Downloaded file to: {destination_path}")
        saved_name = os.path.basename(destination_path)
        await message.answer(
            f"💾 Saved `{saved_name}` (in /{subdir})", parse_mode="Markdown"
        )
    except Exception as e:
        # Log the complete exception, including the file_id, for diagnosis.
        logger.error(
            f"Failed to save file {file_obj.file_id} (Dest: {destination_path}): {e}",
            exc_info=True,
        )
        await message.answer(f"Error saving file: {e}")
# === MAIN ENTRY ===
async def main():
    """Make sure the storage exists, then poll Telegram for updates."""
    ensure_storage()
    logger.info(f"🚀 Bot is starting... Storage: {STORAGE_DIR}")
    await dp.start_polling(bot)


# Run the bot only when this file is executed as a script.
if __name__ == "__main__":
    import asyncio

    asyncio.run(main())