From 53d3ab7769da075cd2244233c7df6e491960cf19 Mon Sep 17 00:00:00 2001 From: yuming Date: Mon, 11 May 2026 22:46:05 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E7=BD=91=E7=BB=9C=E6=96=AD=E5=BC=80?= =?UTF-8?q?=E5=90=8E=E4=BB=BB=E5=8A=A1=E6=B0=B8=E4=B9=85=E5=8D=A1=E5=9C=A8?= =?UTF-8?q?"=E4=B8=8B=E8=BD=BD=E4=B8=AD"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 三处协同修复: 1) worker 异常分支补 _release_stuck_task, 标记 FailedDownload、推进 finish_task、清残留。 2) download_media 中 fetch_message 加 try-except, 连接异常返回 FailedDownload,不再让异常冒泡。 3) download_chat_task 用 try/finally 兜底回写 chat_download_config.total_task,避免 _wait_node_finish 误判频道已完成而切到下一个。 Co-Authored-By: Claude Opus 4.7 (1M context) --- media_downloader.py | 143 +++++++++++++++++++++++++++++++------------- 1 file changed, 100 insertions(+), 43 deletions(-) diff --git a/media_downloader.py b/media_downloader.py index 02fbe7c..0786a64 100644 --- a/media_downloader.py +++ b/media_downloader.py @@ -423,7 +423,15 @@ async def download_media( task_start_time: float = time.time() media_size = 0 _media = None - message = await fetch_message(client, message) + # 关键修复:fetch_message 是发起任何下载前的网络调用,连接异常时不能让异常一路冒泡, + # 否则 worker 只会打日志,状态永远停在 Downloading,UI 卡死。 + try: + message = await fetch_message(client, message) + except Exception as fetch_err: + logger.warning( + f"Message[{getattr(message, 'id', '?')}] 拉取消息失败(可能连接断开): {fetch_err}" + ) + return DownloadStatus.FailedDownload, None try: for _type in media_types: _media = getattr(message, _type, None) @@ -581,18 +589,23 @@ def _check_config() -> bool: async def worker(client: pyrogram.client.Client): - """Work for download task""" + """下载任务消费者协程""" while app.is_running: + message = None + node = None try: item = await queue.get() message = item[0] - node: TaskNode = item[1] + node = item[1] if node.is_stop_transmission: + # 主动中止:把队列里残留的下载中状态清掉,避免 UI 一直显示 Downloading + _release_stuck_task(node, message, DownloadStatus.SkipDownload) continue if is_message_skipped(str(node.chat_id), message.id): skip_message(str(node.chat_id), message.id) + _release_stuck_task(node, message, DownloadStatus.SkipDownload) continue if node.client: @@ -600,7 +613,39 @@ async def worker(client: pyrogram.client.Client): else: await download_task(client, message, node) except Exception as e: - logger.exception(f"{e}") + logger.exception(f"worker 捕获到未处理异常: {e}") + # 关键修复:worker 吞异常时必须把状态推进,否则 finish_task 永远追不上 total_task + if node is not None and message is not None: + _release_stuck_task(node, message, DownloadStatus.FailedDownload) + + +def _release_stuck_task( + node: "TaskNode", + message: "pyrogram.types.Message", + status: "DownloadStatus", +): + """将异常/中断的任务从"下载中"状态释放,避免任务队列永久卡死。 + + 做三件事: + 1) 标记 download_status,让 UI 不再显示"下载中"。 + 2) 推进 finish_task,让 _wait_node_finish 能正常退出。 + 3) 清理 _download_result 残留条目,避免速度/列表脏数据。 + """ + try: + msg_id = getattr(message, "id", None) + if msg_id is None: + return + node.download_status[msg_id] = status + # finish_task 通过 app.set_download_id 推进;bot 模式下原逻辑也不走 set_download_id, + # 这里保持一致:非 bot 时才推进,避免重复计数。 + if not node.bot: + try: + app.set_download_id(node, msg_id, status) + except Exception as inner: + logger.warning(f"释放卡死任务时 set_download_id 失败 msg_id={msg_id}: {inner}") + remove_download_entry(node.chat_id, msg_id) + except Exception as e: + logger.warning(f"释放卡死任务清理失败: {e}") async def download_chat_task( @@ -657,52 +702,64 @@ async def download_chat_task( for message in skipped_messages: await add_download_task(message, node) - async for message in messages_iter: # type: ignore - # Update checking progress for each message - increment_task_stat("checked_messages") - - meta_data = MetaData() + # 关键修复:消息迭代和缓存写入必须包进 try/finally, + # 否则中途抛 Connection lost 时 total_task 不会被回写,_wait_node_finish 误判频道已完成, + # 后续 worker 还在跑就被切到下一个频道,UI 永远显示"下载中"。 + iter_completed = False + try: + async for message in messages_iter: # type: ignore + # Update checking progress for each message + increment_task_stat("checked_messages") - caption = message.caption - if caption: - caption = validate_title(caption) - app.set_caption_name(node.chat_id, message.media_group_id, caption) - app.set_caption_entities( - node.chat_id, message.media_group_id, message.caption_entities - ) - else: - caption = app.get_caption_name(node.chat_id, message.media_group_id) - set_meta_data(meta_data, message, caption) + meta_data = MetaData() - if app.need_skip_message(chat_download_config, message.id): - continue - - if app.exec_filter(chat_download_config, meta_data): - # 改动点 B:通过 filter 的消息计数,作为 X/N 的分母实时递增 - increment_task_stat("qualified_files") - await add_download_task(message, node) - else: - node.download_status[message.id] = DownloadStatus.SkipDownload - increment_task_stat("skipped_files") - if message.media_group_id: - await upload_telegram_chat( - client, - node.upload_user, - app, - node, - message, - DownloadStatus.SkipDownload, + caption = message.caption + if caption: + caption = validate_title(caption) + app.set_caption_name(node.chat_id, message.media_group_id, caption) + app.set_caption_entities( + node.chat_id, message.media_group_id, message.caption_entities ) + else: + caption = app.get_caption_name(node.chat_id, message.media_group_id) + set_meta_data(meta_data, message, caption) - chat_download_config.need_check = True - chat_download_config.total_task = node.total_task - node.is_running = True + if app.need_skip_message(chat_download_config, message.id): + continue + + if app.exec_filter(chat_download_config, meta_data): + # 改动点 B:通过 filter 的消息计数,作为 X/N 的分母实时递增 + increment_task_stat("qualified_files") + await add_download_task(message, node) + else: + node.download_status[message.id] = DownloadStatus.SkipDownload + increment_task_stat("skipped_files") + if message.media_group_id: + await upload_telegram_chat( + client, + node.upload_user, + app, + node, + message, + DownloadStatus.SkipDownload, + ) + iter_completed = True + finally: + # 不论遍历是否正常结束,都必须把 node.total_task 同步给 chat_download_config, + # 否则 _wait_node_finish 会用 total_task=0 的旧值立即返回,跳过对 worker 的等待。 + chat_download_config.need_check = True + chat_download_config.total_task = node.total_task + node.is_running = True # 改动点 C:遍历正常完成,把实际 qualified_files 写入缓存并覆盖 estimated_total # 仅在正常完成时写入(异常/中断时不执行,避免脏缓存) - actual_total = get_task_progress().get("qualified_files", 0) - db.save_scan_cache(str(node.chat_id), filter_key, actual_total) - update_task_progress(estimated_total=actual_total, is_checking=False) + if iter_completed: + actual_total = get_task_progress().get("qualified_files", 0) + db.save_scan_cache(str(node.chat_id), filter_key, actual_total) + update_task_progress(estimated_total=actual_total, is_checking=False) + else: + # 异常中断时也要把 is_checking 关掉,避免 UI 一直显示"扫描中" + update_task_progress(is_checking=False) async def _wait_node_finish(chat_config, timeout: int = 3600):