"""web ui for media download""" import asyncio import json import logging import os import threading from datetime import datetime from typing import Callable, Optional import pyrogram from flask import Flask, jsonify, render_template, request from flask_login import LoginManager, UserMixin, login_required, login_user from loguru import logger from ruamel.yaml import YAML from werkzeug.serving import make_server import utils from module.app import Application, ChatDownloadConfig, TaskNode from module.download_stat import ( DownloadState, get_download_result, get_download_state, get_total_download_speed, get_task_progress, set_download_state, pause_message, resume_message, skip_message, is_message_paused, remove_download_entry, ) from utils.crypto import AesBase64 from utils.format import format_byte, replace_date_time # Config file path CONFIG_FILE_PATH = "config.yaml" CHANNEL_HISTORY_FILE = os.path.join(os.path.abspath("."), "appdata", "channel_history.json") log = logging.getLogger("werkzeug") log.setLevel(logging.ERROR) _flask_app = Flask(__name__) _flask_app.secret_key = "tdl" _login_manager = LoginManager() _login_manager.login_view = "login" _login_manager.init_app(_flask_app) web_login_users: dict = {} deAesCrypt = AesBase64("1234123412ABCDEF", "ABCDEF1234123412") # Global references for dynamic download _app: Optional[Application] = None _client: Optional[pyrogram.Client] = None _add_download_task: Optional[Callable] = None _download_chat_task: Optional[Callable] = None _web_server = None # Reference to the web server for shutdown class User(UserMixin): """Web Login User""" def __init__(self): self.sid = "root" @property def id(self): """ID""" return self.sid @_login_manager.user_loader def load_user(_): """ Load a user object from the user ID. Returns: User: The user object. """ return User() def get_flask_app() -> Flask: """get flask app instance""" return _flask_app # pylint: disable = W0603 def init_web( app: Application, client: pyrogram.Client = None, add_download_task: Callable = None, download_chat_task: Callable = None, ): """ Set the value of the users variable. Args: app: The application instance. client: The pyrogram client instance. add_download_task: Function to add download task. download_chat_task: Function to download chat task. Returns: None. """ global web_login_users, _app, _client, _add_download_task, _download_chat_task, _web_server _app = app _client = client _add_download_task = add_download_task _download_chat_task = download_chat_task if app.web_login_secret: web_login_users = {"root": app.web_login_secret} else: _flask_app.config["LOGIN_DISABLED"] = True # Create a stoppable server _web_server = make_server(app.web_host, app.web_port, _flask_app, threaded=True) def run_server(): _web_server.serve_forever() threading.Thread(target=run_server, daemon=True).start() def shutdown_web(): """Shutdown the web server""" global _web_server if _web_server: logger.info("正在关闭 Web 服务器...") _web_server.shutdown() _web_server = None @_flask_app.route("/login", methods=["GET", "POST"]) def login(): """ Function to handle the login route. Parameters: - No parameters Returns: - If the request method is "POST" and the username and password match the ones in the web_login_users dictionary, it returns a JSON response with a code of "1". - Otherwise, it returns a JSON response with a code of "0". - If the request method is not "POST", it returns the rendered "login.html" template. """ if request.method == "POST": username = "root" web_login_form = {} for key, value in request.form.items(): if value: value = deAesCrypt.decrypt(value) web_login_form[key] = value if not web_login_form.get("password"): return jsonify({"code": "0"}) password = web_login_form["password"] if username in web_login_users and web_login_users[username] == password: user = User() login_user(user) return jsonify({"code": "1"}) return jsonify({"code": "0"}) return render_template("login.html") @_flask_app.route("/") @login_required def index(): """Index html""" return render_template( "index.html", download_state=( "pause" if get_download_state() is DownloadState.Downloading else "continue" ), ) @_flask_app.route("/get_download_status") @login_required def get_download_speed(): """Get download speed and state""" is_downloading = get_download_state() is DownloadState.Downloading state = "pause" if is_downloading else "continue" # 暂停状态时速度直接返回0 speed = format_byte(get_total_download_speed()) if is_downloading else "0 B" return ( '{ "download_speed" : "' + speed + '/s" , "upload_speed" : "0.00 B/s", "state" : "' + state + '" }' ) @_flask_app.route("/api/task_progress") @login_required def api_task_progress(): """Get current task progress including skipped files""" progress = get_task_progress() return jsonify(progress) @_flask_app.route("/set_download_state", methods=["POST"]) @login_required def web_set_download_state(): """Set download state""" state = request.args.get("state") if state == "continue" and get_download_state() is DownloadState.StopDownload: set_download_state(DownloadState.Downloading) return "pause" if state == "pause" and get_download_state() is DownloadState.Downloading: set_download_state(DownloadState.StopDownload) return "continue" return state @_flask_app.route("/api/message_control", methods=["POST"]) @login_required def api_message_control(): """单条下载任务控制:暂停/继续/跳过""" import module.database as db data = request.get_json(silent=True) or {} chat_id = str(data.get("chat_id", "")) message_id = int(data.get("message_id", 0)) action = data.get("action", "") if not chat_id or not message_id: return jsonify({"success": False, "error": "缺少参数"}), 400 if action == "pause": pause_message(chat_id, message_id) elif action == "resume": resume_message(chat_id, message_id) elif action == "skip": file_name = str(data.get("file_name", "")) skip_message(chat_id, message_id) remove_download_entry(chat_id, message_id) db.record_skip(chat_id, "", message_id, file_name) else: return jsonify({"success": False, "error": "未知操作"}), 400 return jsonify({"success": True}) @_flask_app.route("/api/undo_skip", methods=["POST"]) @login_required def api_undo_skip(): """撤销手动跳过,删除数据库记录""" import module.database as db data = request.get_json(silent=True) or {} chat_id = str(data.get("chat_id", "")) message_id = int(data.get("message_id", 0)) if not chat_id or not message_id: return jsonify({"success": False, "error": "缺少参数"}), 400 db.delete_record(chat_id, message_id) return jsonify({"success": True}) @_flask_app.route("/get_app_version") def get_app_version(): """Get telegram_media_downloader version""" return utils.__version__ @_flask_app.route("/get_download_list") @login_required def get_download_list(): """get download list""" import module.database as db import json if request.args.get("already_down") is None: return "[]" already_down = request.args.get("already_down") == "true" if already_down: # 已完成列表从数据库读取,重启后不丢失,支持分页 try: page = max(1, int(request.args.get("page", 1))) except (ValueError, TypeError): page = 1 try: page_size = min(100, max(10, int(request.args.get("page_size", 50)))) except (ValueError, TypeError): page_size = 50 offset = (page - 1) * page_size records = db.get_recent_history(limit=page_size, offset=offset) total = db.count_recent_history() items = [] for r in records: items.append({ "chat": r.get("chat_title") or r.get("chat_id", ""), "chat_id": r.get("chat_id", ""), "id": str(r.get("message_id", "")), "filename": r.get("file_name", ""), "total_size": format_byte(r.get("file_size") or 0), "download_progress": "100", "download_speed": r.get("download_time", ""), "save_path": (r.get("file_path") or "").replace("\\", "/"), "status": r.get("status", "success"), }) return json.dumps({ "items": items, "total": total, "page": page, "page_size": page_size, "total_pages": max(1, (total + page_size - 1) // page_size), }, ensure_ascii=False) download_result = get_download_result() items = [] for chat_id, messages in download_result.items(): for idx, value in messages.items(): is_already_down = value["down_byte"] == value["total_size"] if is_already_down: continue download_speed = format_byte(value["download_speed"]) + "/s" items.append({ "chat": str(chat_id), "id": str(idx), "filename": os.path.basename(value["file_name"]), "total_size": format_byte(value["total_size"]), "download_progress": str(round(value["down_byte"] / value["total_size"] * 100, 1)), "download_speed": download_speed, "save_path": value["file_name"].replace("\\", "/"), "paused": is_message_paused(str(chat_id), idx), }) return json.dumps(items, ensure_ascii=False) @_flask_app.route("/control") @login_required def control_page(): """Control page - redirects to unified index page""" return render_template( "index.html", download_state=( "pause" if get_download_state() is DownloadState.Downloading else "continue" ), ) @_flask_app.route("/api/setup_status") def api_setup_status(): """检测配置完成度,无需登录,新用户引导使用""" try: yaml_inst = YAML() with open(CONFIG_FILE_PATH, "r", encoding="utf-8") as f: config = yaml_inst.load(f) api_id = config.get("api_id", 0) or 0 api_hash = str(config.get("api_hash", "") or "") has_api_credentials = ( bool(api_id) and str(api_id) != "0" and bool(api_hash) and len(api_hash) >= 8 and api_hash not in ("your_api_hash", "YOUR_API_HASH", "") ) session_paths = [ os.path.join("sessions", "media_downloader.session"), "media_downloader.session", ] has_session = any(os.path.exists(p) for p in session_paths) chats = config.get("chat", []) or [] has_chat = bool(chats and chats[0].get("chat_id")) proxy = dict(config.get("proxy", {}) or {}) return jsonify({ "has_api_credentials": has_api_credentials, "has_session": has_session, "has_chat": has_chat, "proxy": proxy, "save_path": config.get("save_path", ""), }) except Exception as e: logger.exception(f"setup_status error: {e}") return jsonify({"has_api_credentials": False, "error": str(e)}) @_flask_app.route("/api/save_initial_config", methods=["POST"]) def api_save_initial_config(): """保存初始配置(API 凭证 + 代理 + 路径),无需登录""" global _app try: data = request.get_json() api_id_raw = data.get("api_id", "") api_hash = str(data.get("api_hash", "")).strip() save_path = str(data.get("save_path", "")).strip() proxy_enabled = bool(data.get("proxy_enabled", False)) proxy_scheme = str(data.get("proxy_scheme", "socks5")).strip() proxy_hostname = str(data.get("proxy_hostname", "")).strip() proxy_port_raw = data.get("proxy_port", 7891) try: api_id = int(api_id_raw) except (ValueError, TypeError): return jsonify({"success": False, "error": "api_id 必须是数字"}) try: proxy_port = int(proxy_port_raw) except (ValueError, TypeError): proxy_port = 7891 if not api_id or not api_hash: return jsonify({"success": False, "error": "api_id 和 api_hash 不能为空"}) yaml_inst = YAML() yaml_inst.preserve_quotes = True with open(CONFIG_FILE_PATH, "r", encoding="utf-8") as f: config = yaml_inst.load(f) config["api_id"] = api_id config["api_hash"] = api_hash if save_path: config["save_path"] = save_path if proxy_enabled and proxy_hostname: config["proxy"] = { "scheme": proxy_scheme, "hostname": proxy_hostname, "port": proxy_port, } elif not proxy_enabled and "proxy" in config: del config["proxy"] with open(CONFIG_FILE_PATH, "w", encoding="utf-8") as f: yaml_inst.dump(config, f) if _app: _app.restart_program = True return jsonify({"success": True, "message": "配置已保存,程序即将重启"}) except Exception as e: logger.exception(f"save_initial_config error: {e}") return jsonify({"success": False, "error": str(e)}) @_flask_app.route("/api/get_config") @login_required def api_get_config(): """Get current configuration""" try: yaml = YAML() with open(CONFIG_FILE_PATH, "r", encoding="utf-8") as f: config = yaml.load(f) chat_config = config.get("chat", [{}])[0] if config.get("chat") else {} return jsonify({ "chat_id": chat_config.get("chat_id", ""), "download_filter": chat_config.get("download_filter", ""), "save_path": config.get("save_path", "") }) except Exception as e: return jsonify({"error": str(e)}), 500 def _build_download_filter(start_date: str, end_date: str) -> str: """Build download filter string from date range""" if not start_date: return "" download_filter = f"message_date >= {start_date} 00:00:00" if end_date: download_filter += f" and message_date <= {end_date} 23:59:59" return download_filter # ============ Channel History Management ============ def _load_channel_history() -> list: """Load channel history from file""" try: if os.path.exists(CHANNEL_HISTORY_FILE): with open(CHANNEL_HISTORY_FILE, "r", encoding="utf-8") as f: return json.load(f) except Exception as e: logger.warning(f"Failed to load channel history: {e}") return [] def _save_channel_history(history: list): """Save channel history to file""" try: with open(CHANNEL_HISTORY_FILE, "w", encoding="utf-8") as f: json.dump(history, f, ensure_ascii=False, indent=2) except Exception as e: logger.warning(f"Failed to save channel history: {e}") def _add_to_channel_history(chat_id: str, chat_title: str, chat_type: str = ""): """Add or update a channel in history""" history = _load_channel_history() # Check if already exists, update if so for item in history: if item.get("chat_id") == chat_id: item["chat_title"] = chat_title or item.get("chat_title", chat_id) item["chat_type"] = chat_type or item.get("chat_type", "") item["last_used"] = datetime.now().isoformat() item["use_count"] = item.get("use_count", 0) + 1 break else: # Not found, add new history.insert(0, { "chat_id": chat_id, "chat_title": chat_title or chat_id, "chat_type": chat_type, "last_used": datetime.now().isoformat(), "use_count": 1 }) # Sort by last_used (most recent first) and limit to 20 items history.sort(key=lambda x: x.get("last_used", ""), reverse=True) history = history[:20] _save_channel_history(history) def _remove_from_channel_history(chat_id: str) -> bool: """Remove a channel from history""" history = _load_channel_history() original_len = len(history) history = [item for item in history if item.get("chat_id") != chat_id] if len(history) < original_len: _save_channel_history(history) return True return False @_flask_app.route("/api/channel_history") @login_required def api_get_channel_history(): """Get channel history list""" try: history = _load_channel_history() return jsonify({"success": True, "history": history}) except Exception as e: logger.exception(f"Error getting channel history: {e}") return jsonify({"success": False, "error": str(e)}) @_flask_app.route("/api/channel_history/", methods=["DELETE"]) @login_required def api_delete_channel_history(chat_id: str): """Delete a channel from history""" try: if _remove_from_channel_history(chat_id): return jsonify({"success": True, "message": "已删除"}) return jsonify({"success": False, "error": "未找到该频道"}) except Exception as e: logger.exception(f"Error deleting channel history: {e}") return jsonify({"success": False, "error": str(e)}) @_flask_app.route("/api/channel_history/clear", methods=["POST"]) @login_required def api_clear_channel_history(): """Clear all channel history""" try: _save_channel_history([]) return jsonify({"success": True, "message": "历史记录已清空"}) except Exception as e: logger.exception(f"Error clearing channel history: {e}") return jsonify({"success": False, "error": str(e)}) def _update_chat_config(chat_id: str, download_filter: str, update_memory: bool = False) -> dict: """ Unified function to update chat configuration. Args: chat_id: The chat/channel ID download_filter: The download filter string update_memory: Whether to also update in-memory config Returns: dict with keys: success, error (if failed), chat_title (if available) """ global _app try: # Read current config yaml = YAML() yaml.preserve_quotes = True with open(CONFIG_FILE_PATH, "r", encoding="utf-8") as f: config = yaml.load(f) # Update chat config in file if "chat" not in config or not config["chat"]: config["chat"] = [{}] config["chat"][0]["chat_id"] = chat_id config["chat"][0]["last_read_message_id"] = 0 if download_filter: config["chat"][0]["download_filter"] = download_filter elif "download_filter" in config["chat"][0]: del config["chat"][0]["download_filter"] # Write config to file with open(CONFIG_FILE_PATH, "w", encoding="utf-8") as f: yaml.dump(config, f) # Update in-memory config if requested if update_memory and _app: # Update _app.config if "chat" not in _app.config or not _app.config["chat"]: _app.config["chat"] = [{}] _app.config["chat"][0]["chat_id"] = chat_id _app.config["chat"][0]["last_read_message_id"] = 0 if download_filter: _app.config["chat"][0]["download_filter"] = download_filter elif "download_filter" in _app.config["chat"][0]: del _app.config["chat"][0]["download_filter"] # Clear old chat_download_config and add new one _app.chat_download_config.clear() new_config = ChatDownloadConfig() new_config.last_read_message_id = 0 new_config.download_filter = download_filter _app.chat_download_config[chat_id] = new_config return {"success": True} except Exception as e: logger.exception(f"Error updating config: {e}") return {"success": False, "error": str(e)} def _update_chat_configs_multi(items: list, update_memory: bool = True) -> dict: """ 多频道版本:把 items 数组写入 config["chat"],完整覆盖原数组。 每个 item 形如 {"chat_id": "xxx", "download_filter": "xxx"}。 同步更新内存里的 _app.chat_download_config。 """ global _app if not items: return {"success": False, "error": "队列为空"} try: yaml = YAML() yaml.preserve_quotes = True with open(CONFIG_FILE_PATH, "r", encoding="utf-8") as f: config = yaml.load(f) # 按队列顺序重建 chat 数组 new_chat_list = [] for item in items: chat_id = str(item.get("chat_id", "")).strip() download_filter = str(item.get("download_filter", "") or "").strip() if not chat_id: continue entry = {"chat_id": chat_id, "last_read_message_id": 0} if download_filter: entry["download_filter"] = download_filter new_chat_list.append(entry) if not new_chat_list: return {"success": False, "error": "队列中没有有效的频道"} config["chat"] = new_chat_list with open(CONFIG_FILE_PATH, "w", encoding="utf-8") as f: yaml.dump(config, f) # 同步内存配置 if update_memory and _app: _app.config["chat"] = new_chat_list _app.chat_download_config.clear() for entry in new_chat_list: c = ChatDownloadConfig() c.last_read_message_id = 0 c.download_filter = entry.get("download_filter", "") _app.chat_download_config[entry["chat_id"]] = c return {"success": True, "count": len(new_chat_list)} except Exception as e: logger.exception(f"Error updating multi chat config: {e}") return {"success": False, "error": str(e)} async def _validate_chat(chat_id: str) -> dict: """ Validate chat/channel and get its info. Returns: dict with keys: valid, chat_id, chat_title, error (if invalid) """ global _client if not _client: return {"valid": False, "error": "Telegram 客户端未就绪"} try: chat = await _client.get_chat(chat_id) return { "valid": True, "chat_id": str(chat.id), "chat_title": chat.title or chat.first_name or chat_id, "chat_type": str(chat.type).split(".")[-1] if chat.type else "unknown" } except pyrogram.errors.exceptions.bad_request_400.UsernameNotOccupied: return {"valid": False, "error": f"频道/群组 '{chat_id}' 不存在"} except pyrogram.errors.exceptions.bad_request_400.PeerIdInvalid: return {"valid": False, "error": f"无效的频道/群组 ID: {chat_id}"} except Exception as e: return {"valid": False, "error": f"验证失败: {str(e)}"} @_flask_app.route("/api/validate_chat", methods=["POST"]) @login_required def api_validate_chat(): """Validate chat/channel and return its info""" global _app try: data = request.get_json() chat_id = data.get("chat_id", "").strip() if not chat_id: return jsonify({"valid": False, "error": "频道ID不能为空"}) if not _app or not _client: return jsonify({"valid": False, "error": "服务未就绪,请稍后再试"}) # Run validation in event loop future = asyncio.run_coroutine_threadsafe(_validate_chat(chat_id), _app.loop) result = future.result(timeout=10) return jsonify(result) except asyncio.TimeoutError: return jsonify({"valid": False, "error": "验证超时,请检查网络连接"}) except Exception as e: logger.exception(f"Error validating chat: {e}") return jsonify({"valid": False, "error": str(e)}) @_flask_app.route("/api/save_path", methods=["POST"]) @login_required def api_save_path(): """修改文件保存路径""" try: data = request.get_json() save_path = str(data.get("save_path", "")).strip() if not save_path: return jsonify({"success": False, "error": "路径不能为空"}) yaml_inst = YAML() yaml_inst.preserve_quotes = True with open(CONFIG_FILE_PATH, "r", encoding="utf-8") as f: config = yaml_inst.load(f) config["save_path"] = save_path with open(CONFIG_FILE_PATH, "w", encoding="utf-8") as f: yaml_inst.dump(config, f) # 同步更新内存中的配置 if _app: _app.save_path = save_path return jsonify({"success": True, "save_path": save_path}) except Exception as e: logger.exception(f"save_path error: {e}") return jsonify({"success": False, "error": str(e)}) @_flask_app.route("/api/set_config", methods=["POST"]) @login_required def api_set_config(): """Set download configuration (file only, no restart)""" try: data = request.get_json() chat_id = data.get("chat_id", "").strip() start_date = data.get("start_date", "").strip() end_date = data.get("end_date", "").strip() if not chat_id: return jsonify({"success": False, "error": "频道ID不能为空"}) download_filter = _build_download_filter(start_date, end_date) result = _update_chat_config(chat_id, download_filter, update_memory=False) return jsonify(result) except Exception as e: logger.exception(f"Error in api_set_config: {e}") return jsonify({"success": False, "error": str(e)}) @_flask_app.route("/api/save_and_restart", methods=["POST"]) @login_required def api_save_and_restart(): """Save configuration and restart the program""" global _app try: data = request.get_json() chat_id = data.get("chat_id", "").strip() start_date = data.get("start_date", "").strip() end_date = data.get("end_date", "").strip() chat_title = data.get("chat_title", "") # Optional, from validation chat_type = data.get("chat_type", "") # Optional, from validation if not chat_id: return jsonify({"success": False, "error": "频道ID不能为空"}) # 优先使用前端传来的完整过滤表达式,否则从日期范围构建 custom_filter = data.get("download_filter", "").strip() if custom_filter: download_filter = custom_filter else: download_filter = _build_download_filter(start_date, end_date) # Update both file and memory result = _update_chat_config(chat_id, download_filter, update_memory=True) if not result.get("success"): return jsonify(result) # Add to channel history _add_to_channel_history(chat_id, chat_title, chat_type) # Trigger restart if _app: _app.restart_program = True logger.info(f"Restart flag set, new chat_id: {chat_id} ({chat_title})") return jsonify({ "success": True, "message": f"配置已保存,正在重启下载 {chat_title or chat_id}..." }) except Exception as e: logger.exception(f"Error in api_save_and_restart: {e}") return jsonify({"success": False, "error": str(e)}) @_flask_app.route("/api/save_and_restart_multi", methods=["POST"]) @login_required def api_save_and_restart_multi(): """多频道版本的保存并重启。接收 items 数组,每项含 chat_id + download_filter。""" global _app try: data = request.get_json() or {} items = data.get("items", []) if not isinstance(items, list) or not items: return jsonify({"success": False, "error": "队列为空"}) # 规范化 items:每项必须有 chat_id;download_filter 可由 start_date/end_date 构建 normalized = [] for it in items: cid = str(it.get("chat_id", "") or "").strip() if not cid: continue flt = str(it.get("download_filter", "") or "").strip() if not flt: sd = str(it.get("start_date", "") or "").strip() ed = str(it.get("end_date", "") or "").strip() if sd: flt = _build_download_filter(sd, ed) normalized.append({ "chat_id": cid, "download_filter": flt, "chat_title": it.get("chat_title", "") or "", "chat_type": it.get("chat_type", "") or "", }) if not normalized: return jsonify({"success": False, "error": "没有有效的频道"}) result = _update_chat_configs_multi(normalized, update_memory=True) if not result.get("success"): return jsonify(result) # 历史记录更新 for it in normalized: _add_to_channel_history(it["chat_id"], it["chat_title"], it["chat_type"]) # 触发重启 if _app: _app.restart_program = True logger.info(f"Restart flag set, {len(normalized)} chats queued") return jsonify({ "success": True, "count": len(normalized), "message": f"已保存 {len(normalized)} 个频道任务,正在重启..." }) except Exception as e: logger.exception(f"Error in api_save_and_restart_multi: {e}") return jsonify({"success": False, "error": str(e)}) @_flask_app.route("/api/start_download", methods=["POST"]) @login_required def api_start_download(): """Start download task dynamically without restart""" global _app, _client, _download_chat_task if not _client or not _download_chat_task or not _app: return jsonify({ "success": False, "error": "下载服务未就绪,请稍后再试" }) try: data = request.get_json() chat_id = data.get("chat_id", "").strip() start_date = data.get("start_date", "").strip() end_date = data.get("end_date", "").strip() if not chat_id: return jsonify({"success": False, "error": "频道ID不能为空"}) # 优先使用前端传来的完整过滤表达式 custom_filter = data.get("download_filter", "").strip() if custom_filter: download_filter = custom_filter else: download_filter = _build_download_filter(start_date, end_date) if download_filter: download_filter = replace_date_time(download_filter) # Create download config chat_download_config = ChatDownloadConfig() chat_download_config.last_read_message_id = 0 chat_download_config.download_filter = download_filter # Create task node node = TaskNode(chat_id=chat_id) # Also update the app's chat_download_config for persistence _app.chat_download_config[chat_id] = chat_download_config # Start download task in the event loop async def start_task(): try: chat = await _client.get_chat(chat_id) logger.info(f"Starting download from web UI: {chat.title or chat_id}") await _download_chat_task(_client, chat_download_config, node) except Exception as e: logger.error(f"Error starting download task: {e}") raise e # Schedule the task future = asyncio.run_coroutine_threadsafe(start_task(), _app.loop) # Wait a short time to check if task started successfully try: future.result(timeout=5) except asyncio.TimeoutError: # Task is running, which is expected pass except Exception as e: return jsonify({ "success": False, "error": f"启动下载任务失败: {str(e)}" }) return jsonify({ "success": True, "message": f"下载任务已启动: {chat_id}" }) except Exception as e: logger.exception(f"Error in api_start_download: {e}") return jsonify({"success": False, "error": str(e)}) # ============ 代理连通性测试 ============ @_flask_app.route("/api/test_proxy", methods=["POST"]) def api_test_proxy(): """测试代理连通性,无需登录,向导配置时使用""" import time as _time import requests as _req data = request.get_json() or {} scheme = str(data.get("scheme", "")).strip() hostname = str(data.get("hostname", "")).strip() port_raw = data.get("port", 0) # 若未传参数,从配置文件读取 if not hostname or not port_raw: try: yaml_inst = YAML() with open(CONFIG_FILE_PATH, "r", encoding="utf-8") as f: config = yaml_inst.load(f) proxy_cfg = config.get("proxy", {}) or {} scheme = scheme or str(proxy_cfg.get("scheme", "http")) hostname = hostname or str(proxy_cfg.get("hostname", "")) port_raw = port_raw or proxy_cfg.get("port", 0) except Exception as e: return jsonify({"success": False, "error": f"读取配置失败:{e}"}) if not hostname or not port_raw: return jsonify({"success": False, "error": "未配置代理地址或端口"}) try: port = int(port_raw) except (ValueError, TypeError): return jsonify({"success": False, "error": "端口号必须是数字"}) proxy_url = f"{scheme}://{hostname}:{port}" proxies = {"http": proxy_url, "https": proxy_url} try: t0 = _time.time() resp = _req.get( "https://api.telegram.org", proxies=proxies, timeout=8, allow_redirects=True, ) latency = round((_time.time() - t0) * 1000) return jsonify({ "success": True, "latency": latency, "message": f"代理连接成功,延迟 {latency} ms", }) except _req.exceptions.ProxyError: return jsonify({"success": False, "error": "代理认证失败或协议不匹配,请检查代理类型(socks5/http)"}) except _req.exceptions.ConnectionError: return jsonify({"success": False, "error": f"代理地址不可达({hostname}:{port}),请确认代理软件已启动"}) except _req.exceptions.Timeout: return jsonify({"success": False, "error": "代理连接超时(8秒),节点可能不可用,请切换节点后重试"}) except Exception as e: return jsonify({"success": False, "error": f"测试失败:{str(e)}"}) # ============ 过滤器语法校验 ============ # 字段名 → 中文名映射,用于错误提示 _FILTER_FIELD_NAMES = { "message_date": "消息日期", "message_id": "消息ID", "message_caption": "消息说明", "media_file_size": "文件大小", "media_width": "视频宽度", "media_height": "视频高度", "media_file_name": "文件名", "media_duration": "视频时长", "sender_id": "发送者ID", "sender_name": "发送者名称", "reply_to_message_id": "回复消息ID", } _VALID_FILTER_FIELDS = set(_FILTER_FIELD_NAMES.keys()) | { "id", "caption", "file_size", "file_name", "media_type", "file_extension", "message_thread_id", "topic_id", } def _localize_filter_error(error: str) -> str: """将过滤器错误信息转为中文友好提示""" if not error: return error if "Undefined name" in error: # 提取字段名 import re as _re m = _re.search(r"Undefined name['\s]+([a-zA-Z_][a-zA-Z0-9_]*)", error) field = m.group(1) if m else "" valid = "、".join(list(_FILTER_FIELD_NAMES.keys())[:6]) + " 等" return f"未知字段「{field}」,可用字段:{valid}" if "Syntax error at EOF" in error: return "表达式不完整,请检查结尾是否缺少值" if "Syntax error at" in error: import re as _re m = _re.search(r"Syntax error at '(.+?)'", error) token = m.group(1) if m else "" return f"语法错误:「{token}」位置不正确,请检查操作符或引号" if "is str but" in error or "is int but" in error or "is datetime but" in error: return f"类型不匹配:{error}(字符串字段请用单引号括起来,如 'abc')" return error @_flask_app.route("/api/validate_filter", methods=["POST"]) @login_required def api_validate_filter(): """校验过滤器表达式语法,返回 {valid, error}""" from datetime import datetime as _dt from module.filter import Filter from utils.meta_data import MetaData try: data = request.get_json() or {} filter_str = (data.get("filter") or "").strip() if not filter_str: return jsonify({"valid": True, "error": ""}) # 用带类型值的虚拟元数据执行校验(确保类型检查也能通过) dummy = MetaData( message_date=_dt(2024, 1, 1), message_id=1, message_caption="test caption", media_file_size=1024 * 1024 * 10, media_width=1920, media_height=1080, media_file_name="test_video.mp4", media_duration=120, sender_id=123456, sender_name="TestUser", reply_to_message_id=0, ) f = Filter() f.set_meta_data(dummy) valid, error = f.check_filter(filter_str) return jsonify({ "valid": valid, "error": _localize_filter_error(error) if error else "", }) except Exception as e: return jsonify({"valid": False, "error": _localize_filter_error(str(e))}) # ============ 设置页面 ============ @_flask_app.route("/settings") @login_required def settings_page(): """设置页面""" return render_template("settings.html") @_flask_app.route("/api/settings/general", methods=["GET", "POST"]) @login_required def api_settings_general(): """读取或保存常规设置(并发数、日志级别、保存路径、代理)""" yaml_inst = YAML() yaml_inst.preserve_quotes = True if request.method == "GET": with open(CONFIG_FILE_PATH, "r", encoding="utf-8") as f: config = yaml_inst.load(f) proxy = config.get("proxy") or {} return jsonify({ "max_download_task": config.get("max_download_task", 5), "log_level": config.get("log_level", "INFO"), "save_path": config.get("save_path", ""), "proxy": { "scheme": proxy.get("scheme", "socks5"), "hostname": proxy.get("hostname", ""), "port": proxy.get("port", 7890), }, }) # POST — 写入配置并重启 data = request.get_json(silent=True) or {} with open(CONFIG_FILE_PATH, "r", encoding="utf-8") as f: config = yaml_inst.load(f) if "max_download_task" in data: v = int(data["max_download_task"]) if not (1 <= v <= 20): return jsonify({"success": False, "error": "并发数须在 1-20 之间"}) config["max_download_task"] = v if "log_level" in data: config["log_level"] = str(data["log_level"]) if "save_path" in data and data["save_path"]: config["save_path"] = str(data["save_path"]) if "proxy" in data: p = data["proxy"] if p.get("hostname"): config["proxy"] = { "scheme": p.get("scheme", "socks5"), "hostname": p["hostname"], "port": int(p.get("port", 7890)), } else: config.pop("proxy", None) with open(CONFIG_FILE_PATH, "w", encoding="utf-8") as f: yaml_inst.dump(config, f) if _app: _app.restart_program = True return jsonify({"success": True}) @_flask_app.route("/api/settings/logs") @login_required def api_settings_logs(): """读取运行日志,支持分页和过滤""" page = max(1, int(request.args.get("page", 1))) page_size = min(500, max(10, int(request.args.get("page_size", 100)))) level_filter = request.args.get("level", "").upper() keyword = request.args.get("keyword", "").strip() log_path = os.path.join(os.path.abspath("."), "log", "tdl.log") if not os.path.exists(log_path): return jsonify({"lines": [], "total": 0, "page": 1, "pages": 0}) with open(log_path, "r", encoding="utf-8", errors="replace") as f: all_lines = f.readlines() # 倒序,最新在前 all_lines = [l.rstrip("\n") for l in reversed(all_lines) if l.strip()] # 过滤 if level_filter and level_filter != "ALL": all_lines = [l for l in all_lines if f"| {level_filter}" in l or f"|{level_filter}" in l] if keyword: kw_lower = keyword.lower() all_lines = [l for l in all_lines if kw_lower in l.lower()] total = len(all_lines) pages = max(1, (total + page_size - 1) // page_size) start = (page - 1) * page_size lines = all_lines[start: start + page_size] return jsonify({"lines": lines, "total": total, "page": page, "pages": pages}) @_flask_app.route("/api/settings/db_records") @login_required def api_settings_db_records(): """查询数据库下载记录,支持分页和多条件过滤""" import module.database as db page = max(1, int(request.args.get("page", 1))) page_size = min(200, max(10, int(request.args.get("page_size", 50)))) offset = (page - 1) * page_size records, total = db.query_records( chat_id=request.args.get("chat_id", ""), file_name=request.args.get("file_name", ""), status=request.args.get("status", ""), media_type=request.args.get("media_type", ""), date_from=request.args.get("date_from", ""), date_to=request.args.get("date_to", ""), limit=page_size, offset=offset, ) pages = max(1, (total + page_size - 1) // page_size) return jsonify({"records": records, "total": total, "page": page, "pages": pages}) @_flask_app.route("/api/settings/clear_session", methods=["POST"]) @login_required def api_settings_clear_session(): """删除 session 文件并重启,用于切换 TG 账户""" session_path = os.path.join( os.path.abspath("."), "appdata", "sessions", "media_downloader.session" ) if os.path.exists(session_path): try: os.remove(session_path) except Exception as e: return jsonify({"success": False, "error": f"删除 session 失败: {e}"}) if _app: _app.restart_program = True return jsonify({"success": True})