276 lines
9.2 KiB
Python
276 lines
9.2 KiB
Python
"""SQLite 下载历史数据库封装"""
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import re
|
|
import sqlite3
|
|
import time
|
|
from typing import Optional
|
|
|
|
# Module-wide SQLite connection used by the database helpers in this module.
# It stays None until init_db() assigns it; while it is None the helpers
# return a neutral default (False, empty list, 0, None) instead of querying.
_conn: Optional[sqlite3.Connection] = None
|
|
|
|
|
|
def init_db(db_path: str):
    """Open the SQLite database and create the tables if they are missing.

    Meant to be called once at startup (from ``main()``). The connection is
    stored in the module-level ``_conn``, is opened with
    ``check_same_thread=False`` and uses ``sqlite3.Row`` as row factory so
    columns can be read by name.

    Args:
        db_path: Path of the database file. A bare file name without a
            directory component and SQLite's ``":memory:"`` name are accepted.
    """
    global _conn
    # os.path.dirname() yields "" for a bare file name or ":memory:", and
    # os.makedirs("") raises FileNotFoundError, so only create the parent
    # directory when the path really has one.
    parent_dir = os.path.dirname(db_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    _conn = sqlite3.connect(db_path, check_same_thread=False)
    _conn.row_factory = sqlite3.Row
    _conn.execute("""
        CREATE TABLE IF NOT EXISTS downloads (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            chat_id TEXT NOT NULL,
            chat_title TEXT,
            message_id INTEGER NOT NULL,
            file_name TEXT,
            file_path TEXT,
            file_size INTEGER,
            media_type TEXT,
            download_time TEXT DEFAULT (datetime('now','localtime')),
            status TEXT,
            UNIQUE(chat_id, message_id)
        )
    """)
    # Pre-scan result cache table: (chat_id, filter_key) -> estimated_total
    _conn.execute("""
        CREATE TABLE IF NOT EXISTS scan_cache (
            chat_id TEXT NOT NULL,
            filter_key TEXT NOT NULL,
            estimated_total INTEGER NOT NULL,
            scanned_at INTEGER NOT NULL,
            PRIMARY KEY (chat_id, filter_key)
        )
    """)
    _conn.commit()
|
|
|
|
|
|
def is_downloaded(chat_id: str, message_id: int) -> bool:
    """Report whether a message has a download record with status 'success'.

    Returns False when the database has not been initialised.
    """
    if _conn is None:
        return False
    sql = "SELECT 1 FROM downloads WHERE chat_id=? AND message_id=? AND status='success'"
    match = _conn.execute(sql, (chat_id, message_id)).fetchone()
    return match is not None
|
|
|
|
|
|
def should_skip(chat_id: str, message_id: int) -> tuple:
    """Decide from the stored status whether a message should be skipped.

    Returns:
        A ``(skip, reason)`` pair. ``skip`` is True when the stored status is
        'success' (downloaded before) or 'skip' (manually skipped by the
        user), and ``reason`` is the matching user-facing text. In every
        other case, including an uninitialised database or a missing record,
        the result is ``(False, "")``.
    """
    nothing_to_skip = (False, "")
    if _conn is None:
        return nothing_to_skip

    record = _conn.execute(
        "SELECT status FROM downloads WHERE chat_id=? AND message_id=?",
        (chat_id, message_id),
    ).fetchone()
    if record is None:
        return nothing_to_skip

    # Stored status -> reason shown to the user.
    skip_reasons = {
        "success": "曾经下载过(本地已删除)",
        "skip": "用户已手动跳过",
    }
    stored_status = record["status"]
    if stored_status in skip_reasons:
        return True, skip_reasons[stored_status]
    return nothing_to_skip
|
|
|
|
|
|
def record_skip(chat_id: str, chat_title: str, message_id: int, file_name: str = ""):
    """Store a 'skip' record for a message the user skipped manually.

    A new row is inserted with empty path/media type and size 0. If a row for
    the same (chat_id, message_id) already exists, only its chat_title,
    download_time and status are overwritten; the other columns keep their
    stored values. Does nothing when the database has not been initialised.
    """
    if _conn is None:
        return
    upsert_sql = """
        INSERT INTO downloads
            (chat_id, chat_title, message_id, file_name, file_path, file_size, media_type,
             download_time, status)
        VALUES (?, ?, ?, ?, '', 0, '', datetime('now','localtime'), 'skip')
        ON CONFLICT(chat_id, message_id) DO UPDATE SET
            chat_title=excluded.chat_title,
            download_time=excluded.download_time,
            status='skip'
    """
    values = (chat_id, chat_title, message_id, file_name)
    _conn.execute(upsert_sql, values)
    _conn.commit()
|
|
|
|
|
|
def delete_record(chat_id: str, message_id: int):
    """Remove the download record of a message (undo a skip or allow a re-download).

    Does nothing when the database has not been initialised.
    """
    if _conn is None:
        return
    delete_sql = "DELETE FROM downloads WHERE chat_id=? AND message_id=?"
    _conn.execute(delete_sql, (chat_id, message_id))
    _conn.commit()
|
|
|
|
|
|
def record_download(
    chat_id: str,
    chat_title: str,
    message_id: int,
    file_name: str,
    file_path: str,
    file_size: int,
    media_type: str,
    status: str,
):
    """Insert the download record for a message, or overwrite the existing one.

    Rows are unique per (chat_id, message_id); on conflict every other column,
    including download_time (set to the current local time), is replaced by
    the new values. Does nothing when the database has not been initialised.
    """
    if _conn is None:
        return
    upsert_sql = """
        INSERT INTO downloads
            (chat_id, chat_title, message_id, file_name, file_path, file_size, media_type,
             download_time, status)
        VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now','localtime'), ?)
        ON CONFLICT(chat_id, message_id) DO UPDATE SET
            chat_title=excluded.chat_title,
            file_name=excluded.file_name,
            file_path=excluded.file_path,
            file_size=excluded.file_size,
            media_type=excluded.media_type,
            download_time=excluded.download_time,
            status=excluded.status
    """
    values = (
        chat_id,
        chat_title,
        message_id,
        file_name,
        file_path,
        file_size,
        media_type,
        status,
    )
    _conn.execute(upsert_sql, values)
    _conn.commit()
|
|
|
|
|
|
def _like_contains(text: str) -> str:
    """Build a LIKE pattern matching rows that contain *text* literally."""
    # Escape the escape character first, then the two LIKE wildcards, so that
    # user input such as "a_b" or "50%" is not interpreted as a pattern.
    escaped = text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    return f"%{escaped}%"


def query_records(
    chat_id: str = "",
    file_name: str = "",
    status: str = "",
    media_type: str = "",
    date_from: str = "",
    date_to: str = "",
    limit: int = 50,
    offset: int = 0,
) -> tuple:
    """Query download records with optional filters.

    Empty filter values are ignored; the remaining ones are combined with AND.

    Args:
        chat_id: Substring searched in both ``chat_id`` and ``chat_title``.
        file_name: Substring searched in ``file_name``.
        status: Exact ``status`` value.
        media_type: Exact ``media_type`` value.
        date_from: Lower bound compared against ``download_time``.
        date_to: Upper bound; ``" 23:59:59"`` is appended so the whole day
            is included.
        limit: Maximum number of records returned.
        offset: Number of matching records to skip (newest first).

    Returns:
        ``(records, total)`` where ``records`` is a list of dicts ordered by
        ``id`` descending and ``total`` is the number of matching rows
        ignoring ``limit``/``offset``. ``([], 0)`` when the database has not
        been initialised.
    """
    if _conn is None:
        return [], 0

    conditions = []
    params = []
    if chat_id:
        conditions.append(
            "(chat_id LIKE ? ESCAPE '\\' OR chat_title LIKE ? ESCAPE '\\')"
        )
        chat_pattern = _like_contains(chat_id)
        params.extend([chat_pattern, chat_pattern])
    if file_name:
        conditions.append("file_name LIKE ? ESCAPE '\\'")
        params.append(_like_contains(file_name))
    if status:
        conditions.append("status = ?")
        params.append(status)
    if media_type:
        conditions.append("media_type = ?")
        params.append(media_type)
    if date_from:
        conditions.append("download_time >= ?")
        params.append(date_from)
    if date_to:
        conditions.append("download_time <= ?")
        params.append(date_to + " 23:59:59")

    # Only fixed condition fragments are interpolated into the SQL text;
    # every user-supplied value goes through a bound parameter.
    where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
    total = _conn.execute(
        f"SELECT COUNT(*) FROM downloads {where}", params
    ).fetchone()[0]
    rows = _conn.execute(
        f"""SELECT id, chat_id, chat_title, message_id, file_name, file_path,
                   file_size, media_type, download_time, status
            FROM downloads {where} ORDER BY id DESC LIMIT ? OFFSET ?""",
        params + [limit, offset],
    ).fetchall()
    return [dict(row) for row in rows], total
|
|
|
|
|
|
def get_recent_history(limit: int = 50, offset: int = 0) -> list:
    """Return the newest 'success' and 'skip' records as a list of dicts.

    Used by the Web UI's completed list. Records are ordered by ``id``
    descending and paged with ``limit``/``offset``. Returns an empty list
    when the database has not been initialised.
    """
    if _conn is None:
        return []
    history_sql = """
        SELECT chat_id, chat_title, message_id, file_name, file_path,
               file_size, media_type, download_time, status
        FROM downloads
        WHERE status IN ('success', 'skip')
        ORDER BY id DESC
        LIMIT ? OFFSET ?
    """
    cursor = _conn.execute(history_sql, (limit, offset))
    return [dict(entry) for entry in cursor]
|
|
|
|
|
|
def count_recent_history() -> int:
    """Return how many records have status 'success' or 'skip'.

    Returns 0 when the database has not been initialised.
    """
    if _conn is None:
        return 0
    count_sql = "SELECT COUNT(*) FROM downloads WHERE status IN ('success', 'skip')"
    (finished_total,) = _conn.execute(count_sql).fetchone()
    return finished_total
|
|
|
|
|
|
# ────────────────────────────────────────────────────────────────
# Pre-scan result cache (feeds the banner's "completed X / N" progress bar)
# ────────────────────────────────────────────────────────────────
|
|
|
|
def build_filter_key(
    download_filter: Optional[str],
    media_types: Optional[list],
    file_formats: Optional[dict],
) -> str:
    """Derive a stable cache key from the (filter, media types, formats) triple.

    The filter text is stripped, and media types and each per-media format
    list are sorted, so inputs that differ only in ordering or surrounding
    whitespace map to the same key. ``None`` is treated like an empty value.

    Returns:
        The SHA-1 hex digest of the normalised triple, or ``"__dynamic__"``
        when the filter contains the word ``now``, ``today`` or ``yesterday``
        (case-insensitive). Such filters depend on the current time, so the
        caller is expected to bypass the cache for them.
    """
    filter_text = download_filter.strip() if download_filter else ""

    # Time-relative filters must be rescanned on every run, never cached.
    if filter_text and re.search(r"\b(now|today|yesterday)\b", filter_text, re.IGNORECASE):
        return "__dynamic__"

    sorted_media = sorted(media_types or [])

    # Sort each media type's format list too, so the key is order-independent.
    sorted_formats = {}
    for media, formats in sorted((file_formats or {}).items()):
        sorted_formats[media] = sorted(formats or [])

    key_material = {"f": filter_text, "m": sorted_media, "ff": sorted_formats}
    serialized = json.dumps(key_material, ensure_ascii=False, sort_keys=True)
    return hashlib.sha1(serialized.encode("utf-8")).hexdigest()
|
|
|
|
|
|
def get_scan_cache(chat_id: str, filter_key: str, ttl: int = 86400) -> Optional[int]:
    """Look up the cached estimated total for a chat and filter key.

    Args:
        chat_id: Chat the estimate belongs to.
        filter_key: Cache key; ``"__dynamic__"`` is never looked up.
        ttl: Maximum age of the entry in seconds.

    Returns:
        The cached total as an int, or None when the database is not
        initialised, the key is ``"__dynamic__"``, no entry exists, or the
        entry is older than ``ttl``.
    """
    if _conn is None or filter_key == "__dynamic__":
        return None
    lookup_sql = (
        "SELECT estimated_total, scanned_at FROM scan_cache WHERE chat_id=? AND filter_key=?"
    )
    cached = _conn.execute(lookup_sql, (chat_id, filter_key)).fetchone()
    if cached is None:
        return None
    age_seconds = time.time() - cached["scanned_at"]
    return None if age_seconds > ttl else int(cached["estimated_total"])
|
|
|
|
|
|
def save_scan_cache(chat_id: str, filter_key: str, estimated_total: int):
    """Insert or refresh the cached pre-scan total for a chat and filter key.

    The entry is stamped with the current Unix time in whole seconds. Nothing
    is written when the database is not initialised or the key is
    ``"__dynamic__"``.
    """
    if _conn is None or filter_key == "__dynamic__":
        return
    upsert_sql = """
        INSERT INTO scan_cache (chat_id, filter_key, estimated_total, scanned_at)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(chat_id, filter_key) DO UPDATE SET
            estimated_total=excluded.estimated_total,
            scanned_at=excluded.scanned_at
    """
    values = (chat_id, filter_key, int(estimated_total), int(time.time()))
    _conn.execute(upsert_sql, values)
    _conn.commit()
|