From 45d17ee576a32e3f1df69d4fcee4ade03f9986a0 Mon Sep 17 00:00:00 2001 From: yuming Date: Tue, 16 Jun 2026 10:41:38 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E8=90=A4=E7=9F=B3=20CB60=20=E5=BD=95?= =?UTF-8?q?=E5=83=8F=E6=AF=8F=E6=97=A5=E5=90=8C=E6=AD=A5=E5=88=9D=E5=A7=8B?= =?UTF-8?q?=E7=89=88=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 通过 ADB 自动化模拟器中的萤石 App,每天把 SD 卡里昨天的录像批量 下载到本地。云 API 方案 (sync.py/probe.py) 是备选,需先在萤石开放 平台关闭设备码流加密。 Co-Authored-By: Claude Sonnet 4.6 --- .gitignore | 22 +++ automate.py | 488 ++++++++++++++++++++++++++++++++++++++++++++++ config.example.py | 8 + probe.py | 141 ++++++++++++++ sync.py | 155 +++++++++++++++ 5 files changed, 814 insertions(+) create mode 100644 .gitignore create mode 100644 automate.py create mode 100644 config.example.py create mode 100644 probe.py create mode 100644 sync.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c7f8c0e --- /dev/null +++ b/.gitignore @@ -0,0 +1,22 @@ +# 敏感配置和缓存 +config.py +token_cache.json +sync.log + +# 下载的录像(每天几百 MB) +recordings/ + +# Python +__pycache__/ +*.pyc +.venv/ +*.egg-info/ + +# IDE / 系统 +.DS_Store +.idea/ +.vscode/ + +# 临时 +*.tmp +/tmp/ diff --git a/automate.py b/automate.py new file mode 100644 index 0000000..58fca70 --- /dev/null +++ b/automate.py @@ -0,0 +1,488 @@ +#!/usr/bin/env python3 +""" +萤石录像自动下载脚本 +通过 ADB 自动化模拟器中的萤石 App,下载前一天的所有报警录像。 +""" +import subprocess, time, os, shutil, struct, zlib +from datetime import datetime, timedelta +from pathlib import Path + +PYTHON = "/usr/local/bin/python3.12" + +ADB = os.path.expanduser("~/Library/Android/sdk/platform-tools/adb") +DEVICE = "emulator-5554" +OUTPUT_DIR = Path("./recordings") +PKG = "com.videogo" + +# 屏幕坐标(Pixel 6 模拟器 1080x2400,2026-06-15 实测校准) +COORD = { + "home_camera_card": (540, 500), # 首页摄像头卡片 + "live_video_btn": (270, 1340), # 摄像头详情页-实时视频按钮 + "record_tab": (540, 2290), # 底部录像 Tab + "prev_day_arrow": (80, 1130), # < 昨天箭头 + "download_btn": (426, 880), # 录像页工具栏-下载按钮 + "batch_download": (540, 1904), # 弹出菜单"批量下载"(UIAutomator dump 实测) + "reload_btn": (540, 1960), # 加载失败页的"重新加载"按钮 +} + +# 批量下载界面缩略图布局(UIAutomator dump 实测) +# 3 列 x 中心:201, 540, 878 +# 行高 203 像素;y 中心从 1240 开始 +THUMB_X = [201, 540, 878] +THUMB_ROW_H = 203 +# 每次点 3 整行 = 9 个(不触发 10 并发限制),然后滚动 3 行 + + +def adb(*args): + return subprocess.run([ADB, "-s", DEVICE] + list(args), + capture_output=True, text=True) + + +def tap(x, y, delay=0.8): + adb("shell", "input", "tap", str(x), str(y)) + time.sleep(delay) + + +def find_by_text(target_text, retries=4, delay=2.0): + """通过 uiautomator dump 查找指定 text 节点的中心坐标。 + 返回 (cx, cy) 或 None。重试次数受限,每次间隔 >= 2s,避免 ANR。 + """ + import re + # 用 [^<>]*? 限制在同一 标签内,并跨过含 / 的属性 + # (如 resource-id="com.videogo:id/tv_download_all") + pat = re.compile( + r'text="' + re.escape(target_text) + r'"[^<>]*?bounds="\[(\d+),(\d+)\]\[(\d+),(\d+)\]"', + ) + for i in range(retries): + adb("shell", "uiautomator", "dump", "/sdcard/uib.xml") + adb("pull", "/sdcard/uib.xml", "/tmp/uib.xml") + try: + with open("/tmp/uib.xml") as f: + content = f.read() + except FileNotFoundError: + content = "" + m = pat.search(content) + if m: + x1, y1, x2, y2 = map(int, m.groups()) + return ((x1 + x2) // 2, (y1 + y2) // 2) + if i < retries - 1: + time.sleep(delay) + return None + + +def tap_text(text, after_delay=1.5, retries=8): + """查找文字元素并点击。找不到抛 RuntimeError,避免误触别的页面。""" + pos = find_by_text(text, retries=retries) + if pos is None: + raise RuntimeError(f"找不到目标文字:'{text}',停止以避免误触") + cx, cy = pos + print(f" → tap '{text}' at ({cx}, {cy})") + adb("shell", "input", "tap", str(cx), str(cy)) + time.sleep(after_delay) + + +def tap_until_text(x, y, expected_text, max_tries=4, after_tap_wait=4.0): + """点击 (x,y) 并等待 expected_text 出现,验证页面切换。 + 没切换则重试点击。tap 后等 4 秒再 dump,避免 ANR(dump 太密集会让 App 卡死)。 + """ + for trial in range(max_tries): + adb("shell", "input", "tap", str(x), str(y)) + time.sleep(after_tap_wait) + if find_by_text(expected_text, retries=1): + print(f" ✓ tap ({x},{y}) 后看到 '{expected_text}'") + return True + print(f" ! tap ({x},{y}) 后未出现 '{expected_text}',重试 {trial+1}/{max_tries}") + # ANR 风险点:连续失败时给 App 喘息时间 + time.sleep(2) + raise RuntimeError(f"点击 ({x},{y}) 重试 {max_tries} 次仍未跳转到含 '{expected_text}' 的页面") + + +def dismiss_anr_if_any(): + """检测系统 ANR 弹窗,存在则点 Wait 让 App 继续运行(不杀进程)。""" + result = adb("shell", "dumpsys", "activity", "activities") + if "isn't responding" in result.stdout or "AppErrorDialog" in result.stdout: + print(" ⚠ 检测到 ANR 弹窗,点 Wait") + # Wait 按钮 bounds=[70,1299][1010,1425],中心 (540,1362) + adb("shell", "input", "tap", "540", "1362") + time.sleep(4) + return True + return False + + +def tap_until_activity(x, y, expected_activity_keyword, max_tries=6, after_tap_wait=6.0): + """点击 (x,y),通过 dumpsys 检查 Activity 是否切换到含 keyword 的页面。 + 每次尝试前都检查 ANR,存在则先点 Wait。间隔放宽到 6s 给模拟器+萤石喘息时间。 + """ + for trial in range(max_tries): + dismiss_anr_if_any() + adb("shell", "input", "tap", str(x), str(y)) + time.sleep(after_tap_wait) + cur = current_activity() + if cur and expected_activity_keyword in cur: + print(f" ✓ tap ({x},{y}) 后进入 '{cur}'") + return True + print(f" ! tap ({x},{y}) 后 Activity='{cur}' 不含 '{expected_activity_keyword}',重试 {trial+1}/{max_tries}") + # 失败时给 App 更长的喘息时间(避免连环 ANR) + time.sleep(5) + raise RuntimeError( + f"点击 ({x},{y}) 重试 {max_tries} 次仍未跳到含 '{expected_activity_keyword}' 的 Activity" + ) + + +def current_activity(): + """返回当前 topResumedActivity 的简短名(如 'MainTabActivity')。""" + import re + result = adb("shell", "dumpsys", "activity", "activities") + m = re.search(r'topResumedActivity=ActivityRecord\{[^}]*\s+com\.videogo/\.?([^\s}]+)', result.stdout) + return m.group(1) if m else None + + +def assert_activity(keyword, hint=""): + """断言当前 Activity 名含 keyword。不含则报错。避免在错页面盲点。""" + cur = current_activity() + if not cur or keyword not in cur: + raise RuntimeError(f"期望 Activity 含 '{keyword}',实际为 '{cur}'。{hint}") + return cur + + +def tap_safe(x, y, expect_activity_keyword, hint="", delay=1.5): + """先断言当前 Activity 匹配 keyword,再点击 (x,y),避免误触别的页面。""" + cur = assert_activity(expect_activity_keyword, hint) + print(f" → tap ({x},{y}) [当前 Activity: {cur}]") + adb("shell", "input", "tap", str(x), str(y)) + time.sleep(delay) + + +def sample_pixel_rgb(x, y, png_path="/tmp/_pixchk.png"): + """截图并采样指定坐标 RGB。需要系统装了 ImageMagick (`magick`)。""" + import re + adb("shell", "screencap", "-p", "/sdcard/_pixchk.png") + adb("pull", "/sdcard/_pixchk.png", png_path) + out = subprocess.run( + ["magick", png_path, "-format", "%[pixel:p{" + str(x) + "," + str(y) + "}]", "info:"], + capture_output=True, text=True, + ) + m = re.match(r"srgba?\((\d+),(\d+),(\d+)", out.stdout.strip()) + if not m: + return (0, 0, 0) + return tuple(map(int, m.groups())) + + +def tap_until_pixel_changed(x, y, sample_x, sample_y, condition_fn, + max_tries=5, timeout_per_try=8, interval=1.0): + """点 (x,y),然后轮询 (sample_x, sample_y) 像素,直到 condition_fn(rgb) 为真。 + 超时则重试点击。用于无文字可定位、但有视觉信号的场景(如切日期后工具栏变化)。 + """ + for trial in range(max_tries): + adb("shell", "input", "tap", str(x), str(y)) + for sec in range(int(timeout_per_try / interval)): + time.sleep(interval) + rgb = sample_pixel_rgb(sample_x, sample_y) + if condition_fn(rgb): + print(f" ✓ tap ({x},{y}) 生效,{sec+1}s 后检测到像素变化 rgb={rgb}") + return True + print(f" ! tap ({x},{y}) 后像素无变化({timeout_per_try}s),重试 {trial+1}/{max_tries}") + raise RuntimeError(f"点击 ({x},{y}) 重试 {max_tries} 次后像素仍未变化") + + +def screenshot(path="/tmp/ezviz_screen.png"): + adb("shell", "screencap", "-p", "/sdcard/ezviz_tmp.png") + adb("pull", "/sdcard/ezviz_tmp.png", path) + return path + + +def launch_app(): + """从头启动 App,等待 618 开屏广告自动关闭后回到首页。 + 用 am start -S 强制重启主 Activity(比 monkey 更可靠:避免之前残留的 + RN 业务页占据栈顶导致 monkey 不进入首页)。 + """ + adb("shell", "am", "force-stop", PKG) + time.sleep(2) + adb("shell", "am", "start", "-S", "-n", f"{PKG}/.LauncherActivity") + # 等开屏广告(618 全屏广告自动播放 5~8 秒)+ App 启动稳定 + time.sleep(15) + + +def is_error_page(): + """检测当前是否为'加载失败'白屏页面 + 取屏幕中央像素,白色/浅灰 = 错误页,深色 = 有视频内容。 + """ + tmp = "/tmp/ezviz_check.png" + adb("shell", "screencap", "-p", "/sdcard/ezviz_chk.png") + adb("pull", "/sdcard/ezviz_chk.png", tmp) + try: + with open(tmp, "rb") as f: + data = f.read() + # 解析 PNG IDAT 得到原始像素 + pos, idat = 8, b"" + width = height = 0 + while pos < len(data): + length = struct.unpack(">I", data[pos:pos+4])[0] + ctype = data[pos+4:pos+8] + cdata = data[pos+8:pos+8+length] + if ctype == b"IHDR": + width, height = struct.unpack(">II", cdata[:8]) + elif ctype == b"IDAT": + idat += cdata + elif ctype == b"IEND": + break + pos += 12 + length + raw = zlib.decompress(idat) + # screencap 输出 RGBA,每行有 1 字节 filter 前缀 + bpp = 4 + stride = width * bpp + 1 + cx, cy = width // 2, height // 3 # 取屏幕上 1/3 处中央(视频区域) + row_start = cy * stride + 1 + r = raw[row_start + cx * bpp] + g = raw[row_start + cx * bpp + 1] + b = raw[row_start + cx * bpp + 2] + # 亮度 > 220 认为是白屏(错误页) + return (r + g + b) // 3 > 220 + except Exception: + return False # 解析失败时不做干预 + + +def navigate_to_yesterday_batch(): + """导航到昨天的 SD 卡批量下载模式。 + 关键节点用 tap_text 通过 UI dump 定位,避免坐标错位时误触首页底部 + Tab(萤石/对话/商城/社区/我的,"商城"中心 x=540 跟录像 Tab 撞车)。 + """ + print("→ 点击首页摄像头卡片(用 Activity 切换验证,避免频繁 dump 触发 ANR)") + # 详情页是 RN 实现的 EZReactBizActivity + tap_until_activity(*COORD["home_camera_card"], expected_activity_keyword="EZReactBizActivity") + + print("→ 点击实时视频(详情页有文字,可以 dump 定位)") + tap_text("实时视频", after_delay=5) + + for attempt in range(3): + if not is_error_page(): + print(" 实时视频加载成功") + break + print(f" 检测到加载失败,重试 {attempt+1}/3 ...") + tap(*COORD["reload_btn"], delay=6) + else: + print(" 三次失败,返回重试") + adb("shell", "input", "keyevent", "KEYCODE_BACK") + time.sleep(2) + tap(*COORD["home_camera_card"], delay=3) + tap_text("实时视频", after_delay=8) + + # 实时视频页全自绘,dump 不到底部 Tab 文字。 + # 但当前 Activity = VideoPlayActivityNewVersion,跟首页 MainTabActivity 不同。 + # 用 Activity 校验保护坐标点击,避免误触首页商城。 + print("→ 切换到录像 Tab(先校验在实时视频页)") + tap_safe(*COORD["record_tab"], expect_activity_keyword="VideoPlayActivity", + hint="未进入实时视频页,可能上一步点击实时视频按钮失败") + + print("→ 切换到昨天(像素验证:切换后日期标签 (180,1130) 出现深色文字)") + assert_activity("VideoPlayActivity", "未在实时视频/录像页") + # "今日" 状态下 (180,1130) 是浅蓝灰背景 (244,246,252) + # "06-XX" 状态下该位置是数字深色文字(R~44) + tap_until_pixel_changed( + *COORD["prev_day_arrow"], + sample_x=180, sample_y=1130, + condition_fn=lambda rgb: rgb[0] < 100, + ) + # 日期切换后工具栏不会自动切换。需要点一个缩略图把"录像选中状态"打开, + # 工具栏才从"延时摄影/清晰度/对讲/..." 切到"收藏/下载/剪辑/..." + # 工具栏第 3 位 (540,900) 由浅蓝(对讲圆圈)变白即为切换完成 + print("→ 点一个缩略图触发工具栏切换") + adb("shell", "input", "tap", "200", "1400") # 第一行第一列缩略图 + for _ in range(15): + time.sleep(1) + rgb = sample_pixel_rgb(540, 900) + if rgb[0] > 240 and rgb[1] > 240 and rgb[2] > 240: + print(f" ✓ 工具栏已切换到含'下载'按钮(rgb={rgb})") + break + else: + raise RuntimeError("点缩略图后工具栏未切换到下载模式") + # 等视频开始稳定播放再操作(否则 tap 会被加载过程吸收) + time.sleep(3) + + # 下载按钮可能因为视频还在加载而被吸收,循环重点击直到菜单弹出 + print("→ 点击下载按钮 + 等批量下载菜单出现(最多 4 次重试)") + assert_activity("VideoPlayActivity", "未在录像页") + for trial in range(4): + adb("shell", "input", "tap", str(COORD["download_btn"][0]), str(COORD["download_btn"][1])) + time.sleep(3) + pos = find_by_text("批量下载", retries=3, delay=1.5) + if pos: + print(f" ✓ 菜单弹出,'批量下载' at {pos}") + cx, cy = pos + adb("shell", "input", "tap", str(cx), str(cy)) + time.sleep(3) + break + print(f" ! 第 {trial+1} 次点下载未弹出菜单,重试") + else: + raise RuntimeError("反复点下载按钮仍未弹出批量下载菜单") + + +def dump_thumbnail_bounds(): + """通过 uiautomator dump 拿到当前批量下载界面可见缩略图的 bounds 列表。 + 返回 [(x1,y1,x2,y2), ...] 按 y 升序。 + 过滤:宽 > 300,高 > 150,y1 在 [1000, 2300] 区间。 + """ + import re + adb("shell", "uiautomator", "dump", "/sdcard/uib.xml") + adb("pull", "/sdcard/uib.xml", "/tmp/uib.xml") + try: + with open("/tmp/uib.xml") as f: + content = f.read() + except FileNotFoundError: + return [] + pat = re.compile(r'clickable="true"[^/]*bounds="\[(\d+),(\d+)\]\[(\d+),(\d+)\]"') + out = [] + for m in pat.finditer(content): + x1, y1, x2, y2 = map(int, m.groups()) + if 1000 <= y1 <= 2300 and (x2 - x1) > 300 and (y2 - y1) > 150: + out.append((x1, y1, x2, y2)) + out.sort(key=lambda b: (b[1], b[0])) + return out + + +def dismiss_dialog_if_any(): + """检测并关闭"使用手机网络下载将消耗流量"等萤石弹窗。 + 点击 text="确定" 按钮。 + """ + pos = find_by_text("确定", retries=1) + if pos: + print(f" ⚠ 检测到弹窗,点击'确定' at {pos}") + adb("shell", "input", "tap", str(pos[0]), str(pos[1])) + time.sleep(2) + return True + return False + + +def tap_all_thumbnails(max_rounds=100, batch_sleep=35): + """循环:dump → 点击 9 个 → 等下载 → 滚动 3 行。 + 每轮点 3 整行(9 个)避免触发 10 并发限制。 + 滚动距离 = 3 行 = ~609 像素,保证下轮可见行与本轮不重叠。 + 检测到底:连续 2 轮 bounds 列表相同则结束。 + """ + print(f"→ 开始批量下载循环(最多 {max_rounds} 轮)") + # 进入批量模式后会有"使用手机网络流量"弹窗,先关掉 + dismiss_dialog_if_any() + last_signature = None + same_count = 0 + + for r in range(max_rounds): + bounds = dump_thumbnail_bounds() + if not bounds: + print(f" 轮 {r+1}: 没拿到缩略图 bounds,结束") + break + + # 取完整可见行(高度 > 195) + full = [b for b in bounds if (b[3] - b[1]) > 195] + if len(full) < 3: + print(f" 轮 {r+1}: 完整行不足 ({len(full)} < 3),结束") + break + + # 检测到底:bounds 内容跟上次相同 + signature = tuple(full[:9]) + if signature == last_signature: + same_count += 1 + if same_count >= 2: + print(f" 轮 {r+1}: 连续 2 轮内容相同,已到底,结束") + break + else: + same_count = 0 + last_signature = signature + + to_click = full[:9] + for x1, y1, x2, y2 in to_click: + cx, cy = (x1 + x2) // 2, (y1 + y2) // 2 + adb("shell", "input", "tap", str(cx), str(cy)) + time.sleep(0.3) + + print(f" 轮 {r+1}: 点击 {len(to_click)} 个,等 {batch_sleep}s 下载...") + time.sleep(batch_sleep) + + # 向下滚动 3 行(约 609 像素) + adb("shell", "input", "swipe", "540", "1820", "540", "1211", "400") + time.sleep(1.5) + + print("→ 批量下载循环结束") + + +def wait_for_downloads(baseline_files, timeout=600): + """等待下载目录新增文件稳定 + 文件名使用下载时刻时间戳,固定存到今天的目录。 + baseline_files 为下载前已有的文件集合,用于计算新增数量。 + """ + today = datetime.now().strftime("%Y%m%d") + remote_dir = f"/sdcard/Android/data/{PKG}/files/{today}" + print(f"→ 等待下载完成(目录 {today},起始已有 {len(baseline_files)} 个)...") + + last_new = -1 + stable_rounds = 0 + deadline = time.time() + timeout + + while time.time() < deadline: + result = adb("shell", "find", remote_dir, "-name", "*_BH*", "-type", "f") + all_files = set(l for l in result.stdout.strip().splitlines() if l) + new_files = all_files - baseline_files + count = len(new_files) + print(f" 新下载 {count} 个文件...") + if count > 0 and count == last_new: + stable_rounds += 1 + if stable_rounds >= 3: + print(f"→ 下载稳定,共新增 {count} 个文件") + return list(new_files) + else: + stable_rounds = 0 + last_new = count + time.sleep(10) + + print("警告:等待超时") + # 返回已下载的新文件(即便未完全稳定) + result = adb("shell", "find", remote_dir, "-name", "*_BH*", "-type", "f") + all_files = set(l for l in result.stdout.strip().splitlines() if l) + return list(all_files - baseline_files) + + +def pull_files(remote_files): + """把下载好的文件 adb pull 到本地""" + yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d") + # 本地目录按录像日期(昨天)命名,方便归档 + local_dir = OUTPUT_DIR / "BH5455783" / yesterday + local_dir.mkdir(parents=True, exist_ok=True) + + print(f"→ 拉取 {len(remote_files)} 个文件到 {local_dir}") + for remote_path in remote_files: + fname = Path(remote_path).name + ".mp4" + local_path = local_dir / fname + if local_path.exists(): + print(f" 跳过(已存在): {fname}") + continue + adb("pull", remote_path, str(local_path)) + print(f" 已拉取: {fname}") + + print(f"→ 完成,文件保存在 {local_dir}") + + +def get_baseline_files(): + """获取下载开始前已有的文件列表""" + today = datetime.now().strftime("%Y%m%d") + remote_dir = f"/sdcard/Android/data/{PKG}/files/{today}" + result = adb("shell", "find", remote_dir, "-name", "*_BH*", "-type", "f") + return set(l for l in result.stdout.strip().splitlines() if l) + + +def sync(): + print("===== 萤石录像自动同步开始 =====") + launch_app() + # 记录下载前的基准文件集合 + baseline = get_baseline_files() + print(f"→ 基准文件数: {len(baseline)}") + navigate_to_yesterday_batch() + tap_all_thumbnails() # 默认 max_rounds=100,跑到滚到底为止 + files = wait_for_downloads(baseline) + if files: + pull_files(files) + else: + print("未找到新下载文件") + print("===== 同步完成 =====") + + +if __name__ == "__main__": + sync() diff --git a/config.example.py b/config.example.py new file mode 100644 index 0000000..d4a522b --- /dev/null +++ b/config.example.py @@ -0,0 +1,8 @@ +# 复制此文件为 config.py 后填入真实值。config.py 已加入 .gitignore。 +APP_KEY = "你的萤石开放平台 AppKey" +APP_SECRET = "你的萤石开放平台 AppSecret" +DEVICE_SERIAL = "BHxxxxxxx" # 设备序列号,在萤石 App 设备信息里 +VERIFY_CODE = "XXXXXX" # 设备验证码(贴在摄像头上) +CHANNEL_NO = 1 +OUTPUT_DIR = "./recordings" +TOKEN_CACHE = "./token_cache.json" diff --git a/probe.py b/probe.py new file mode 100644 index 0000000..792bbf4 --- /dev/null +++ b/probe.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python3 +""" +第一步:探测萤石API,确认录像列表和回放流接口的实际返回格式 +""" + +import json, os, requests +from datetime import datetime, timedelta +from config import * + +BASE = "https://open.ys7.com/api/lapp" + + +def get_token(): + """获取AccessToken,缓存到本地(有效期7天)""" + if os.path.exists(TOKEN_CACHE): + cache = json.load(open(TOKEN_CACHE)) + if datetime.now().timestamp() < cache["expire_ts"] - 3600: + print(f"使用缓存Token,有效期至: {cache['expire_str']}") + return cache["token"] + + r = requests.post(f"{BASE}/token/get", + data={"appKey": APP_KEY, "appSecret": APP_SECRET}) + d = r.json() + if d["code"] != "200": + raise Exception(f"Token获取失败: {d}") + + token = d["data"]["accessToken"] + expire_ts = d["data"]["expireTime"] / 1000 + expire_str = datetime.fromtimestamp(expire_ts).strftime("%Y-%m-%d %H:%M:%S") + json.dump({"token": token, "expire_ts": expire_ts, "expire_str": expire_str}, + open(TOKEN_CACHE, "w")) + print(f"Token已刷新,有效期至: {expire_str}") + return token + + +def probe_record_search(token, date_str): + """探测:查询SD卡录像列表(试几个端点)""" + params = { + "accessToken": token, + "deviceSerial": DEVICE_SERIAL, + "channelNo": CHANNEL_NO, + "startTime": f"{date_str} 00:00:00", + "endTime": f"{date_str} 23:59:59", + "recType": 0, + "pageStart": 0, + "pageSize": 10 + } + endpoints = [ + "/record/search", + "/storage/recording/day/times", + "/device/record/list", + "/video/rec/list", + ] + for ep in endpoints: + print(f"\n{'='*50}") + print(f"【接口1变体】{ep} 日期={date_str}") + r = requests.post(f"{BASE}{ep}", data=params) + print(f"HTTP {r.status_code}") + print(r.text[:500]) + return None + + +def probe_video_by_time(token, date_str): + """探测:尝试多种时间格式""" + from datetime import timezone + dt_start = datetime.strptime(f"{date_str} 00:00:00", "%Y-%m-%d %H:%M:%S") + dt_end = datetime.strptime(f"{date_str} 01:00:00", "%Y-%m-%d %H:%M:%S") + ts_start = int(dt_start.timestamp() * 1000) # 毫秒时间戳 + ts_end = int(dt_end.timestamp() * 1000) + + formats = [ + ("yyyy-MM-dd HH:mm:ss", f"{date_str} 00:00:00", f"{date_str} 01:00:00"), + ("yyyyMMddHHmmss", date_str.replace("-","")+"000000", date_str.replace("-","")+"010000"), + ("毫秒时间戳", str(ts_start), str(ts_end)), + ("秒时间戳", str(ts_start//1000), str(ts_end//1000)), + ] + for fmt_name, s, e in formats: + print(f"\n{'='*50}") + print(f"【接口2】video/by/time 时间格式={fmt_name} start={s}") + r = requests.post(f"{BASE}/video/by/time", data={ + "accessToken": token, + "deviceSerial": DEVICE_SERIAL, + "channelNo": CHANNEL_NO, + "startTime": s, + "endTime": e + }) + print(f"HTTP {r.status_code} {r.text[:200]}") + return None + + +def probe_live_address(token): + """探测:获取直播流地址(顺带看看协议格式)""" + print(f"\n{'='*50}") + print("【接口3】直播流地址(探测协议格式)") + r = requests.post(f"{BASE}/v2/live/address/get", data={ + "accessToken": token, + "deviceSerial": DEVICE_SERIAL, + "channelNo": CHANNEL_NO, + "protocol": 2, # 1=ezopen 2=HLS 3=RTMP 4=FLV + "quality": 1 + }) + print(f"HTTP {r.status_code}") + print(r.text[:500]) + + +def probe_playback_address(token): + """探测回放流地址(已知录像时间段的毫秒时间戳)""" + rec_start = 1781179564000 + rec_end = 1781211539000 + endpoints = [ + ("/v2/live/address/get", {"startTime": rec_start, "endTime": rec_end, "protocol": 2, "quality": 1}), + ("/record/playback/address", {"startTime": rec_start, "endTime": rec_end}), + ("/v3/live/address/get", {"startTime": rec_start, "endTime": rec_end, "protocol": 2}), + ("/video/playback", {"startTime": rec_start, "endTime": rec_end}), + ] + for ep, extra in endpoints: + params = {"accessToken": token, "deviceSerial": DEVICE_SERIAL, "channelNo": CHANNEL_NO} + params.update(extra) + print(f"\n{'='*50}") + print(f"【回放】{ep}") + r = requests.post(f"{BASE}{ep}", data=params) + print(f"HTTP {r.status_code} {r.text[:300]}") + + +if __name__ == "__main__": + token = get_token() + + yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") + y_start_ms = int(datetime.strptime(f"{yesterday} 00:00:00", "%Y-%m-%d %H:%M:%S").timestamp() * 1000) + y_end_ms = int(datetime.strptime(f"{yesterday} 23:59:59", "%Y-%m-%d %H:%M:%S").timestamp() * 1000) + + print("\n=== 1. 查录像列表 ===") + r = requests.post(f"{BASE}/video/by/time", data={ + "accessToken": token, "deviceSerial": DEVICE_SERIAL, + "channelNo": CHANNEL_NO, + "startTime": y_start_ms, "endTime": y_end_ms + }) + print(r.text[:800]) + + print("\n=== 2. 探测回放流地址 ===") + probe_playback_address(token) diff --git a/sync.py b/sync.py new file mode 100644 index 0000000..d8c1e49 --- /dev/null +++ b/sync.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python3 +""" +萤石CB60 SD卡录像每日同步脚本 +每天凌晨运行,下载前24小时内的所有SD卡录像到本地目录。 +""" +import json, os, re, subprocess, logging +from datetime import datetime +from pathlib import Path +import requests + +from config import * + +# ====== 路径配置 ====== +OUTPUT_DIR = Path(OUTPUT_DIR) +TOKEN_CACHE = Path(TOKEN_CACHE) +LOG_FILE = Path("./sync.log") + +# NAS上部署时改为 "ffmpeg"(Docker镜像内) +FFMPEG = "/opt/homebrew/bin/ffmpeg" + +BASE = "https://open.ys7.com/api/lapp" + +# ====== 日志 ====== +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(message)s", + handlers=[ + logging.FileHandler(LOG_FILE, encoding="utf-8"), + logging.StreamHandler() + ] +) +log = logging.getLogger(__name__) + + +# ====== Token 管理 ====== +def get_token(): + if TOKEN_CACHE.exists(): + cache = json.loads(TOKEN_CACHE.read_text()) + if datetime.now().timestamp() < cache["expire_ts"] - 3600: + log.info(f"使用缓存 Token,有效期至 {cache['expire_str']}") + return cache["token"] + + r = requests.post(f"{BASE}/token/get", + data={"appKey": APP_KEY, "appSecret": APP_SECRET}) + d = r.json() + if d["code"] != "200": + raise Exception(f"Token 获取失败: {d}") + + token = d["data"]["accessToken"] + expire_ts = d["data"]["expireTime"] / 1000 + expire_str = datetime.fromtimestamp(expire_ts).strftime("%Y-%m-%d %H:%M:%S") + TOKEN_CACHE.write_text(json.dumps({ + "token": token, "expire_ts": expire_ts, "expire_str": expire_str + })) + log.info(f"Token 已刷新,有效期至 {expire_str}") + return token + + +# ====== 查询录像列表 ====== +def get_recording_list(token, start_ms, end_ms): + r = requests.post(f"{BASE}/video/by/time", data={ + "accessToken": token, + "deviceSerial": DEVICE_SERIAL, + "channelNo": CHANNEL_NO, + "startTime": start_ms, + "endTime": end_ms, + }) + d = r.json() + if d["code"] != "200": + log.warning(f"录像列表查询失败: {d}") + return [] + return d.get("data", []) + + +# ====== 获取 FLV 回放流地址 ====== +def get_flv_url(token): + """ + type=2:SD卡回放流,覆盖过去24小时内所有录像片段(服务器拼接成一个 FLV) + protocol=4:HTTP FLV,ffmpeg 可直接下载 + """ + r = requests.post(f"{BASE}/v2/live/address/get", data={ + "accessToken": token, + "deviceSerial": DEVICE_SERIAL, + "channelNo": CHANNEL_NO, + "quality": 1, + "type": 2, + "protocol": 4, + }) + d = r.json() + if d["code"] != "200": + raise Exception(f"FLV 地址获取失败: {d}") + return d["data"]["url"], d["data"]["expireTime"] + + +# ====== ffmpeg 下载 ====== +def download_flv(url, output_path): + output_path.parent.mkdir(parents=True, exist_ok=True) + cmd = [ + FFMPEG, + "-i", url, + "-c", "copy", + "-y", str(output_path), + "-v", "warning", + ] + log.info(f"开始下载 → {output_path}") + result = subprocess.run(cmd, capture_output=True, text=True) + + if not output_path.exists() or output_path.stat().st_size < 1024: + log.error(f"下载失败或文件过小\n{result.stderr[:400]}") + return False + + size_kb = output_path.stat().st_size / 1024 + log.info(f"下载完成,大小 {size_kb:.0f} KB") + return True + + +# ====== 主流程 ====== +def sync(): + log.info("===== 开始同步 =====") + token = get_token() + + # 1. 查过去24小时是否有录像 + now_ms = int(datetime.now().timestamp() * 1000) + start_ms = now_ms - 86_400_000 + recordings = get_recording_list(token, start_ms, now_ms) + + if not recordings: + log.info("过去24小时无SD卡录像,退出") + return + + log.info(f"发现 {len(recordings)} 段录像") + + # 2. 获取 FLV 流(服务器把24小时内所有录像拼接好) + url, expire_time = get_flv_url(token) + log.info(f"FLV URL 有效期至 {expire_time}") + + # 3. 文件命名:按 begin 时间戳 + begin_match = re.search(r'begin=(\d{14})', url) + begin_str = begin_match.group(1) if begin_match else datetime.now().strftime("%Y%m%d%H%M%S") + date_str = begin_str[:8] # e.g. 20260613 + + output_path = OUTPUT_DIR / DEVICE_SERIAL / date_str / f"{begin_str}.mp4" + + # 4. 已下载则跳过 + if output_path.exists() and output_path.stat().st_size > 1024: + log.info(f"已存在 {output_path.name},跳过") + return + + # 5. 下载 + download_flv(url, output_path) + log.info("===== 同步完成 =====") + + +if __name__ == "__main__": + sync()