Files
telegram-downloader/module/web.py
T
yuming a697ce8852
部署到群晖 / deploy (push) Successful in 46s
feat: 宿主机 ↔ 容器路径双向映射
config.yaml 新增 path_mapping 段声明宿主机前缀与容器挂载点前缀的对应关系。
用户在 Web 界面和 config 里填宿主机真实路径(如群晖 /volume2/...),
程序在真正落盘、调用云盘 SDK 前透明转成容器内路径,保证文件写到 bind mount
而不是容器 overlay 文件系统;前端显示/返回的路径再反向转回宿主机形式。

无映射配置时所有 to_container/to_host 均原样返回,向后兼容老配置。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 21:43:55 +08:00

1317 lines
44 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""web ui for media download"""
import asyncio
import json
import logging
import os
import threading
from datetime import datetime
from typing import Callable, Optional
import pyrogram
from flask import Flask, jsonify, render_template, request
from flask_login import LoginManager, UserMixin, login_required, login_user
from loguru import logger
from ruamel.yaml import YAML
from werkzeug.serving import make_server
import utils
from module import path_mapper
from module.app import Application, ChatDownloadConfig, TaskNode
from module.download_stat import (
DownloadState,
clear_completed_chats,
set_task_queue,
get_download_result,
get_download_state,
get_total_download_speed,
get_task_progress,
set_download_state,
pause_message,
resume_message,
skip_message,
is_message_paused,
remove_download_entry,
)
from utils.crypto import AesBase64
from utils.format import format_byte, replace_date_time
# Config file path (relative to the process working directory)
CONFIG_FILE_PATH = "config.yaml"
# JSON file that stores the recently used channel list, under ./appdata
CHANNEL_HISTORY_FILE = os.path.join(os.path.abspath("."), "appdata", "channel_history.json")
# Only let werkzeug emit errors, which silences its per-request access log
log = logging.getLogger("werkzeug")
log.setLevel(logging.ERROR)
_flask_app = Flask(__name__)
# NOTE(review): hard-coded session signing key; anyone who knows it can forge
# session cookies. Consider loading it from configuration or the environment.
_flask_app.secret_key = "tdl"
_login_manager = LoginManager()
# Name of the view that unauthenticated requests are redirected to
_login_manager.login_view = "login"
_login_manager.init_app(_flask_app)
# username -> password; populated by init_web() when a login secret is set
web_login_users: dict = {}
# Decrypts login form values. NOTE(review): the two literals are presumably a
# fixed AES key and IV mirrored by the front end -- confirm.
deAesCrypt = AesBase64("1234123412ABCDEF", "ABCDEF1234123412")
# Global references for dynamic download
_app: Optional[Application] = None
_client: Optional[pyrogram.Client] = None
_add_download_task: Optional[Callable] = None
_download_chat_task: Optional[Callable] = None
_web_server = None # Reference to the web server for shutdown
class User(UserMixin):
    """The single account used by the web UI login."""

    def __init__(self):
        # There is only one account; its session identifier is fixed.
        self.sid = "root"

    @property
    def id(self):
        """Identifier exposed to flask-login (the value of ``sid``)."""
        return self.sid
@_login_manager.user_loader
def load_user(_):
    """Build the user object for a session.

    The stored user ID argument is ignored because only one account exists.

    Returns:
        User: a new instance of the web UI account.
    """
    account = User()
    return account
def get_flask_app() -> Flask:
    """Return the module-level Flask application object."""
    flask_app = _flask_app
    return flask_app
# pylint: disable = W0603
def init_web(
    app: Application,
    client: pyrogram.Client = None,
    add_download_task: Callable = None,
    download_chat_task: Callable = None,
):
    """
    Store the runtime references used by the routes and start the web server.

    Args:
        app: The application instance.
        client: The pyrogram client instance.
        add_download_task: Function to add download task.
        download_chat_task: Function to download chat task.

    Returns:
        None.
    """
    global web_login_users, _app, _client, _add_download_task, _download_chat_task, _web_server

    _app, _client = app, client
    _add_download_task, _download_chat_task = add_download_task, download_chat_task

    # Without a login secret the whole UI is served without authentication.
    if not app.web_login_secret:
        _flask_app.config["LOGIN_DISABLED"] = True
    else:
        web_login_users = {"root": app.web_login_secret}

    # make_server returns a server object that shutdown_web() can stop later.
    _web_server = make_server(app.web_host, app.web_port, _flask_app, threaded=True)

    def _serve():
        _web_server.serve_forever()

    server_thread = threading.Thread(target=_serve, daemon=True)
    server_thread.start()
def shutdown_web():
    """Stop the web server if it is running and drop the reference to it."""
    global _web_server
    if not _web_server:
        return
    logger.info("正在关闭 Web 服务器...")
    _web_server.shutdown()
    _web_server = None
@_flask_app.route("/login", methods=["GET", "POST"])
def login():
    """
    Serve the login page (GET) or check submitted credentials (POST).

    Returns:
        - GET: the rendered "login.html" template.
        - POST: JSON ``{"code": "1"}`` when the decrypted password matches the
          one stored for "root" in ``web_login_users`` (the user is then
          logged in), otherwise JSON ``{"code": "0"}``.
    """
    if request.method != "POST":
        return render_template("login.html")

    # Every non-empty form value is decrypted; empty values are kept as they are.
    decrypted_form = {
        field: (deAesCrypt.decrypt(raw) if raw else raw)
        for field, raw in request.form.items()
    }

    submitted = decrypted_form.get("password")
    if not submitted:
        return jsonify({"code": "0"})

    account = "root"
    matches = account in web_login_users and web_login_users[account] == submitted
    if not matches:
        return jsonify({"code": "0"})

    login_user(User())
    return jsonify({"code": "1"})
@_flask_app.route("/")
@login_required
def index():
    """Render the main page with the label of the pause/continue toggle."""
    if get_download_state() is DownloadState.Downloading:
        toggle_label = "pause"
    else:
        toggle_label = "continue"
    return render_template("index.html", download_state=toggle_label)
@_flask_app.route("/get_download_status")
@login_required
def get_download_speed():
    """Return the total download speed and the toggle state as a JSON string."""
    if get_download_state() is DownloadState.Downloading:
        state = "pause"
        speed = format_byte(get_total_download_speed())
    else:
        # While downloads are stopped the speed is reported as zero.
        state = "continue"
        speed = "0 B"
    return (
        f'{{ "download_speed" : "{speed}/s" , '
        f'"upload_speed" : "0.00 B/s", "state" : "{state}" }}'
    )
def _compute_current_task_queue() -> list:
    """Build the current task queue from the application's chat config.

    A restart goes through os.execv, which loses the ``_task_queue`` global in
    download_stat, so ``_app.chat_download_config`` (the result of reloading
    config.yaml) is treated as the source of truth. Chat titles are looked up
    in the channel history; the chat ID is used when no title is known.
    """
    if not _app or not _app.chat_download_config:
        return []

    titles = {}
    for record in _load_channel_history():
        key = str(record.get("chat_id", "") or "")
        if not key:
            continue
        titles[key] = record.get("chat_title", "") or key

    result = []
    for chat_id, chat_cfg in _app.chat_download_config.items():
        key = str(chat_id)
        entry = {
            "chat_id": key,
            "chat_title": titles.get(key, key),
            "download_filter": getattr(chat_cfg, "download_filter", "") or "",
        }
        result.append(entry)
    return result
@_flask_app.route("/api/task_progress")
@login_required
def api_task_progress():
    """Return the current task progress, including skipped files, as JSON."""
    payload = get_task_progress()
    # Override task_queue: the global kept in download_stat is lost after an
    # os.execv restart, so the queue is recomputed from the app config.
    payload["task_queue"] = _compute_current_task_queue()
    return jsonify(payload)
@_flask_app.route("/set_download_state", methods=["POST"])
@login_required
def web_set_download_state():
    """Pause or resume downloading.

    Reads the ``state`` query parameter ("continue" or "pause").

    Returns:
        The label the toggle button should show next ("pause" after resuming,
        "continue" after pausing). When the request does not change anything,
        the requested state is echoed back. A request without ``state`` gets
        an empty body with HTTP 400, because returning ``None`` from a Flask
        view is an error.
    """
    state = request.args.get("state")
    if not state:
        return "", 400

    current = get_download_state()
    if state == "continue" and current is DownloadState.StopDownload:
        set_download_state(DownloadState.Downloading)
        return "pause"
    if state == "pause" and current is DownloadState.Downloading:
        set_download_state(DownloadState.StopDownload)
        return "continue"
    return state
@_flask_app.route("/api/message_control", methods=["POST"])
@login_required
def api_message_control():
    """Control a single download: pause, resume or skip it.

    Expects a JSON object with ``chat_id``, ``message_id``, ``action``
    ("pause" / "resume" / "skip") and, for skip, an optional ``file_name``.

    Returns:
        JSON ``{"success": True}`` on success; HTTP 400 with an error message
        when a parameter is missing or malformed or the action is unknown.
    """
    import module.database as db

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # A JSON null chat_id must count as missing, not become the string "None".
    chat_id = str(data.get("chat_id") or "")
    try:
        message_id = int(data.get("message_id", 0))
    except (TypeError, ValueError):
        # A non-numeric message_id is a client error, not a server error.
        return jsonify({"success": False, "error": "缺少参数"}), 400
    action = data.get("action", "")

    if not chat_id or not message_id:
        return jsonify({"success": False, "error": "缺少参数"}), 400

    if action == "pause":
        pause_message(chat_id, message_id)
    elif action == "resume":
        resume_message(chat_id, message_id)
    elif action == "skip":
        file_name = str(data.get("file_name", ""))
        skip_message(chat_id, message_id)
        remove_download_entry(chat_id, message_id)
        # Persist the skip in the database as well.
        db.record_skip(chat_id, "", message_id, file_name)
    else:
        return jsonify({"success": False, "error": "未知操作"}), 400
    return jsonify({"success": True})
@_flask_app.route("/api/undo_skip", methods=["POST"])
@login_required
def api_undo_skip():
    """Undo a manual skip by deleting its database record.

    Expects a JSON object with ``chat_id`` and ``message_id``.

    Returns:
        JSON ``{"success": True}`` on success; HTTP 400 with an error message
        when a parameter is missing or malformed.
    """
    import module.database as db

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # A JSON null chat_id must count as missing, not become the string "None".
    chat_id = str(data.get("chat_id") or "")
    try:
        message_id = int(data.get("message_id", 0))
    except (TypeError, ValueError):
        # A non-numeric message_id is a client error, not a server error.
        return jsonify({"success": False, "error": "缺少参数"}), 400

    if not chat_id or not message_id:
        return jsonify({"success": False, "error": "缺少参数"}), 400

    db.delete_record(chat_id, message_id)
    return jsonify({"success": True})
@_flask_app.route("/get_app_version")
def get_app_version():
    """Return the version string exported by the ``utils`` package."""
    version = utils.__version__
    return version
@_flask_app.route("/get_download_list")
@login_required
def get_download_list():
    """Return the download list as a JSON string.

    Query parameters:
        already_down: "true" for the finished list; any other value for the
            in-progress list. When absent, "[]" is returned.
        page, page_size: pagination of the finished list (page >= 1,
            page_size clamped to 10..100, default 50).

    Returns:
        Finished list: a JSON object with ``items``, ``total``, ``page``,
        ``page_size`` and ``total_pages``.
        In-progress list: a JSON array of unfinished downloads.
        Paths are passed through ``path_mapper.to_host`` before being returned.
    """
    import module.database as db

    already_down_arg = request.args.get("already_down")
    if already_down_arg is None:
        return "[]"

    if already_down_arg == "true":
        # The finished list is read from the database so it survives restarts.
        try:
            page = max(1, int(request.args.get("page", 1)))
        except (ValueError, TypeError):
            page = 1
        try:
            page_size = min(100, max(10, int(request.args.get("page_size", 50))))
        except (ValueError, TypeError):
            page_size = 50
        offset = (page - 1) * page_size
        records = db.get_recent_history(limit=page_size, offset=offset)
        total = db.count_recent_history()
        items = [
            {
                "chat": r.get("chat_title") or r.get("chat_id", ""),
                "chat_id": r.get("chat_id", ""),
                "id": str(r.get("message_id", "")),
                "filename": r.get("file_name", ""),
                "total_size": format_byte(r.get("file_size") or 0),
                "download_progress": "100",
                "download_speed": r.get("download_time", ""),
                "save_path": path_mapper.to_host((r.get("file_path") or "").replace("\\", "/")),
                "status": r.get("status", "success"),
            }
            for r in records
        ]
        return json.dumps({
            "items": items,
            "total": total,
            "page": page,
            "page_size": page_size,
            "total_pages": max(1, (total + page_size - 1) // page_size),
        }, ensure_ascii=False)

    items = []
    for chat_id, messages in get_download_result().items():
        for idx, value in messages.items():
            down_byte = value["down_byte"]
            total_size = value["total_size"]
            if down_byte == total_size:
                # Finished entries belong to the other list.
                continue
            # Guard against a zero total size to avoid ZeroDivisionError.
            progress = round(down_byte / total_size * 100, 1) if total_size else 0.0
            items.append({
                "chat": str(chat_id),
                "id": str(idx),
                "filename": os.path.basename(value["file_name"]),
                "total_size": format_byte(total_size),
                "download_progress": str(progress),
                "download_speed": format_byte(value["download_speed"]) + "/s",
                "save_path": path_mapper.to_host(value["file_name"].replace("\\", "/")),
                "paused": is_message_paused(str(chat_id), idx),
            })
    return json.dumps(items, ensure_ascii=False)
@_flask_app.route("/control")
@login_required
def control_page():
    """Control page: renders the same unified index template."""
    if get_download_state() is DownloadState.Downloading:
        toggle_label = "pause"
    else:
        toggle_label = "continue"
    return render_template("index.html", download_state=toggle_label)
@_flask_app.route("/api/setup_status")
def api_setup_status():
    """Report how complete the configuration is.

    No login is required; the first-run wizard uses this endpoint.

    Returns:
        JSON with ``has_api_credentials``, ``has_session``, ``has_chat``,
        ``proxy`` and ``save_path``. On any error, JSON with
        ``has_api_credentials`` set to False and an ``error`` message.
    """
    placeholder_hashes = ("your_api_hash", "YOUR_API_HASH", "")
    try:
        yaml_reader = YAML()
        with open(CONFIG_FILE_PATH, "r", encoding="utf-8") as cfg_file:
            config = yaml_reader.load(cfg_file)

        api_id = config.get("api_id", 0) or 0
        api_hash = str(config.get("api_hash", "") or "")
        credential_checks = (
            bool(api_id),
            str(api_id) != "0",
            bool(api_hash),
            len(api_hash) >= 8,
            api_hash not in placeholder_hashes,
        )
        has_api_credentials = all(credential_checks)

        # A session file in either location counts as an existing session.
        has_session = False
        for candidate in (
            os.path.join("sessions", "media_downloader.session"),
            "media_downloader.session",
        ):
            if os.path.exists(candidate):
                has_session = True
                break

        chats = config.get("chat", []) or []
        has_chat = bool(chats and chats[0].get("chat_id"))
        proxy = dict(config.get("proxy", {}) or {})

        payload = {
            "has_api_credentials": has_api_credentials,
            "has_session": has_session,
            "has_chat": has_chat,
            "proxy": proxy,
            "save_path": config.get("save_path", ""),
        }
        return jsonify(payload)
    except Exception as exc:
        logger.exception(f"setup_status error: {exc}")
        return jsonify({"has_api_credentials": False, "error": str(exc)})
@_flask_app.route("/api/save_initial_config", methods=["POST"])
def api_save_initial_config():
    """Save the initial configuration (API credentials, proxy, save path).

    No login is required. On success the config file is rewritten and the
    application's restart flag is set.

    Returns:
        JSON ``{"success": True, "message": ...}`` or
        ``{"success": False, "error": ...}``.
    """

    def _fail(message):
        return jsonify({"success": False, "error": message})

    try:
        payload = request.get_json()
        api_hash = str(payload.get("api_hash", "")).strip()
        save_path = str(payload.get("save_path", "")).strip()
        proxy_enabled = bool(payload.get("proxy_enabled", False))
        proxy_scheme = str(payload.get("proxy_scheme", "socks5")).strip()
        proxy_hostname = str(payload.get("proxy_hostname", "")).strip()

        try:
            api_id = int(payload.get("api_id", ""))
        except (ValueError, TypeError):
            return _fail("api_id 必须是数字")

        # A malformed port silently falls back to the default.
        try:
            proxy_port = int(payload.get("proxy_port", 7891))
        except (ValueError, TypeError):
            proxy_port = 7891

        if not (api_id and api_hash):
            return _fail("api_id 和 api_hash 不能为空")

        yaml_io = YAML()
        yaml_io.preserve_quotes = True
        with open(CONFIG_FILE_PATH, "r", encoding="utf-8") as src:
            config = yaml_io.load(src)

        config["api_id"] = api_id
        config["api_hash"] = api_hash
        if save_path:
            config["save_path"] = save_path

        if proxy_enabled:
            # An enabled proxy without a hostname leaves the stored proxy untouched.
            if proxy_hostname:
                config["proxy"] = {
                    "scheme": proxy_scheme,
                    "hostname": proxy_hostname,
                    "port": proxy_port,
                }
        elif "proxy" in config:
            del config["proxy"]

        with open(CONFIG_FILE_PATH, "w", encoding="utf-8") as dst:
            yaml_io.dump(config, dst)

        if _app:
            _app.restart_program = True
        return jsonify({"success": True, "message": "配置已保存,程序即将重启"})
    except Exception as exc:
        logger.exception(f"save_initial_config error: {exc}")
        return _fail(str(exc))
@_flask_app.route("/api/get_config")
@login_required
def api_get_config():
    """Return the first chat entry's settings and the save path.

    Returns:
        JSON with ``chat_id``, ``download_filter`` and ``save_path``; on error,
        JSON ``{"error": ...}`` with HTTP 500.
    """
    try:
        yaml_reader = YAML()
        with open(CONFIG_FILE_PATH, "r", encoding="utf-8") as cfg_file:
            config = yaml_reader.load(cfg_file)

        chats = config.get("chat")
        first_chat = chats[0] if chats else {}
        payload = {
            "chat_id": first_chat.get("chat_id", ""),
            "download_filter": first_chat.get("download_filter", ""),
            "save_path": config.get("save_path", ""),
        }
        return jsonify(payload)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
def _build_download_filter(start_date: str, end_date: str) -> str:
"""Build download filter string from date range"""
if not start_date:
return ""
download_filter = f"message_date >= {start_date} 00:00:00"
if end_date:
download_filter += f" and message_date <= {end_date} 23:59:59"
return download_filter
# ============ Channel History Management ============
def _load_channel_history() -> list:
    """Read the channel history file.

    Returns the parsed JSON content, or an empty list when the file does not
    exist or cannot be read (the failure is logged as a warning).
    """
    try:
        if not os.path.exists(CHANNEL_HISTORY_FILE):
            return []
        with open(CHANNEL_HISTORY_FILE, "r", encoding="utf-8") as history_file:
            return json.load(history_file)
    except Exception as exc:
        logger.warning(f"Failed to load channel history: {exc}")
        return []
def _save_channel_history(history: list):
    """Write the channel history list to its JSON file.

    The parent directory is created when missing, so the first save on a fresh
    install does not fail. Saving is best effort: any error is logged as a
    warning and not raised.

    Args:
        history: The list of history entries to store.
    """
    try:
        parent_dir = os.path.dirname(CHANNEL_HISTORY_FILE)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(CHANNEL_HISTORY_FILE, "w", encoding="utf-8") as f:
            json.dump(history, f, ensure_ascii=False, indent=2)
    except Exception as e:
        logger.warning(f"Failed to save channel history: {e}")
def _add_to_channel_history(chat_id: str, chat_title: str, chat_type: str = ""):
    """Insert a channel into the history, or refresh it if already present.

    An existing entry keeps its title/type when the new values are empty, gets
    a new ``last_used`` timestamp and has ``use_count`` incremented. The list
    is then sorted by ``last_used`` (newest first), cut to 20 entries and saved.
    """
    history = _load_channel_history()
    existing = next(
        (entry for entry in history if entry.get("chat_id") == chat_id),
        None,
    )

    if existing is None:
        # Unknown channel: put a fresh record at the front.
        history.insert(0, {
            "chat_id": chat_id,
            "chat_title": chat_title or chat_id,
            "chat_type": chat_type,
            "last_used": datetime.now().isoformat(),
            "use_count": 1
        })
    else:
        existing["chat_title"] = chat_title or existing.get("chat_title", chat_id)
        existing["chat_type"] = chat_type or existing.get("chat_type", "")
        existing["last_used"] = datetime.now().isoformat()
        existing["use_count"] = existing.get("use_count", 0) + 1

    # Most recently used first, capped at 20 items.
    history.sort(key=lambda entry: entry.get("last_used", ""), reverse=True)
    _save_channel_history(history[:20])
def _remove_from_channel_history(chat_id: str) -> bool:
    """Delete a channel from the history.

    Returns True when at least one entry was removed (the file is rewritten),
    False when the channel was not in the history.
    """
    history = _load_channel_history()
    remaining = [entry for entry in history if entry.get("chat_id") != chat_id]
    if len(remaining) >= len(history):
        return False
    _save_channel_history(remaining)
    return True
@_flask_app.route("/api/channel_history")
@login_required
def api_get_channel_history():
    """Return the stored channel history as JSON."""
    try:
        entries = _load_channel_history()
    except Exception as exc:
        logger.exception(f"Error getting channel history: {exc}")
        return jsonify({"success": False, "error": str(exc)})
    return jsonify({"success": True, "history": entries})
@_flask_app.route("/api/channel_history/<chat_id>", methods=["DELETE"])
@login_required
def api_delete_channel_history(chat_id: str):
    """Remove one channel from the history and report whether it existed."""
    try:
        removed = _remove_from_channel_history(chat_id)
        if not removed:
            return jsonify({"success": False, "error": "未找到该频道"})
        return jsonify({"success": True, "message": "已删除"})
    except Exception as exc:
        logger.exception(f"Error deleting channel history: {exc}")
        return jsonify({"success": False, "error": str(exc)})
@_flask_app.route("/api/channel_history/clear", methods=["POST"])
@login_required
def api_clear_channel_history():
    """Replace the stored channel history with an empty list."""
    try:
        empty_history = []
        _save_channel_history(empty_history)
        return jsonify({"success": True, "message": "历史记录已清空"})
    except Exception as exc:
        logger.exception(f"Error clearing channel history: {exc}")
        return jsonify({"success": False, "error": str(exc)})
def _update_chat_config(chat_id: str, download_filter: str, update_memory: bool = False) -> dict:
    """
    Point the first chat entry of the configuration at a new chat.

    The entry's ``last_read_message_id`` is reset to 0 and its
    ``download_filter`` is set, or removed when the filter is empty. The
    change is always written to the config file; with ``update_memory`` it is
    also applied to the running application, whose per-chat download config
    is replaced by a single entry for ``chat_id``.

    Args:
        chat_id: The chat/channel ID
        download_filter: The download filter string
        update_memory: Whether to also update in-memory config

    Returns:
        ``{"success": True}``, or ``{"success": False, "error": ...}`` on failure.
    """

    def _apply_to(cfg):
        # Same edit for the YAML document and for the in-memory config dict.
        if "chat" not in cfg or not cfg["chat"]:
            cfg["chat"] = [{}]
        first_chat = cfg["chat"][0]
        first_chat["chat_id"] = chat_id
        first_chat["last_read_message_id"] = 0
        if download_filter:
            first_chat["download_filter"] = download_filter
        elif "download_filter" in first_chat:
            del first_chat["download_filter"]

    try:
        yaml_io = YAML()
        yaml_io.preserve_quotes = True
        with open(CONFIG_FILE_PATH, "r", encoding="utf-8") as src:
            file_config = yaml_io.load(src)

        _apply_to(file_config)

        with open(CONFIG_FILE_PATH, "w", encoding="utf-8") as dst:
            yaml_io.dump(file_config, dst)

        if update_memory and _app:
            _apply_to(_app.config)

            # Drop every previous chat and register only the new one.
            _app.chat_download_config.clear()
            fresh = ChatDownloadConfig()
            fresh.last_read_message_id = 0
            fresh.download_filter = download_filter
            _app.chat_download_config[chat_id] = fresh

        return {"success": True}
    except Exception as exc:
        logger.exception(f"Error updating config: {exc}")
        return {"success": False, "error": str(exc)}
def _update_chat_configs_multi(items: list, update_memory: bool = True) -> dict:
    """
    Multi-channel variant: replace ``config["chat"]`` with the given queue.

    Each item looks like ``{"chat_id": "xxx", "download_filter": "xxx"}``;
    items without a chat ID are dropped. With ``update_memory`` the running
    application's ``_app.chat_download_config`` is rebuilt to match.

    Returns:
        ``{"success": True, "count": n}`` or ``{"success": False, "error": ...}``.
    """
    if not items:
        return {"success": False, "error": "队列为空"}

    try:
        yaml_io = YAML()
        yaml_io.preserve_quotes = True
        with open(CONFIG_FILE_PATH, "r", encoding="utf-8") as src:
            config = yaml_io.load(src)

        # Rebuild the chat array in queue order.
        chat_entries = []
        for raw in items:
            cid = str(raw.get("chat_id", "")).strip()
            flt = str(raw.get("download_filter", "") or "").strip()
            if cid:
                record = {"chat_id": cid, "last_read_message_id": 0}
                if flt:
                    record["download_filter"] = flt
                chat_entries.append(record)

        if not chat_entries:
            return {"success": False, "error": "队列中没有有效的频道"}

        config["chat"] = chat_entries
        with open(CONFIG_FILE_PATH, "w", encoding="utf-8") as dst:
            yaml_io.dump(config, dst)

        # Mirror the new queue into the in-memory configuration.
        if update_memory and _app:
            _app.config["chat"] = chat_entries
            _app.chat_download_config.clear()
            for record in chat_entries:
                chat_cfg = ChatDownloadConfig()
                chat_cfg.last_read_message_id = 0
                chat_cfg.download_filter = record.get("download_filter", "")
                _app.chat_download_config[record["chat_id"]] = chat_cfg

        return {"success": True, "count": len(chat_entries)}
    except Exception as exc:
        logger.exception(f"Error updating multi chat config: {exc}")
        return {"success": False, "error": str(exc)}
async def _validate_chat(chat_id: str) -> dict:
    """
    Look up a chat/channel through the Telegram client.

    Returns:
        On success a dict with ``valid`` True, ``chat_id``, ``chat_title`` and
        ``chat_type``; otherwise a dict with ``valid`` False and ``error``.
    """
    if not _client:
        return {"valid": False, "error": "Telegram 客户端未就绪"}

    bad_request = pyrogram.errors.exceptions.bad_request_400
    try:
        chat = await _client.get_chat(chat_id)
    except bad_request.UsernameNotOccupied:
        return {"valid": False, "error": f"频道/群组 '{chat_id}' 不存在"}
    except bad_request.PeerIdInvalid:
        return {"valid": False, "error": f"无效的频道/群组 ID: {chat_id}"}
    except Exception as exc:
        return {"valid": False, "error": f"验证失败: {str(exc)}"}

    try:
        if chat.type:
            # Keep only the member name of the enum's string form.
            type_name = str(chat.type).split(".")[-1]
        else:
            type_name = "unknown"
        return {
            "valid": True,
            "chat_id": str(chat.id),
            "chat_title": chat.title or chat.first_name or chat_id,
            "chat_type": type_name
        }
    except Exception as exc:
        return {"valid": False, "error": f"验证失败: {str(exc)}"}
@_flask_app.route("/api/validate_chat", methods=["POST"])
@login_required
def api_validate_chat():
    """Validate a chat/channel and return its info.

    Expects JSON with ``chat_id``. The lookup runs on the application's event
    loop and is given 10 seconds to finish.

    Returns:
        JSON from ``_validate_chat`` or ``{"valid": False, "error": ...}``.
    """
    # Future.result() raises concurrent.futures.TimeoutError, which is a
    # different class from asyncio.TimeoutError on Python 3.8-3.10.
    from concurrent.futures import TimeoutError as FutureTimeoutError

    future = None
    try:
        data = request.get_json()
        chat_id = data.get("chat_id", "").strip()
        if not chat_id:
            return jsonify({"valid": False, "error": "频道ID不能为空"})
        if not _app or not _client:
            return jsonify({"valid": False, "error": "服务未就绪,请稍后再试"})
        # Run validation in event loop
        future = asyncio.run_coroutine_threadsafe(_validate_chat(chat_id), _app.loop)
        result = future.result(timeout=10)
        return jsonify(result)
    except (asyncio.TimeoutError, FutureTimeoutError):
        # Nobody will read the result any more, so stop the lookup.
        if future is not None:
            future.cancel()
        return jsonify({"valid": False, "error": "验证超时,请检查网络连接"})
    except Exception as e:
        logger.exception(f"Error validating chat: {e}")
        return jsonify({"valid": False, "error": str(e)})
@_flask_app.route("/api/save_path", methods=["POST"])
@login_required
def api_save_path():
    """Change the directory where downloaded files are saved.

    Writes ``save_path`` to the config file and, when the application is
    running, updates its in-memory ``save_path`` as well.
    """
    try:
        payload = request.get_json()
        new_path = str(payload.get("save_path", "")).strip()
        if not new_path:
            return jsonify({"success": False, "error": "路径不能为空"})

        yaml_io = YAML()
        yaml_io.preserve_quotes = True
        with open(CONFIG_FILE_PATH, "r", encoding="utf-8") as src:
            config = yaml_io.load(src)
        config["save_path"] = new_path
        with open(CONFIG_FILE_PATH, "w", encoding="utf-8") as dst:
            yaml_io.dump(config, dst)

        # Keep the running application in sync with the file.
        if _app:
            _app.save_path = new_path
        return jsonify({"success": True, "save_path": new_path})
    except Exception as exc:
        logger.exception(f"save_path error: {exc}")
        return jsonify({"success": False, "error": str(exc)})
@_flask_app.route("/api/set_config", methods=["POST"])
@login_required
def api_set_config():
    """Store the chat ID and date-range filter in the config file only.

    The running application is not touched and no restart is triggered.
    """
    try:
        payload = request.get_json()
        chat_id = payload.get("chat_id", "").strip()
        start_date = payload.get("start_date", "").strip()
        end_date = payload.get("end_date", "").strip()
        if not chat_id:
            return jsonify({"success": False, "error": "频道ID不能为空"})

        outcome = _update_chat_config(
            chat_id,
            _build_download_filter(start_date, end_date),
            update_memory=False,
        )
        return jsonify(outcome)
    except Exception as exc:
        logger.exception(f"Error in api_set_config: {exc}")
        return jsonify({"success": False, "error": str(exc)})
@_flask_app.route("/api/save_and_restart", methods=["POST"])
@login_required
def api_save_and_restart():
    """Save a single-chat configuration and flag the program for restart.

    Expects JSON with ``chat_id`` and optionally ``download_filter``,
    ``start_date``, ``end_date``, ``chat_title`` and ``chat_type``.
    """
    try:
        payload = request.get_json()
        chat_id = payload.get("chat_id", "").strip()
        start_date = payload.get("start_date", "").strip()
        end_date = payload.get("end_date", "").strip()
        # Title and type are optional; they come from a previous validation.
        chat_title = payload.get("chat_title", "")
        chat_type = payload.get("chat_type", "")
        if not chat_id:
            return jsonify({"success": False, "error": "频道ID不能为空"})

        # A full filter expression from the front end wins over the date range.
        download_filter = payload.get("download_filter", "").strip()
        if not download_filter:
            download_filter = _build_download_filter(start_date, end_date)

        # File and in-memory configuration are both updated.
        outcome = _update_chat_config(chat_id, download_filter, update_memory=True)
        if not outcome.get("success"):
            return jsonify(outcome)

        _add_to_channel_history(chat_id, chat_title, chat_type)

        # Reset the queue overview; a single-chat run has a one-item queue.
        clear_completed_chats()
        queue_entry = {"chat_id": chat_id, "chat_title": chat_title or chat_id}
        set_task_queue([queue_entry])

        if _app:
            _app.restart_program = True
            logger.info(f"Restart flag set, new chat_id: {chat_id} ({chat_title})")

        return jsonify({
            "success": True,
            "message": f"配置已保存,正在重启下载 {chat_title or chat_id}..."
        })
    except Exception as exc:
        logger.exception(f"Error in api_save_and_restart: {exc}")
        return jsonify({"success": False, "error": str(exc)})
@_flask_app.route("/api/save_and_restart_multi", methods=["POST"])
@login_required
def api_save_and_restart_multi():
    """Multi-channel save-and-restart.

    Expects JSON with an ``items`` array; each item carries ``chat_id`` plus
    either ``download_filter`` or ``start_date``/``end_date``.
    """

    def _text(source, key):
        # Normalise a possibly missing or null field to a stripped string.
        return str(source.get(key, "") or "").strip()

    try:
        payload = request.get_json() or {}
        items = payload.get("items", [])
        if not isinstance(items, list) or not items:
            return jsonify({"success": False, "error": "队列为空"})

        # Normalise the items: chat_id is mandatory, download_filter may be
        # derived from start_date/end_date.
        normalized = []
        for raw in items:
            cid = _text(raw, "chat_id")
            if not cid:
                continue
            flt = _text(raw, "download_filter")
            if not flt:
                start = _text(raw, "start_date")
                end = _text(raw, "end_date")
                if start:
                    flt = _build_download_filter(start, end)
            normalized.append({
                "chat_id": cid,
                "download_filter": flt,
                "chat_title": raw.get("chat_title", "") or "",
                "chat_type": raw.get("chat_type", "") or "",
            })

        if not normalized:
            return jsonify({"success": False, "error": "没有有效的频道"})

        outcome = _update_chat_configs_multi(normalized, update_memory=True)
        if not outcome.get("success"):
            return jsonify(outcome)

        # Refresh the channel history for every queued chat.
        for entry in normalized:
            _add_to_channel_history(entry["chat_id"], entry["chat_title"], entry["chat_type"])

        # Publish the complete queue so the front end can render it.
        clear_completed_chats()
        queue = []
        for entry in normalized:
            queue.append({
                "chat_id": entry["chat_id"],
                "chat_title": entry["chat_title"] or entry["chat_id"],
            })
        set_task_queue(queue)

        if _app:
            _app.restart_program = True
            logger.info(f"Restart flag set, {len(normalized)} chats queued")

        return jsonify({
            "success": True,
            "count": len(normalized),
            "message": f"已保存 {len(normalized)} 个频道任务,正在重启..."
        })
    except Exception as exc:
        logger.exception(f"Error in api_save_and_restart_multi: {exc}")
        return jsonify({"success": False, "error": str(exc)})
@_flask_app.route("/api/start_download", methods=["POST"])
@login_required
def api_start_download():
    """Start a download task dynamically, without restarting the program.

    Expects JSON with ``chat_id`` and optionally ``download_filter`` or
    ``start_date``/``end_date``. The task is scheduled on the application's
    event loop; the handler waits up to 5 seconds so that an immediate failure
    can be reported, and treats a timeout as "task is running".

    Returns:
        JSON ``{"success": True, "message": ...}`` or
        ``{"success": False, "error": ...}``.
    """
    # Future.result() raises concurrent.futures.TimeoutError, which is a
    # different class from asyncio.TimeoutError on Python 3.8-3.10. Catching
    # only the asyncio one reported a running task as a failure there.
    from concurrent.futures import TimeoutError as FutureTimeoutError

    if not _client or not _download_chat_task or not _app:
        return jsonify({
            "success": False,
            "error": "下载服务未就绪,请稍后再试"
        })
    try:
        data = request.get_json()
        chat_id = data.get("chat_id", "").strip()
        start_date = data.get("start_date", "").strip()
        end_date = data.get("end_date", "").strip()
        if not chat_id:
            return jsonify({"success": False, "error": "频道ID不能为空"})

        # A full filter expression from the front end wins over the date range.
        custom_filter = data.get("download_filter", "").strip()
        if custom_filter:
            download_filter = custom_filter
        else:
            download_filter = _build_download_filter(start_date, end_date)
        if download_filter:
            download_filter = replace_date_time(download_filter)

        # Create download config
        chat_download_config = ChatDownloadConfig()
        chat_download_config.last_read_message_id = 0
        chat_download_config.download_filter = download_filter

        # Create task node
        node = TaskNode(chat_id=chat_id)

        # Also register the config on the app so it is tracked there
        _app.chat_download_config[chat_id] = chat_download_config

        async def start_task():
            try:
                chat = await _client.get_chat(chat_id)
                logger.info(f"Starting download from web UI: {chat.title or chat_id}")
                await _download_chat_task(_client, chat_download_config, node)
            except Exception as e:
                logger.error(f"Error starting download task: {e}")
                raise

        # Schedule the task on the application's event loop
        future = asyncio.run_coroutine_threadsafe(start_task(), _app.loop)

        # Wait a short time to check if task started successfully
        try:
            future.result(timeout=5)
        except (asyncio.TimeoutError, FutureTimeoutError):
            # Task is still running, which is expected
            pass
        except Exception as e:
            return jsonify({
                "success": False,
                "error": f"启动下载任务失败: {str(e)}"
            })

        return jsonify({
            "success": True,
            "message": f"下载任务已启动: {chat_id}"
        })
    except Exception as e:
        logger.exception(f"Error in api_start_download: {e}")
        return jsonify({"success": False, "error": str(e)})
# ============ Proxy connectivity test ============
@_flask_app.route("/api/test_proxy", methods=["POST"])
def api_test_proxy():
    """Test whether api.telegram.org is reachable through a proxy.

    No login is required; the setup wizard uses this endpoint. The proxy is
    taken from the JSON body (``scheme``, ``hostname``, ``port``); when the
    hostname or port is missing, the missing parts are read from the ``proxy``
    section of the config file. An empty scheme falls back to "http" so the
    proxy URL is always well formed.

    Returns:
        JSON ``{"success": True, "latency": ms, "message": ...}`` or
        ``{"success": False, "error": ...}``.
    """
    import time as _time
    import requests as _req

    data = request.get_json() or {}
    scheme = str(data.get("scheme", "")).strip()
    hostname = str(data.get("hostname", "")).strip()
    port_raw = data.get("port", 0)

    # Fall back to the config file when the request carries no proxy address.
    if not hostname or not port_raw:
        try:
            yaml_inst = YAML()
            with open(CONFIG_FILE_PATH, "r", encoding="utf-8") as f:
                config = yaml_inst.load(f)
            proxy_cfg = config.get("proxy", {}) or {}
            scheme = scheme or str(proxy_cfg.get("scheme", "http"))
            hostname = hostname or str(proxy_cfg.get("hostname", ""))
            port_raw = port_raw or proxy_cfg.get("port", 0)
        except Exception as e:
            return jsonify({"success": False, "error": f"读取配置失败:{e}"})

    if not hostname or not port_raw:
        return jsonify({"success": False, "error": "未配置代理地址或端口"})

    try:
        port = int(port_raw)
    except (ValueError, TypeError):
        return jsonify({"success": False, "error": "端口号必须是数字"})

    # Without a scheme the URL would be "://host:port", which is not valid.
    scheme = scheme or "http"
    proxy_url = f"{scheme}://{hostname}:{port}"
    proxies = {"http": proxy_url, "https": proxy_url}

    try:
        t0 = _time.time()
        _req.get(
            "https://api.telegram.org",
            proxies=proxies,
            timeout=8,
            allow_redirects=True,
        )
        latency = round((_time.time() - t0) * 1000)
        return jsonify({
            "success": True,
            "latency": latency,
            "message": f"代理连接成功,延迟 {latency} ms",
        })
    # ProxyError is handled before its parent class ConnectionError.
    except _req.exceptions.ProxyError:
        return jsonify({"success": False, "error": "代理认证失败或协议不匹配,请检查代理类型(socks5/http)"})
    except _req.exceptions.ConnectionError:
        return jsonify({"success": False, "error": f"代理地址不可达({hostname}:{port}),请确认代理软件已启动"})
    except _req.exceptions.Timeout:
        return jsonify({"success": False, "error": "代理连接超时(8秒),节点可能不可用,请切换节点后重试"})
    except Exception as e:
        return jsonify({"success": False, "error": f"测试失败:{str(e)}"})
# ============ Filter syntax validation ============
# Field name -> Chinese display name, used in error hints
_FILTER_FIELD_NAMES = {
    "message_date": "消息日期",
    "message_id": "消息ID",
    "message_caption": "消息说明",
    "media_file_size": "文件大小",
    "media_width": "视频宽度",
    "media_height": "视频高度",
    "media_file_name": "文件名",
    "media_duration": "视频时长",
    "sender_id": "发送者ID",
    "sender_name": "发送者名称",
    "reply_to_message_id": "回复消息ID",
}
# The keys above plus further field names. NOTE(review): the extra names look
# like short aliases and additional filter fields; no use of this set is
# visible in this part of the file -- confirm against the filter module.
_VALID_FILTER_FIELDS = set(_FILTER_FIELD_NAMES.keys()) | {
    "id", "caption", "file_size", "file_name", "media_type",
    "file_extension", "message_thread_id", "topic_id",
}
def _localize_filter_error(error: str) -> str:
"""将过滤器错误信息转为中文友好提示"""
if not error:
return error
if "Undefined name" in error:
# 提取字段名
import re as _re
m = _re.search(r"Undefined name['\s]+([a-zA-Z_][a-zA-Z0-9_]*)", error)
field = m.group(1) if m else ""
valid = "".join(list(_FILTER_FIELD_NAMES.keys())[:6]) + ""
return f"未知字段「{field}」,可用字段:{valid}"
if "Syntax error at EOF" in error:
return "表达式不完整,请检查结尾是否缺少值"
if "Syntax error at" in error:
import re as _re
m = _re.search(r"Syntax error at '(.+?)'", error)
token = m.group(1) if m else ""
return f"语法错误:「{token}」位置不正确,请检查操作符或引号"
if "is str but" in error or "is int but" in error or "is datetime but" in error:
return f"类型不匹配:{error}(字符串字段请用单引号括起来,如 'abc'"
return error
@_flask_app.route("/api/validate_filter", methods=["POST"])
@login_required
def api_validate_filter():
    """Check the syntax of a filter expression.

    Expects JSON with ``filter``. An empty expression is reported as valid.

    Returns:
        JSON ``{"valid": bool, "error": str}`` with a localized error text.
    """
    from datetime import datetime as _dt
    from module.filter import Filter
    from utils.meta_data import MetaData

    try:
        payload = request.get_json() or {}
        expression = (payload.get("filter") or "").strip()
        if not expression:
            return jsonify({"valid": True, "error": ""})

        # Sample metadata with a typed value per field, so that type checks in
        # the expression can be evaluated as well.
        sample_values = {
            "message_date": _dt(2024, 1, 1),
            "message_id": 1,
            "message_caption": "test caption",
            "media_file_size": 1024 * 1024 * 10,
            "media_width": 1920,
            "media_height": 1080,
            "media_file_name": "test_video.mp4",
            "media_duration": 120,
            "sender_id": 123456,
            "sender_name": "TestUser",
            "reply_to_message_id": 0,
        }
        checker = Filter()
        checker.set_meta_data(MetaData(**sample_values))
        is_valid, raw_error = checker.check_filter(expression)

        message = _localize_filter_error(raw_error) if raw_error else ""
        return jsonify({"valid": is_valid, "error": message})
    except Exception as exc:
        return jsonify({"valid": False, "error": _localize_filter_error(str(exc))})
# ============ Settings page ============
@_flask_app.route("/settings")
@login_required
def settings_page():
    """Render the settings page template."""
    page = render_template("settings.html")
    return page
@_flask_app.route("/api/settings/general", methods=["GET", "POST"])
@login_required
def api_settings_general():
    """Read or save the general settings kept in the config file.

    GET returns ``max_download_task``, ``log_level``, ``save_path`` and the
    ``proxy`` block (scheme / hostname / port), substituting defaults for
    keys missing from the file.

    POST applies whichever of those keys are present in the JSON body,
    rewrites the config file and, when the application object is set, flags
    it for restart. Invalid values yield ``{"success": False, "error": ...}``
    and leave the file untouched.
    """
    yaml_inst = YAML()
    yaml_inst.preserve_quotes = True

    with open(CONFIG_FILE_PATH, "r", encoding="utf-8") as f:
        config = yaml_inst.load(f)
    if config is None:
        # An empty YAML document loads as None; treat it as an empty mapping.
        config = {}

    if request.method == "GET":
        proxy = config.get("proxy") or {}
        return jsonify({
            "max_download_task": config.get("max_download_task", 5),
            "log_level": config.get("log_level", "INFO"),
            "save_path": config.get("save_path", ""),
            "proxy": {
                "scheme": proxy.get("scheme", "socks5"),
                "hostname": proxy.get("hostname", ""),
                "port": proxy.get("port", 7890),
            },
        })

    # POST: validate everything first, then write the config and restart.
    data = request.get_json(silent=True) or {}
    if not isinstance(data, dict):
        return jsonify({"success": False, "error": "请求体必须是 JSON 对象"})

    if "max_download_task" in data:
        try:
            task_count = int(data["max_download_task"])
        except (TypeError, ValueError):
            # Non-numeric input is reported like an out-of-range value
            # instead of surfacing as an unhandled server error.
            task_count = None
        if task_count is None or not 1 <= task_count <= 20:
            return jsonify({"success": False, "error": "并发数须在 1-20 之间"})
        config["max_download_task"] = task_count

    if "log_level" in data:
        config["log_level"] = str(data["log_level"])

    # An empty save_path is ignored rather than clearing the stored value.
    if "save_path" in data and data["save_path"]:
        config["save_path"] = str(data["save_path"])

    if "proxy" in data:
        proxy_in = data["proxy"] or {}  # null proxy == no proxy
        if not isinstance(proxy_in, dict):
            return jsonify({"success": False, "error": "代理配置格式错误"})
        if proxy_in.get("hostname"):
            try:
                port = int(proxy_in.get("port", 7890))
            except (TypeError, ValueError):
                port = None
            if port is None or not 1 <= port <= 65535:
                return jsonify({"success": False, "error": "代理端口须在 1-65535 之间"})
            config["proxy"] = {
                "scheme": proxy_in.get("scheme", "socks5"),
                "hostname": proxy_in["hostname"],
                "port": port,
            }
        else:
            # No hostname means the proxy section is removed entirely.
            config.pop("proxy", None)

    with open(CONFIG_FILE_PATH, "w", encoding="utf-8") as f:
        yaml_inst.dump(config, f)

    if _app:
        _app.restart_program = True
    return jsonify({"success": True})
@_flask_app.route("/api/settings/logs")
@login_required
def api_settings_logs():
    """Return one page of the run log as JSON, last-written lines first.

    Query parameters:
        page: 1-based page number; values below 1 are clamped to 1.
        page_size: Lines per page, clamped to 10-500 (default 100).
        level: Optional level name; empty or ``ALL`` disables the filter.
        keyword: Optional case-insensitive substring filter.

    A non-numeric ``page`` or ``page_size`` falls back to its default
    instead of raising.

    Returns:
        JSON with ``lines``, ``total``, ``page`` and ``pages``. When the log
        file does not exist the result is empty with ``pages`` set to 0.
    """
    # type=int makes werkzeug return the default when conversion fails.
    page = max(1, request.args.get("page", 1, type=int))
    page_size = min(500, max(10, request.args.get("page_size", 100, type=int)))
    level_filter = request.args.get("level", "").upper()
    keyword = request.args.get("keyword", "").strip()

    log_path = os.path.join(os.path.abspath("."), "log", "tdl.log")
    if not os.path.exists(log_path):
        return jsonify({"lines": [], "total": 0, "page": 1, "pages": 0})

    with open(log_path, "r", encoding="utf-8", errors="replace") as f:
        raw_lines = f.readlines()

    # Reverse the file order (latest first) and drop blank lines.
    entries = [line.rstrip("\n") for line in reversed(raw_lines) if line.strip()]

    # Filtering
    if level_filter and level_filter != "ALL":
        markers = (f"| {level_filter}", f"|{level_filter}")
        entries = [
            line for line in entries
            if any(marker in line for marker in markers)
        ]
    if keyword:
        kw_lower = keyword.lower()
        entries = [line for line in entries if kw_lower in line.lower()]

    total = len(entries)
    pages = max(1, (total + page_size - 1) // page_size)  # ceiling division
    start = (page - 1) * page_size
    lines = entries[start:start + page_size]
    return jsonify({"lines": lines, "total": total, "page": page, "pages": pages})
@_flask_app.route("/api/settings/db_records")
@login_required
def api_settings_db_records():
    """Query download records from the database with paging and filters.

    Query parameters:
        page: 1-based page number; values below 1 are clamped to 1.
        page_size: Records per page, clamped to 10-200 (default 50).
        chat_id, file_name, status, media_type, date_from, date_to:
            Optional filter values, passed to ``db.query_records`` unchanged
            (empty string when absent).

    A non-numeric ``page`` or ``page_size`` falls back to its default
    instead of raising.

    Returns:
        JSON with ``records``, ``total``, ``page`` and ``pages``.
    """
    import module.database as db

    # type=int makes werkzeug return the default when conversion fails.
    page = max(1, request.args.get("page", 1, type=int))
    page_size = min(200, max(10, request.args.get("page_size", 50, type=int)))
    offset = (page - 1) * page_size

    records, total = db.query_records(
        chat_id=request.args.get("chat_id", ""),
        file_name=request.args.get("file_name", ""),
        status=request.args.get("status", ""),
        media_type=request.args.get("media_type", ""),
        date_from=request.args.get("date_from", ""),
        date_to=request.args.get("date_to", ""),
        limit=page_size,
        offset=offset,
    )
    pages = max(1, (total + page_size - 1) // page_size)  # ceiling division
    return jsonify({"records": records, "total": total, "page": page, "pages": pages})
@_flask_app.route("/api/settings/clear_session", methods=["POST"])
@login_required
def api_settings_clear_session():
    """Delete the session file and request a restart.

    Per the original note, this is used to switch the Telegram account.
    A session file that is already absent is not an error. Any other
    failure to delete it is reported as ``{"success": False, "error": ...}``
    and no restart is requested.
    """
    session_path = os.path.join(
        os.path.abspath("."), "appdata", "sessions", "media_downloader.session"
    )
    # Attempt the removal directly instead of checking for existence first,
    # so a file that vanishes in between cannot produce a spurious failure.
    try:
        os.remove(session_path)
    except FileNotFoundError:
        pass
    except OSError as e:
        return jsonify({"success": False, "error": f"删除 session 失败: {e}"})

    if _app:
        _app.restart_program = True
    return jsonify({"success": True})