# -*- coding: utf-8 -*-
"""Telegram bot that archives incoming messages and files into local storage.

Only the user whose id equals ``AUTHORIZED_USER_ID`` may use the bot.  Text
messages are written to ``.txt`` files; photos, videos and documents are
sorted into sub-directories of ``STORAGE_DIR`` by MIME type.  The bot talks
to a Bot API server at ``http://bot-api:8081`` and, when the path reported by
``get_file`` lies inside ``STORAGE_DIR``, moves the file instead of
downloading it.
"""
import asyncio
import hashlib
import logging
import mimetypes
import os
import re
from datetime import datetime
from typing import List, Optional, Tuple

from aiogram import Bot, Dispatcher, types, F
from aiogram.filters import Command, StateFilter
from aiogram.types import FSInputFile, ForceReply
from aiogram.utils.keyboard import InlineKeyboardBuilder
from aiogram.client.session.aiohttp import AiohttpSession
from aiogram.client.telegram import TelegramAPIServer
from aiogram.fsm.state import State, StatesGroup
from aiogram.fsm.context import FSMContext

# === CONFIGURATION ===
# Read the configuration from environment variables.
BOT_TOKEN = os.environ.get("BOT_TOKEN")
# The user id is converted to an integer; 0 means "not configured".
AUTHORIZED_USER_ID = int(os.environ.get("AUTHORIZED_USER_ID", 0))
# The storage directory may be overridden through the environment.
STORAGE_DIR = os.environ.get("STORAGE_DIR", "local_storage")

# === STARTUP CHECKS ===
if not BOT_TOKEN:
    raise ValueError("错误:未设置 BOT_TOKEN 环境变量。")
if AUTHORIZED_USER_ID == 0:
    raise ValueError("错误:未设置 AUTHORIZED_USER_ID 环境变量。")

# === LOGGING ===
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# === INITIAL SETUP ===
# A long session timeout keeps the session layer from disconnecting early.
local_api_server = TelegramAPIServer.from_base("http://bot-api:8081")
session = AiohttpSession(api=local_api_server, timeout=7200)
bot = Bot(token=BOT_TOKEN, session=session)
dp = Dispatcher()

# Characters (besides alphanumerics) that are kept in stored file names.
_SAFE_NAME_CHARS = " ._-"

# MIME prefix -> sub-directory; checked before the exact-match table below.
_MIME_PREFIX_SUBDIRS = (
    ("image/", "pictures"),
    ("video/", "video"),
    ("audio/", "audio"),
)

# Exact MIME type sets -> sub-directory, checked in this order.
_MIME_EXACT_SUBDIRS = (
    (
        frozenset({
            "application/pdf",
            "application/msword",
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            "application/vnd.ms-excel",
            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            "application/vnd.ms-powerpoint",
            "application/vnd.openxmlformats-officedocument.presentationml.presentation",
            "application/rtf",
            "application/epub+zip",
            "text/plain",
            "text/csv",
            "text/markdown",
            "text/html",
        }),
        "documents",
    ),
    (
        frozenset({
            "application/zip",
            "application/vnd.rar",
            "application/x-zip-compressed",
            "application/x-rar-compressed",
            "application/gzip",
            "application/x-7z-compressed",
            "application/x-tar",
            "application/x-bzip2",
        }),
        "archives",
    ),
    (
        frozenset({
            "application/vnd.android.package-archive",
            "application/octet-stream",
            "application/x-msdownload",
            "application/x-apple-diskimage",
            "application/x-debian-package",
            "application/x-rpm",
            "application/x-sh",
            "application/x-csh",
            "application/bat",
        }),
        "executables",
    ),
    (frozenset({"application/x-iso9660-image"}), "disk_images"),
)


# === STATE DEFINITION ===
class RenameState(StatesGroup):
    """FSM states used by the rename dialogue."""

    # Set after the rename button is pressed, until a new name is accepted.
    waiting_for_new_name = State()


# === HELPER FUNCTIONS ===
def ensure_storage():
    """Ensure the local storage directory exists."""
    if not os.path.exists(STORAGE_DIR):
        # exist_ok guards against another task creating it in between.
        os.makedirs(STORAGE_DIR, exist_ok=True)
        logger.info(f"Storage folder created at: {STORAGE_DIR}")


def is_authorized(user_id: int) -> bool:
    """Check if the user is authorized."""
    return user_id == AUTHORIZED_USER_ID


def search_files(keyword: str) -> List[str]:
    """Return paths of stored files whose name contains *keyword*.

    The comparison is case-insensitive and looks at the file name only,
    not at the directory part.
    """
    needle = keyword.lower()
    return [
        os.path.join(root, name)
        for root, _, files in os.walk(STORAGE_DIR)
        for name in files
        if needle in name.lower()
    ]


def get_storage_summary() -> Tuple[int, List[Tuple[str, int, datetime]]]:
    """Return ``(total_size, files_info)`` for everything under STORAGE_DIR.

    ``files_info`` holds one ``(file_name, size_in_bytes, mtime)`` tuple per
    file.  Files that vanish while being inspected are skipped with a
    warning.
    """
    total_size = 0
    files_info = []
    for root, _, files in os.walk(STORAGE_DIR):
        for name in files:
            path = os.path.join(root, name)
            try:
                size = os.path.getsize(path)
                mtime = datetime.fromtimestamp(os.path.getmtime(path))
            except FileNotFoundError:
                logger.warning(f"File not found during summary: {path}")
                continue
            total_size += size
            files_info.append((name, size, mtime))
    return total_size, files_info


def get_extension_from_mime(mime_type: str) -> str:
    """Guess the file extension for *mime_type*; return "" when unknown."""
    return mimetypes.guess_extension(mime_type or "") or ""


def _sanitize_name(text: str) -> str:
    """Drop every character that is not alphanumeric or in _SAFE_NAME_CHARS."""
    return "".join(
        c for c in text if c.isalnum() or c in _SAFE_NAME_CHARS
    ).rstrip()


def _is_within_storage(path: str) -> bool:
    """Return True when *path* lies inside STORAGE_DIR.

    A plain ``startswith`` comparison would also accept sibling directories
    such as ``local_storage2``, so whole path components are compared.
    """
    storage_root = os.path.abspath(STORAGE_DIR)
    try:
        common = os.path.commonpath([storage_root, os.path.abspath(path)])
    except ValueError:
        # Raised e.g. for paths on different drives.
        return False
    return common == storage_root


def _unique_path(directory: str, filename: str) -> str:
    """Return a path in *directory* that does not exist yet.

    ``filename`` is used as-is when free; otherwise ``_1``, ``_2`` ... is
    inserted before the extension so existing files are never overwritten.
    """
    candidate = os.path.join(directory, filename)
    if not os.path.exists(candidate):
        return candidate
    stem, ext = os.path.splitext(filename)
    counter = 1
    while True:
        candidate = os.path.join(directory, f"{stem}_{counter}{ext}")
        if not os.path.exists(candidate):
            return candidate
        counter += 1


def _path_token(path: str) -> str:
    """Return a short, stable token identifying *path* within STORAGE_DIR.

    Telegram limits ``callback_data`` to 64 bytes, so full paths cannot be
    embedded in inline buttons; a 16-character digest is used instead.
    """
    relative = os.path.relpath(path, STORAGE_DIR)
    digest = hashlib.sha256(relative.encode("utf-8", "surrogateescape"))
    return digest.hexdigest()[:16]


def _resolve_token(token: str) -> Optional[str]:
    """Find the stored file whose ``_path_token`` equals *token*, if any."""
    for root, _, files in os.walk(STORAGE_DIR):
        for name in files:
            candidate = os.path.join(root, name)
            if _path_token(candidate) == token:
                return candidate
    return None


def _classify_subdir(mime_type: Optional[str]) -> str:
    """Map a MIME type to the storage sub-directory; default is "others"."""
    if not mime_type:
        return "others"
    for prefix, subdir in _MIME_PREFIX_SUBDIRS:
        if mime_type.startswith(prefix):
            return subdir
    for mime_set, subdir in _MIME_EXACT_SUBDIRS:
        if mime_type in mime_set:
            return subdir
    return "others"


def _build_final_name(file_name: str, mime_type: Optional[str], timestamp: str) -> str:
    """Build ``<timestamp>_<sanitised base><ext>`` for an incoming file.

    The extension comes from the original name, then from the MIME type,
    and finally falls back to ``.bin``.
    """
    base_name, ext = os.path.splitext(file_name)
    # The extension is user-controlled too; sanitise it like the base name.
    ext = _sanitize_name(ext)
    if ext in ("", "."):
        ext = get_extension_from_mime(mime_type) or ".bin"
    return f"{timestamp}_{_sanitize_name(base_name)}{ext}"


# === COMMAND HANDLERS ===
@dp.message(Command("start"))
async def cmd_start(message: types.Message):
    """Handle /start: create the storage directory and confirm readiness."""
    if not is_authorized(message.from_user.id):
        await message.answer("🚫 Access denied.")
        return
    ensure_storage()
    await message.answer("✅ Bot initialized! Local storage ready.")


@dp.message(Command("overview"))
async def cmd_overview(message: types.Message):
    """Handle /overview: list stored files, newest first, with total size."""
    if not is_authorized(message.from_user.id):
        await message.answer("🚫 Access denied.")
        return
    total_size, files_info = get_storage_summary()
    if not files_info:
        await message.answer("📁 Storage is empty.")
        return
    files_info.sort(key=lambda x: x[2], reverse=True)
    summary = "\n".join(
        f"{f} — {s/1024:.1f} KB — {d.strftime('%Y-%m-%d %H:%M')}"
        for f, s, d in files_info
    )
    # Keep the reply to a bounded length by truncating long listings.
    if len(summary) > 3500:
        summary = summary[:3500] + "\n... (list truncated)"
    await message.answer(f"📦 Files:\n{summary}\n\nTotal: {total_size/1024:.1f} KB")


@dp.message(Command("search"))
async def cmd_search(message: types.Message):
    """Handle /search <keyword>: show up to 50 matches as inline buttons."""
    if not is_authorized(message.from_user.id):
        await message.answer("🚫 Access denied.")
        return
    args = message.text.split(maxsplit=1)
    if len(args) < 2:
        await message.answer("Usage: /search <keyword>")
        return
    keyword = args[1]
    results = search_files(keyword)
    if not results:
        await message.answer("🔍 No matching files found.")
        return
    builder = InlineKeyboardBuilder()
    for i, f in enumerate(results[:50]):
        # A short token is sent instead of the path: callback_data is
        # limited to 64 bytes and real paths routinely exceed that.
        builder.button(
            text=f"{i+1}. {os.path.basename(f)}",
            callback_data=f"get|{_path_token(f)}",
        )
    builder.adjust(1)
    await message.answer(
        f"🔍 Found {len(results)} file(s):", reply_markup=builder.as_markup()
    )


@dp.callback_query(F.data.startswith("get|"))
async def cb_get_file(callback: types.CallbackQuery):
    """Send the file referenced by a search-result button."""
    if not is_authorized(callback.from_user.id):
        await callback.message.answer("🚫 Access denied.")
        await callback.answer()
        return
    payload = callback.data.split("|", 1)[1]
    path = _resolve_token(payload)
    if path is None and _is_within_storage(payload) and os.path.isfile(payload):
        # Buttons created before tokens were introduced carry the path
        # itself; accept it only when it stays inside the storage directory.
        path = payload
    if path is None or not os.path.exists(path):
        await callback.message.answer("File not found (it may have been deleted).")
        await callback.answer()
        return
    await callback.answer(f"Sending {os.path.basename(path)}...")
    try:
        await callback.message.answer_document(FSInputFile(path))
    except Exception as e:
        logger.error(f"Failed to send file {path}: {e}")
        await callback.message.answer(f"Error sending file: {e}")


@dp.message(Command("delete"))
async def cmd_delete(message: types.Message):
    """Handle /delete <keyword>: remove every file whose name matches."""
    if not is_authorized(message.from_user.id):
        await message.answer("🚫 Access denied.")
        return
    args = message.text.split(maxsplit=1)
    if len(args) < 2:
        await message.answer("Usage: /delete <keyword>")
        return
    keyword = args[1]
    matches = search_files(keyword)
    if not matches:
        await message.answer("File not found.")
        return
    deleted_files = []
    for f in matches:
        try:
            os.remove(f)
        except OSError as e:
            logger.error(f"Failed to delete {f}: {e}")
        else:
            deleted_files.append(os.path.basename(f))
    if deleted_files:
        await message.answer(
            f"🗑️ Deleted {len(deleted_files)} file(s):\n" + "\n".join(deleted_files)
        )
    else:
        await message.answer("Could not delete matching files.")


# === MESSAGE HANDLER FOR FILES/TEXT ===
# StateFilter(None) restricts this handler to chats that are not in any FSM
# state, so a reply given while waiting for a new file name is not stored
# as a new text file.
@dp.message(F.photo | F.video | F.document | F.text, StateFilter(None))
async def handle_incoming(message: types.Message):
    """Store an incoming text, photo, video or document in STORAGE_DIR."""
    if not is_authorized(message.from_user.id):
        await message.answer("🚫 Access denied.")
        return
    ensure_storage()
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    # --- Handle text ---
    if message.text and not (message.photo or message.video or message.document):
        subdir = "documents"
        target_dir = os.path.join(STORAGE_DIR, subdir)
        os.makedirs(target_dir, exist_ok=True)
        # Two texts within the same second must not overwrite each other.
        path = _unique_path(target_dir, f"{timestamp}.txt")
        filename = os.path.basename(path)
        with open(path, "w", encoding="utf-8") as f:
            f.write(message.text)
        await message.answer(
            f"💾 Text saved as `{filename}` (in /{subdir})", parse_mode="Markdown"
        )
        return

    # --- Handle media/documents ---
    file_obj = None
    file_name = None
    mime_type = None
    if message.document:
        file_obj = message.document
        file_name = message.document.file_name
        mime_type = message.document.mime_type
    elif message.photo:
        file_obj = message.photo[-1]  # Highest resolution
        file_name = f"{timestamp}.jpg"
        mime_type = "image/jpeg"
    elif message.video:
        file_obj = message.video
        file_name = message.video.file_name
        mime_type = message.video.mime_type
    if not file_name:
        # Fallback name
        file_name = timestamp
    if not file_obj:
        return

    logger.info(
        f"Received file. Original file_name variable: '{file_name}' (MIME: {mime_type})"
    )
    final_name = _build_final_name(file_name, mime_type, timestamp)

    # --- Sub-directory classification ---
    subdir = _classify_subdir(mime_type)
    target_dir = os.path.join(STORAGE_DIR, subdir)
    try:
        os.makedirs(target_dir, exist_ok=True)
    except OSError as e:
        logger.error(f"Failed to create subdirectory {target_dir}: {e}")
        target_dir = STORAGE_DIR
        # Report the real location so the rename button can find the file.
        subdir = "."
    logger.info(f"Target file: '{final_name}' in '{subdir}'")

    # === Retry loop ===
    max_retries = 3
    for attempt in range(max_retries):
        try:
            logger.info(
                f"Requesting file info for file_id: {file_obj.file_id} "
                f"(Attempt {attempt+1}/{max_retries})"
            )
            # 7200-second timeout for the get_file request.
            file_info = await bot.get_file(file_obj.file_id, request_timeout=7200)
            source_path = file_info.file_path
            logger.info(f"File info received. Path on disk: {source_path}")
            # Pick the destination only now, with no await before the
            # rename, so files handled concurrently (e.g. album photos that
            # share a timestamp) cannot claim the same name.
            destination_path = _unique_path(target_dir, final_name)
            if not _is_within_storage(source_path):
                logger.error(
                    f"FATAL: File path {source_path} is outside STORAGE_DIR. Fallback download."
                )
                await bot.download_file(file_info.file_path, destination_path)
            elif os.path.exists(source_path):
                os.rename(source_path, destination_path)
                logger.info(f"Moved file to {destination_path}")
            else:
                raise FileNotFoundError(
                    f"API says file is at {source_path} but it is missing."
                )
            break
        except Exception as e:
            logger.warning(f"Attempt {attempt+1} failed: {e}")
            if attempt == max_retries - 1:
                logger.error("All retry attempts failed.")
                await message.answer(
                    f"❌ Failed after {max_retries} attempts. Server Error: {e}"
                )
                return
            await asyncio.sleep(5)

    # The stored name may carry a uniqueness suffix; report the real one.
    final_name = os.path.basename(destination_path)

    # === After the file was saved successfully ===
    try:
        builder = InlineKeyboardBuilder()
        builder.button(text="✏️ 重命名", callback_data="rename_file")
        await message.answer(
            f"💾 Saved `{final_name}` (in /{subdir})",
            parse_mode="Markdown",
            reply_markup=builder.as_markup()
        )
    except Exception as e:
        logger.error(f"Failed to send success message: {e}")


# === RENAME FEATURE HANDLERS ===
@dp.callback_query(F.data == "rename_file")
async def click_rename(callback: types.CallbackQuery, state: FSMContext):
    """Start the rename dialogue for the file named in the clicked message.

    The file name and sub-directory are parsed back out of the
    "Saved ... (in /...)" confirmation text the button is attached to.
    """
    if not is_authorized(callback.from_user.id):
        # Answer anyway so the client stops showing its loading indicator.
        await callback.answer()
        return
    message_text = callback.message.text or ""
    match = re.search(r"Saved (.+) \(in /(.+)\)", message_text)
    if not match:
        await callback.answer("❌ 无法解析文件路径,可能消息格式已变。", show_alert=True)
        return
    old_filename = match.group(1).strip()
    subdir = match.group(2).strip()
    full_path = os.path.join(STORAGE_DIR, subdir, old_filename)
    if not os.path.exists(full_path):
        await callback.answer("❌ 文件已不存在(可能已被删除或移动)。", show_alert=True)
        return
    await state.update_data(
        file_path=full_path,
        file_dir=os.path.dirname(full_path),
        old_ext=os.path.splitext(old_filename)[1]
    )
    await state.set_state(RenameState.waiting_for_new_name)
    await callback.message.answer(
        f"当前文件名: `{old_filename}`\n请直接回复**新的文件名** (无需后缀):",
        parse_mode="Markdown",
        reply_markup=ForceReply(selective=True)
    )
    await callback.answer()


@dp.message(RenameState.waiting_for_new_name)
async def process_rename(message: types.Message, state: FSMContext):
    """Rename the pending file to the name given in the user's reply.

    An invalid or already-used name leaves the state untouched so the user
    can answer again; success, failure of the rename itself, or a missing
    source file ends the dialogue.
    """
    if not is_authorized(message.from_user.id):
        return
    # message.text is None for non-text replies (photos, stickers, ...).
    new_name_input = (message.text or "").strip()
    safe_new_name = _sanitize_name(new_name_input)
    if not safe_new_name:
        await message.answer("❌ 文件名无效,请重新输入。")
        return
    data = await state.get_data()
    old_path = data.get('file_path')
    if not old_path or not os.path.exists(old_path):
        await message.answer("❌ 原文件已丢失,操作取消。")
        await state.clear()
        return
    file_dir = data.get('file_dir') or os.path.dirname(old_path)
    old_ext = data.get('old_ext') or ""
    # Keep the original extension unless the user supplied one.
    if not os.path.splitext(safe_new_name)[1]:
        safe_new_name += old_ext
    new_path = os.path.join(file_dir, safe_new_name)
    if os.path.exists(new_path):
        await message.answer("❌ 目标文件名已存在,请换一个名字。")
        return
    try:
        os.rename(old_path, new_path)
        await message.answer(
            f"✅ 重命名成功!\n`{safe_new_name}`", parse_mode="Markdown"
        )
    except OSError as e:
        logger.error(f"Rename failed: {e}")
        await message.answer(f"❌ 重命名失败: {e}")
    await state.clear()


# === MAIN ENTRY ===
async def main():
    """Create the storage directory and start long polling."""
    ensure_storage()
    logger.info(f"🚀 Bot is starting... Storage: {STORAGE_DIR}")
    await dp.start_polling(bot)


if __name__ == "__main__":
    asyncio.run(main())