feat: 萤石 CB60 录像每日同步初始版本

通过 ADB 自动化模拟器中的萤石 App,每天把 SD 卡里昨天的录像批量
下载到本地。云 API 方案 (sync.py/probe.py) 是备选,需先在萤石开放
平台关闭设备码流加密。

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
yuming
2026-06-16 10:41:38 +08:00
commit 45d17ee576
5 changed files with 814 additions and 0 deletions
+488
View File
@@ -0,0 +1,488 @@
#!/usr/bin/env python3
"""
萤石录像自动下载脚本
通过 ADB 自动化模拟器中的萤石 App,下载前一天的所有报警录像。
"""
import subprocess, time, os, shutil, struct, zlib
from datetime import datetime, timedelta
from pathlib import Path
PYTHON = "/usr/local/bin/python3.12"
ADB = os.path.expanduser("~/Library/Android/sdk/platform-tools/adb")
DEVICE = "emulator-5554"
OUTPUT_DIR = Path("./recordings")
PKG = "com.videogo"
# 屏幕坐标(Pixel 6 模拟器 1080x24002026-06-15 实测校准)
COORD = {
"home_camera_card": (540, 500), # 首页摄像头卡片
"live_video_btn": (270, 1340), # 摄像头详情页-实时视频按钮
"record_tab": (540, 2290), # 底部录像 Tab
"prev_day_arrow": (80, 1130), # < 昨天箭头
"download_btn": (426, 880), # 录像页工具栏-下载按钮
"batch_download": (540, 1904), # 弹出菜单"批量下载"UIAutomator dump 实测)
"reload_btn": (540, 1960), # 加载失败页的"重新加载"按钮
}
# 批量下载界面缩略图布局(UIAutomator dump 实测)
# 3 列 x 中心:201, 540, 878
# 行高 203 像素;y 中心从 1240 开始
THUMB_X = [201, 540, 878]
THUMB_ROW_H = 203
# 每次点 3 整行 = 9 个(不触发 10 并发限制),然后滚动 3 行
def adb(*args):
return subprocess.run([ADB, "-s", DEVICE] + list(args),
capture_output=True, text=True)
def tap(x, y, delay=0.8):
adb("shell", "input", "tap", str(x), str(y))
time.sleep(delay)
def find_by_text(target_text, retries=4, delay=2.0):
"""通过 uiautomator dump 查找指定 text 节点的中心坐标。
返回 (cx, cy) 或 None。重试次数受限,每次间隔 >= 2s,避免 ANR。
"""
import re
# 用 [^<>]*? 限制在同一 <node /> 标签内,并跨过含 / 的属性
# (如 resource-id="com.videogo:id/tv_download_all"
pat = re.compile(
r'text="' + re.escape(target_text) + r'"[^<>]*?bounds="\[(\d+),(\d+)\]\[(\d+),(\d+)\]"',
)
for i in range(retries):
adb("shell", "uiautomator", "dump", "/sdcard/uib.xml")
adb("pull", "/sdcard/uib.xml", "/tmp/uib.xml")
try:
with open("/tmp/uib.xml") as f:
content = f.read()
except FileNotFoundError:
content = ""
m = pat.search(content)
if m:
x1, y1, x2, y2 = map(int, m.groups())
return ((x1 + x2) // 2, (y1 + y2) // 2)
if i < retries - 1:
time.sleep(delay)
return None
def tap_text(text, after_delay=1.5, retries=8):
"""查找文字元素并点击。找不到抛 RuntimeError,避免误触别的页面。"""
pos = find_by_text(text, retries=retries)
if pos is None:
raise RuntimeError(f"找不到目标文字:'{text}',停止以避免误触")
cx, cy = pos
print(f" → tap '{text}' at ({cx}, {cy})")
adb("shell", "input", "tap", str(cx), str(cy))
time.sleep(after_delay)
def tap_until_text(x, y, expected_text, max_tries=4, after_tap_wait=4.0):
"""点击 (x,y) 并等待 expected_text 出现,验证页面切换。
没切换则重试点击。tap 后等 4 秒再 dump,避免 ANR(dump 太密集会让 App 卡死)。
"""
for trial in range(max_tries):
adb("shell", "input", "tap", str(x), str(y))
time.sleep(after_tap_wait)
if find_by_text(expected_text, retries=1):
print(f" ✓ tap ({x},{y}) 后看到 '{expected_text}'")
return True
print(f" ! tap ({x},{y}) 后未出现 '{expected_text}',重试 {trial+1}/{max_tries}")
# ANR 风险点:连续失败时给 App 喘息时间
time.sleep(2)
raise RuntimeError(f"点击 ({x},{y}) 重试 {max_tries} 次仍未跳转到含 '{expected_text}' 的页面")
def dismiss_anr_if_any():
"""检测系统 ANR 弹窗,存在则点 Wait 让 App 继续运行(不杀进程)。"""
result = adb("shell", "dumpsys", "activity", "activities")
if "isn't responding" in result.stdout or "AppErrorDialog" in result.stdout:
print(" ⚠ 检测到 ANR 弹窗,点 Wait")
# Wait 按钮 bounds=[70,1299][1010,1425],中心 (540,1362)
adb("shell", "input", "tap", "540", "1362")
time.sleep(4)
return True
return False
def tap_until_activity(x, y, expected_activity_keyword, max_tries=6, after_tap_wait=6.0):
"""点击 (x,y),通过 dumpsys 检查 Activity 是否切换到含 keyword 的页面。
每次尝试前都检查 ANR,存在则先点 Wait。间隔放宽到 6s 给模拟器+萤石喘息时间。
"""
for trial in range(max_tries):
dismiss_anr_if_any()
adb("shell", "input", "tap", str(x), str(y))
time.sleep(after_tap_wait)
cur = current_activity()
if cur and expected_activity_keyword in cur:
print(f" ✓ tap ({x},{y}) 后进入 '{cur}'")
return True
print(f" ! tap ({x},{y}) 后 Activity='{cur}' 不含 '{expected_activity_keyword}',重试 {trial+1}/{max_tries}")
# 失败时给 App 更长的喘息时间(避免连环 ANR)
time.sleep(5)
raise RuntimeError(
f"点击 ({x},{y}) 重试 {max_tries} 次仍未跳到含 '{expected_activity_keyword}' 的 Activity"
)
def current_activity():
"""返回当前 topResumedActivity 的简短名(如 'MainTabActivity')。"""
import re
result = adb("shell", "dumpsys", "activity", "activities")
m = re.search(r'topResumedActivity=ActivityRecord\{[^}]*\s+com\.videogo/\.?([^\s}]+)', result.stdout)
return m.group(1) if m else None
def assert_activity(keyword, hint=""):
"""断言当前 Activity 名含 keyword。不含则报错。避免在错页面盲点。"""
cur = current_activity()
if not cur or keyword not in cur:
raise RuntimeError(f"期望 Activity 含 '{keyword}',实际为 '{cur}'{hint}")
return cur
def tap_safe(x, y, expect_activity_keyword, hint="", delay=1.5):
"""先断言当前 Activity 匹配 keyword,再点击 (x,y),避免误触别的页面。"""
cur = assert_activity(expect_activity_keyword, hint)
print(f" → tap ({x},{y}) [当前 Activity: {cur}]")
adb("shell", "input", "tap", str(x), str(y))
time.sleep(delay)
def sample_pixel_rgb(x, y, png_path="/tmp/_pixchk.png"):
"""截图并采样指定坐标 RGB。需要系统装了 ImageMagick (`magick`)。"""
import re
adb("shell", "screencap", "-p", "/sdcard/_pixchk.png")
adb("pull", "/sdcard/_pixchk.png", png_path)
out = subprocess.run(
["magick", png_path, "-format", "%[pixel:p{" + str(x) + "," + str(y) + "}]", "info:"],
capture_output=True, text=True,
)
m = re.match(r"srgba?\((\d+),(\d+),(\d+)", out.stdout.strip())
if not m:
return (0, 0, 0)
return tuple(map(int, m.groups()))
def tap_until_pixel_changed(x, y, sample_x, sample_y, condition_fn,
max_tries=5, timeout_per_try=8, interval=1.0):
"""点 (x,y),然后轮询 (sample_x, sample_y) 像素,直到 condition_fn(rgb) 为真。
超时则重试点击。用于无文字可定位、但有视觉信号的场景(如切日期后工具栏变化)。
"""
for trial in range(max_tries):
adb("shell", "input", "tap", str(x), str(y))
for sec in range(int(timeout_per_try / interval)):
time.sleep(interval)
rgb = sample_pixel_rgb(sample_x, sample_y)
if condition_fn(rgb):
print(f" ✓ tap ({x},{y}) 生效,{sec+1}s 后检测到像素变化 rgb={rgb}")
return True
print(f" ! tap ({x},{y}) 后像素无变化({timeout_per_try}s),重试 {trial+1}/{max_tries}")
raise RuntimeError(f"点击 ({x},{y}) 重试 {max_tries} 次后像素仍未变化")
def screenshot(path="/tmp/ezviz_screen.png"):
adb("shell", "screencap", "-p", "/sdcard/ezviz_tmp.png")
adb("pull", "/sdcard/ezviz_tmp.png", path)
return path
def launch_app():
"""从头启动 App,等待 618 开屏广告自动关闭后回到首页。
用 am start -S 强制重启主 Activity(比 monkey 更可靠:避免之前残留的
RN 业务页占据栈顶导致 monkey 不进入首页)。
"""
adb("shell", "am", "force-stop", PKG)
time.sleep(2)
adb("shell", "am", "start", "-S", "-n", f"{PKG}/.LauncherActivity")
# 等开屏广告(618 全屏广告自动播放 5~8 秒)+ App 启动稳定
time.sleep(15)
def is_error_page():
"""检测当前是否为'加载失败'白屏页面
取屏幕中央像素,白色/浅灰 = 错误页,深色 = 有视频内容。
"""
tmp = "/tmp/ezviz_check.png"
adb("shell", "screencap", "-p", "/sdcard/ezviz_chk.png")
adb("pull", "/sdcard/ezviz_chk.png", tmp)
try:
with open(tmp, "rb") as f:
data = f.read()
# 解析 PNG IDAT 得到原始像素
pos, idat = 8, b""
width = height = 0
while pos < len(data):
length = struct.unpack(">I", data[pos:pos+4])[0]
ctype = data[pos+4:pos+8]
cdata = data[pos+8:pos+8+length]
if ctype == b"IHDR":
width, height = struct.unpack(">II", cdata[:8])
elif ctype == b"IDAT":
idat += cdata
elif ctype == b"IEND":
break
pos += 12 + length
raw = zlib.decompress(idat)
# screencap 输出 RGBA,每行有 1 字节 filter 前缀
bpp = 4
stride = width * bpp + 1
cx, cy = width // 2, height // 3 # 取屏幕上 1/3 处中央(视频区域)
row_start = cy * stride + 1
r = raw[row_start + cx * bpp]
g = raw[row_start + cx * bpp + 1]
b = raw[row_start + cx * bpp + 2]
# 亮度 > 220 认为是白屏(错误页)
return (r + g + b) // 3 > 220
except Exception:
return False # 解析失败时不做干预
def navigate_to_yesterday_batch():
"""导航到昨天的 SD 卡批量下载模式。
关键节点用 tap_text 通过 UI dump 定位,避免坐标错位时误触首页底部
Tab(萤石/对话/商城/社区/我的,"商城"中心 x=540 跟录像 Tab 撞车)。
"""
print("→ 点击首页摄像头卡片(用 Activity 切换验证,避免频繁 dump 触发 ANR)")
# 详情页是 RN 实现的 EZReactBizActivity
tap_until_activity(*COORD["home_camera_card"], expected_activity_keyword="EZReactBizActivity")
print("→ 点击实时视频(详情页有文字,可以 dump 定位)")
tap_text("实时视频", after_delay=5)
for attempt in range(3):
if not is_error_page():
print(" 实时视频加载成功")
break
print(f" 检测到加载失败,重试 {attempt+1}/3 ...")
tap(*COORD["reload_btn"], delay=6)
else:
print(" 三次失败,返回重试")
adb("shell", "input", "keyevent", "KEYCODE_BACK")
time.sleep(2)
tap(*COORD["home_camera_card"], delay=3)
tap_text("实时视频", after_delay=8)
# 实时视频页全自绘,dump 不到底部 Tab 文字。
# 但当前 Activity = VideoPlayActivityNewVersion,跟首页 MainTabActivity 不同。
# 用 Activity 校验保护坐标点击,避免误触首页商城。
print("→ 切换到录像 Tab(先校验在实时视频页)")
tap_safe(*COORD["record_tab"], expect_activity_keyword="VideoPlayActivity",
hint="未进入实时视频页,可能上一步点击实时视频按钮失败")
print("→ 切换到昨天(像素验证:切换后日期标签 (180,1130) 出现深色文字)")
assert_activity("VideoPlayActivity", "未在实时视频/录像页")
# "今日" 状态下 (180,1130) 是浅蓝灰背景 (244,246,252)
# "06-XX" 状态下该位置是数字深色文字(R~44)
tap_until_pixel_changed(
*COORD["prev_day_arrow"],
sample_x=180, sample_y=1130,
condition_fn=lambda rgb: rgb[0] < 100,
)
# 日期切换后工具栏不会自动切换。需要点一个缩略图把"录像选中状态"打开,
# 工具栏才从"延时摄影/清晰度/对讲/..." 切到"收藏/下载/剪辑/..."
# 工具栏第 3 位 (540,900) 由浅蓝(对讲圆圈)变白即为切换完成
print("→ 点一个缩略图触发工具栏切换")
adb("shell", "input", "tap", "200", "1400") # 第一行第一列缩略图
for _ in range(15):
time.sleep(1)
rgb = sample_pixel_rgb(540, 900)
if rgb[0] > 240 and rgb[1] > 240 and rgb[2] > 240:
print(f" ✓ 工具栏已切换到含'下载'按钮(rgb={rgb}")
break
else:
raise RuntimeError("点缩略图后工具栏未切换到下载模式")
# 等视频开始稳定播放再操作(否则 tap 会被加载过程吸收)
time.sleep(3)
# 下载按钮可能因为视频还在加载而被吸收,循环重点击直到菜单弹出
print("→ 点击下载按钮 + 等批量下载菜单出现(最多 4 次重试)")
assert_activity("VideoPlayActivity", "未在录像页")
for trial in range(4):
adb("shell", "input", "tap", str(COORD["download_btn"][0]), str(COORD["download_btn"][1]))
time.sleep(3)
pos = find_by_text("批量下载", retries=3, delay=1.5)
if pos:
print(f" ✓ 菜单弹出,'批量下载' at {pos}")
cx, cy = pos
adb("shell", "input", "tap", str(cx), str(cy))
time.sleep(3)
break
print(f" ! 第 {trial+1} 次点下载未弹出菜单,重试")
else:
raise RuntimeError("反复点下载按钮仍未弹出批量下载菜单")
def dump_thumbnail_bounds():
"""通过 uiautomator dump 拿到当前批量下载界面可见缩略图的 bounds 列表。
返回 [(x1,y1,x2,y2), ...] 按 y 升序。
过滤:宽 > 300,高 > 150y1 在 [1000, 2300] 区间。
"""
import re
adb("shell", "uiautomator", "dump", "/sdcard/uib.xml")
adb("pull", "/sdcard/uib.xml", "/tmp/uib.xml")
try:
with open("/tmp/uib.xml") as f:
content = f.read()
except FileNotFoundError:
return []
pat = re.compile(r'clickable="true"[^/]*bounds="\[(\d+),(\d+)\]\[(\d+),(\d+)\]"')
out = []
for m in pat.finditer(content):
x1, y1, x2, y2 = map(int, m.groups())
if 1000 <= y1 <= 2300 and (x2 - x1) > 300 and (y2 - y1) > 150:
out.append((x1, y1, x2, y2))
out.sort(key=lambda b: (b[1], b[0]))
return out
def dismiss_dialog_if_any():
"""检测并关闭"使用手机网络下载将消耗流量"等萤石弹窗。
点击 text="确定" 按钮。
"""
pos = find_by_text("确定", retries=1)
if pos:
print(f" ⚠ 检测到弹窗,点击'确定' at {pos}")
adb("shell", "input", "tap", str(pos[0]), str(pos[1]))
time.sleep(2)
return True
return False
def tap_all_thumbnails(max_rounds=100, batch_sleep=35):
"""循环:dump → 点击 9 个 → 等下载 → 滚动 3 行。
每轮点 3 整行(9 个)避免触发 10 并发限制。
滚动距离 = 3 行 = ~609 像素,保证下轮可见行与本轮不重叠。
检测到底:连续 2 轮 bounds 列表相同则结束。
"""
print(f"→ 开始批量下载循环(最多 {max_rounds} 轮)")
# 进入批量模式后会有"使用手机网络流量"弹窗,先关掉
dismiss_dialog_if_any()
last_signature = None
same_count = 0
for r in range(max_rounds):
bounds = dump_thumbnail_bounds()
if not bounds:
print(f"{r+1}: 没拿到缩略图 bounds,结束")
break
# 取完整可见行(高度 > 195)
full = [b for b in bounds if (b[3] - b[1]) > 195]
if len(full) < 3:
print(f"{r+1}: 完整行不足 ({len(full)} < 3),结束")
break
# 检测到底:bounds 内容跟上次相同
signature = tuple(full[:9])
if signature == last_signature:
same_count += 1
if same_count >= 2:
print(f"{r+1}: 连续 2 轮内容相同,已到底,结束")
break
else:
same_count = 0
last_signature = signature
to_click = full[:9]
for x1, y1, x2, y2 in to_click:
cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
adb("shell", "input", "tap", str(cx), str(cy))
time.sleep(0.3)
print(f"{r+1}: 点击 {len(to_click)} 个,等 {batch_sleep}s 下载...")
time.sleep(batch_sleep)
# 向下滚动 3 行(约 609 像素)
adb("shell", "input", "swipe", "540", "1820", "540", "1211", "400")
time.sleep(1.5)
print("→ 批量下载循环结束")
def wait_for_downloads(baseline_files, timeout=600):
"""等待下载目录新增文件稳定
文件名使用下载时刻时间戳,固定存到今天的目录。
baseline_files 为下载前已有的文件集合,用于计算新增数量。
"""
today = datetime.now().strftime("%Y%m%d")
remote_dir = f"/sdcard/Android/data/{PKG}/files/{today}"
print(f"→ 等待下载完成(目录 {today},起始已有 {len(baseline_files)} 个)...")
last_new = -1
stable_rounds = 0
deadline = time.time() + timeout
while time.time() < deadline:
result = adb("shell", "find", remote_dir, "-name", "*_BH*", "-type", "f")
all_files = set(l for l in result.stdout.strip().splitlines() if l)
new_files = all_files - baseline_files
count = len(new_files)
print(f" 新下载 {count} 个文件...")
if count > 0 and count == last_new:
stable_rounds += 1
if stable_rounds >= 3:
print(f"→ 下载稳定,共新增 {count} 个文件")
return list(new_files)
else:
stable_rounds = 0
last_new = count
time.sleep(10)
print("警告:等待超时")
# 返回已下载的新文件(即便未完全稳定)
result = adb("shell", "find", remote_dir, "-name", "*_BH*", "-type", "f")
all_files = set(l for l in result.stdout.strip().splitlines() if l)
return list(all_files - baseline_files)
def pull_files(remote_files):
"""把下载好的文件 adb pull 到本地"""
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
# 本地目录按录像日期(昨天)命名,方便归档
local_dir = OUTPUT_DIR / "BH5455783" / yesterday
local_dir.mkdir(parents=True, exist_ok=True)
print(f"→ 拉取 {len(remote_files)} 个文件到 {local_dir}")
for remote_path in remote_files:
fname = Path(remote_path).name + ".mp4"
local_path = local_dir / fname
if local_path.exists():
print(f" 跳过(已存在): {fname}")
continue
adb("pull", remote_path, str(local_path))
print(f" 已拉取: {fname}")
print(f"→ 完成,文件保存在 {local_dir}")
def get_baseline_files():
"""获取下载开始前已有的文件列表"""
today = datetime.now().strftime("%Y%m%d")
remote_dir = f"/sdcard/Android/data/{PKG}/files/{today}"
result = adb("shell", "find", remote_dir, "-name", "*_BH*", "-type", "f")
return set(l for l in result.stdout.strip().splitlines() if l)
def sync():
print("===== 萤石录像自动同步开始 =====")
launch_app()
# 记录下载前的基准文件集合
baseline = get_baseline_files()
print(f"→ 基准文件数: {len(baseline)}")
navigate_to_yesterday_batch()
tap_all_thumbnails() # 默认 max_rounds=100,跑到滚到底为止
files = wait_for_downloads(baseline)
if files:
pull_files(files)
else:
print("未找到新下载文件")
print("===== 同步完成 =====")
if __name__ == "__main__":
sync()