fix: 网络断开后任务永久卡在"下载中"
部署到群晖 / deploy (push) Failing after 34s

三处协同修复:
1) worker 异常分支补 _release_stuck_task,
   标记 FailedDownload、推进 finish_task、清残留。
2) download_media 中 fetch_message 加 try-except,
   连接异常返回 FailedDownload,不再让异常冒泡。
3) download_chat_task 用 try/finally 兜底回写
   chat_download_config.total_task,避免 _wait_node_finish
   误判频道已完成而切到下一个。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
yuming
2026-05-11 22:46:05 +08:00
parent 1cb70dabe0
commit 53d3ab7769
+100 -43
View File
@@ -423,7 +423,15 @@ async def download_media(
task_start_time: float = time.time()
media_size = 0
_media = None
message = await fetch_message(client, message)
# 关键修复:fetch_message 是发起任何下载前的网络调用,连接异常时不能让异常一路冒泡,
# 否则 worker 只会打日志,状态永远停在 DownloadingUI 卡死。
try:
message = await fetch_message(client, message)
except Exception as fetch_err:
logger.warning(
f"Message[{getattr(message, 'id', '?')}] 拉取消息失败(可能连接断开): {fetch_err}"
)
return DownloadStatus.FailedDownload, None
try:
for _type in media_types:
_media = getattr(message, _type, None)
@@ -581,18 +589,23 @@ def _check_config() -> bool:
async def worker(client: pyrogram.client.Client):
"""Work for download task"""
"""下载任务消费者协程"""
while app.is_running:
message = None
node = None
try:
item = await queue.get()
message = item[0]
node: TaskNode = item[1]
node = item[1]
if node.is_stop_transmission:
# 主动中止:把队列里残留的下载中状态清掉,避免 UI 一直显示 Downloading
_release_stuck_task(node, message, DownloadStatus.SkipDownload)
continue
if is_message_skipped(str(node.chat_id), message.id):
skip_message(str(node.chat_id), message.id)
_release_stuck_task(node, message, DownloadStatus.SkipDownload)
continue
if node.client:
@@ -600,7 +613,39 @@ async def worker(client: pyrogram.client.Client):
else:
await download_task(client, message, node)
except Exception as e:
logger.exception(f"{e}")
logger.exception(f"worker 捕获到未处理异常: {e}")
# 关键修复:worker 吞异常时必须把状态推进,否则 finish_task 永远追不上 total_task
if node is not None and message is not None:
_release_stuck_task(node, message, DownloadStatus.FailedDownload)
def _release_stuck_task(
node: "TaskNode",
message: "pyrogram.types.Message",
status: "DownloadStatus",
):
"""将异常/中断的任务从"下载中"状态释放,避免任务队列永久卡死。
做三件事:
1) 标记 download_status,让 UI 不再显示"下载中"
2) 推进 finish_task,让 _wait_node_finish 能正常退出。
3) 清理 _download_result 残留条目,避免速度/列表脏数据。
"""
try:
msg_id = getattr(message, "id", None)
if msg_id is None:
return
node.download_status[msg_id] = status
# finish_task 通过 app.set_download_id 推进;bot 模式下原逻辑也不走 set_download_id
# 这里保持一致:非 bot 时才推进,避免重复计数。
if not node.bot:
try:
app.set_download_id(node, msg_id, status)
except Exception as inner:
logger.warning(f"释放卡死任务时 set_download_id 失败 msg_id={msg_id}: {inner}")
remove_download_entry(node.chat_id, msg_id)
except Exception as e:
logger.warning(f"释放卡死任务清理失败: {e}")
async def download_chat_task(
@@ -657,52 +702,64 @@ async def download_chat_task(
for message in skipped_messages:
await add_download_task(message, node)
async for message in messages_iter: # type: ignore
# Update checking progress for each message
increment_task_stat("checked_messages")
meta_data = MetaData()
# 关键修复:消息迭代和缓存写入必须包进 try/finally
# 否则中途抛 Connection lost 时 total_task 不会被回写,_wait_node_finish 误判频道已完成,
# 后续 worker 还在跑就被切到下一个频道,UI 永远显示"下载中"。
iter_completed = False
try:
async for message in messages_iter: # type: ignore
# Update checking progress for each message
increment_task_stat("checked_messages")
caption = message.caption
if caption:
caption = validate_title(caption)
app.set_caption_name(node.chat_id, message.media_group_id, caption)
app.set_caption_entities(
node.chat_id, message.media_group_id, message.caption_entities
)
else:
caption = app.get_caption_name(node.chat_id, message.media_group_id)
set_meta_data(meta_data, message, caption)
meta_data = MetaData()
if app.need_skip_message(chat_download_config, message.id):
continue
if app.exec_filter(chat_download_config, meta_data):
# 改动点 B:通过 filter 的消息计数,作为 X/N 的分母实时递增
increment_task_stat("qualified_files")
await add_download_task(message, node)
else:
node.download_status[message.id] = DownloadStatus.SkipDownload
increment_task_stat("skipped_files")
if message.media_group_id:
await upload_telegram_chat(
client,
node.upload_user,
app,
node,
message,
DownloadStatus.SkipDownload,
caption = message.caption
if caption:
caption = validate_title(caption)
app.set_caption_name(node.chat_id, message.media_group_id, caption)
app.set_caption_entities(
node.chat_id, message.media_group_id, message.caption_entities
)
else:
caption = app.get_caption_name(node.chat_id, message.media_group_id)
set_meta_data(meta_data, message, caption)
chat_download_config.need_check = True
chat_download_config.total_task = node.total_task
node.is_running = True
if app.need_skip_message(chat_download_config, message.id):
continue
if app.exec_filter(chat_download_config, meta_data):
# 改动点 B:通过 filter 的消息计数,作为 X/N 的分母实时递增
increment_task_stat("qualified_files")
await add_download_task(message, node)
else:
node.download_status[message.id] = DownloadStatus.SkipDownload
increment_task_stat("skipped_files")
if message.media_group_id:
await upload_telegram_chat(
client,
node.upload_user,
app,
node,
message,
DownloadStatus.SkipDownload,
)
iter_completed = True
finally:
# 不论遍历是否正常结束,都必须把 node.total_task 同步给 chat_download_config
# 否则 _wait_node_finish 会用 total_task=0 的旧值立即返回,跳过对 worker 的等待。
chat_download_config.need_check = True
chat_download_config.total_task = node.total_task
node.is_running = True
# 改动点 C:遍历正常完成,把实际 qualified_files 写入缓存并覆盖 estimated_total
# 仅在正常完成时写入(异常/中断时不执行,避免脏缓存)
actual_total = get_task_progress().get("qualified_files", 0)
db.save_scan_cache(str(node.chat_id), filter_key, actual_total)
update_task_progress(estimated_total=actual_total, is_checking=False)
if iter_completed:
actual_total = get_task_progress().get("qualified_files", 0)
db.save_scan_cache(str(node.chat_id), filter_key, actual_total)
update_task_progress(estimated_total=actual_total, is_checking=False)
else:
# 异常中断时也要把 is_checking 关掉,避免 UI 一直显示"扫描中"
update_task_progress(is_checking=False)
async def _wait_node_finish(chat_config, timeout: int = 3600):