Files
yuming 45d17ee576 feat: 萤石 CB60 录像每日同步初始版本
通过 ADB 自动化模拟器中的萤石 App,每天把 SD 卡里昨天的录像批量
下载到本地。云 API 方案 (sync.py/probe.py) 是备选,需先在萤石开放
平台关闭设备码流加密。

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-16 10:41:38 +08:00

489 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
萤石录像自动下载脚本
通过 ADB 自动化模拟器中的萤石 App,下载前一天的所有报警录像。
"""
import subprocess, time, os, shutil, struct, zlib
from datetime import datetime, timedelta
from pathlib import Path
PYTHON = "/usr/local/bin/python3.12"
ADB = os.path.expanduser("~/Library/Android/sdk/platform-tools/adb")
DEVICE = "emulator-5554"
OUTPUT_DIR = Path("./recordings")
PKG = "com.videogo"
# 屏幕坐标(Pixel 6 模拟器 1080x24002026-06-15 实测校准)
COORD = {
"home_camera_card": (540, 500), # 首页摄像头卡片
"live_video_btn": (270, 1340), # 摄像头详情页-实时视频按钮
"record_tab": (540, 2290), # 底部录像 Tab
"prev_day_arrow": (80, 1130), # < 昨天箭头
"download_btn": (426, 880), # 录像页工具栏-下载按钮
"batch_download": (540, 1904), # 弹出菜单"批量下载"UIAutomator dump 实测)
"reload_btn": (540, 1960), # 加载失败页的"重新加载"按钮
}
# 批量下载界面缩略图布局(UIAutomator dump 实测)
# 3 列 x 中心:201, 540, 878
# 行高 203 像素;y 中心从 1240 开始
THUMB_X = [201, 540, 878]
THUMB_ROW_H = 203
# 每次点 3 整行 = 9 个(不触发 10 并发限制),然后滚动 3 行
def adb(*args):
return subprocess.run([ADB, "-s", DEVICE] + list(args),
capture_output=True, text=True)
def tap(x, y, delay=0.8):
adb("shell", "input", "tap", str(x), str(y))
time.sleep(delay)
def find_by_text(target_text, retries=4, delay=2.0):
"""通过 uiautomator dump 查找指定 text 节点的中心坐标。
返回 (cx, cy) 或 None。重试次数受限,每次间隔 >= 2s,避免 ANR。
"""
import re
# 用 [^<>]*? 限制在同一 <node /> 标签内,并跨过含 / 的属性
# (如 resource-id="com.videogo:id/tv_download_all"
pat = re.compile(
r'text="' + re.escape(target_text) + r'"[^<>]*?bounds="\[(\d+),(\d+)\]\[(\d+),(\d+)\]"',
)
for i in range(retries):
adb("shell", "uiautomator", "dump", "/sdcard/uib.xml")
adb("pull", "/sdcard/uib.xml", "/tmp/uib.xml")
try:
with open("/tmp/uib.xml") as f:
content = f.read()
except FileNotFoundError:
content = ""
m = pat.search(content)
if m:
x1, y1, x2, y2 = map(int, m.groups())
return ((x1 + x2) // 2, (y1 + y2) // 2)
if i < retries - 1:
time.sleep(delay)
return None
def tap_text(text, after_delay=1.5, retries=8):
"""查找文字元素并点击。找不到抛 RuntimeError,避免误触别的页面。"""
pos = find_by_text(text, retries=retries)
if pos is None:
raise RuntimeError(f"找不到目标文字:'{text}',停止以避免误触")
cx, cy = pos
print(f" → tap '{text}' at ({cx}, {cy})")
adb("shell", "input", "tap", str(cx), str(cy))
time.sleep(after_delay)
def tap_until_text(x, y, expected_text, max_tries=4, after_tap_wait=4.0):
"""点击 (x,y) 并等待 expected_text 出现,验证页面切换。
没切换则重试点击。tap 后等 4 秒再 dump,避免 ANR(dump 太密集会让 App 卡死)。
"""
for trial in range(max_tries):
adb("shell", "input", "tap", str(x), str(y))
time.sleep(after_tap_wait)
if find_by_text(expected_text, retries=1):
print(f" ✓ tap ({x},{y}) 后看到 '{expected_text}'")
return True
print(f" ! tap ({x},{y}) 后未出现 '{expected_text}',重试 {trial+1}/{max_tries}")
# ANR 风险点:连续失败时给 App 喘息时间
time.sleep(2)
raise RuntimeError(f"点击 ({x},{y}) 重试 {max_tries} 次仍未跳转到含 '{expected_text}' 的页面")
def dismiss_anr_if_any():
"""检测系统 ANR 弹窗,存在则点 Wait 让 App 继续运行(不杀进程)。"""
result = adb("shell", "dumpsys", "activity", "activities")
if "isn't responding" in result.stdout or "AppErrorDialog" in result.stdout:
print(" ⚠ 检测到 ANR 弹窗,点 Wait")
# Wait 按钮 bounds=[70,1299][1010,1425],中心 (540,1362)
adb("shell", "input", "tap", "540", "1362")
time.sleep(4)
return True
return False
def tap_until_activity(x, y, expected_activity_keyword, max_tries=6, after_tap_wait=6.0):
"""点击 (x,y),通过 dumpsys 检查 Activity 是否切换到含 keyword 的页面。
每次尝试前都检查 ANR,存在则先点 Wait。间隔放宽到 6s 给模拟器+萤石喘息时间。
"""
for trial in range(max_tries):
dismiss_anr_if_any()
adb("shell", "input", "tap", str(x), str(y))
time.sleep(after_tap_wait)
cur = current_activity()
if cur and expected_activity_keyword in cur:
print(f" ✓ tap ({x},{y}) 后进入 '{cur}'")
return True
print(f" ! tap ({x},{y}) 后 Activity='{cur}' 不含 '{expected_activity_keyword}',重试 {trial+1}/{max_tries}")
# 失败时给 App 更长的喘息时间(避免连环 ANR)
time.sleep(5)
raise RuntimeError(
f"点击 ({x},{y}) 重试 {max_tries} 次仍未跳到含 '{expected_activity_keyword}' 的 Activity"
)
def current_activity():
"""返回当前 topResumedActivity 的简短名(如 'MainTabActivity')。"""
import re
result = adb("shell", "dumpsys", "activity", "activities")
m = re.search(r'topResumedActivity=ActivityRecord\{[^}]*\s+com\.videogo/\.?([^\s}]+)', result.stdout)
return m.group(1) if m else None
def assert_activity(keyword, hint=""):
"""断言当前 Activity 名含 keyword。不含则报错。避免在错页面盲点。"""
cur = current_activity()
if not cur or keyword not in cur:
raise RuntimeError(f"期望 Activity 含 '{keyword}',实际为 '{cur}'{hint}")
return cur
def tap_safe(x, y, expect_activity_keyword, hint="", delay=1.5):
"""先断言当前 Activity 匹配 keyword,再点击 (x,y),避免误触别的页面。"""
cur = assert_activity(expect_activity_keyword, hint)
print(f" → tap ({x},{y}) [当前 Activity: {cur}]")
adb("shell", "input", "tap", str(x), str(y))
time.sleep(delay)
def sample_pixel_rgb(x, y, png_path="/tmp/_pixchk.png"):
"""截图并采样指定坐标 RGB。需要系统装了 ImageMagick (`magick`)。"""
import re
adb("shell", "screencap", "-p", "/sdcard/_pixchk.png")
adb("pull", "/sdcard/_pixchk.png", png_path)
out = subprocess.run(
["magick", png_path, "-format", "%[pixel:p{" + str(x) + "," + str(y) + "}]", "info:"],
capture_output=True, text=True,
)
m = re.match(r"srgba?\((\d+),(\d+),(\d+)", out.stdout.strip())
if not m:
return (0, 0, 0)
return tuple(map(int, m.groups()))
def tap_until_pixel_changed(x, y, sample_x, sample_y, condition_fn,
max_tries=5, timeout_per_try=8, interval=1.0):
"""点 (x,y),然后轮询 (sample_x, sample_y) 像素,直到 condition_fn(rgb) 为真。
超时则重试点击。用于无文字可定位、但有视觉信号的场景(如切日期后工具栏变化)。
"""
for trial in range(max_tries):
adb("shell", "input", "tap", str(x), str(y))
for sec in range(int(timeout_per_try / interval)):
time.sleep(interval)
rgb = sample_pixel_rgb(sample_x, sample_y)
if condition_fn(rgb):
print(f" ✓ tap ({x},{y}) 生效,{sec+1}s 后检测到像素变化 rgb={rgb}")
return True
print(f" ! tap ({x},{y}) 后像素无变化({timeout_per_try}s),重试 {trial+1}/{max_tries}")
raise RuntimeError(f"点击 ({x},{y}) 重试 {max_tries} 次后像素仍未变化")
def screenshot(path="/tmp/ezviz_screen.png"):
adb("shell", "screencap", "-p", "/sdcard/ezviz_tmp.png")
adb("pull", "/sdcard/ezviz_tmp.png", path)
return path
def launch_app():
"""从头启动 App,等待 618 开屏广告自动关闭后回到首页。
用 am start -S 强制重启主 Activity(比 monkey 更可靠:避免之前残留的
RN 业务页占据栈顶导致 monkey 不进入首页)。
"""
adb("shell", "am", "force-stop", PKG)
time.sleep(2)
adb("shell", "am", "start", "-S", "-n", f"{PKG}/.LauncherActivity")
# 等开屏广告(618 全屏广告自动播放 5~8 秒)+ App 启动稳定
time.sleep(15)
def is_error_page():
"""检测当前是否为'加载失败'白屏页面
取屏幕中央像素,白色/浅灰 = 错误页,深色 = 有视频内容。
"""
tmp = "/tmp/ezviz_check.png"
adb("shell", "screencap", "-p", "/sdcard/ezviz_chk.png")
adb("pull", "/sdcard/ezviz_chk.png", tmp)
try:
with open(tmp, "rb") as f:
data = f.read()
# 解析 PNG IDAT 得到原始像素
pos, idat = 8, b""
width = height = 0
while pos < len(data):
length = struct.unpack(">I", data[pos:pos+4])[0]
ctype = data[pos+4:pos+8]
cdata = data[pos+8:pos+8+length]
if ctype == b"IHDR":
width, height = struct.unpack(">II", cdata[:8])
elif ctype == b"IDAT":
idat += cdata
elif ctype == b"IEND":
break
pos += 12 + length
raw = zlib.decompress(idat)
# screencap 输出 RGBA,每行有 1 字节 filter 前缀
bpp = 4
stride = width * bpp + 1
cx, cy = width // 2, height // 3 # 取屏幕上 1/3 处中央(视频区域)
row_start = cy * stride + 1
r = raw[row_start + cx * bpp]
g = raw[row_start + cx * bpp + 1]
b = raw[row_start + cx * bpp + 2]
# 亮度 > 220 认为是白屏(错误页)
return (r + g + b) // 3 > 220
except Exception:
return False # 解析失败时不做干预
def navigate_to_yesterday_batch():
"""导航到昨天的 SD 卡批量下载模式。
关键节点用 tap_text 通过 UI dump 定位,避免坐标错位时误触首页底部
Tab(萤石/对话/商城/社区/我的,"商城"中心 x=540 跟录像 Tab 撞车)。
"""
print("→ 点击首页摄像头卡片(用 Activity 切换验证,避免频繁 dump 触发 ANR)")
# 详情页是 RN 实现的 EZReactBizActivity
tap_until_activity(*COORD["home_camera_card"], expected_activity_keyword="EZReactBizActivity")
print("→ 点击实时视频(详情页有文字,可以 dump 定位)")
tap_text("实时视频", after_delay=5)
for attempt in range(3):
if not is_error_page():
print(" 实时视频加载成功")
break
print(f" 检测到加载失败,重试 {attempt+1}/3 ...")
tap(*COORD["reload_btn"], delay=6)
else:
print(" 三次失败,返回重试")
adb("shell", "input", "keyevent", "KEYCODE_BACK")
time.sleep(2)
tap(*COORD["home_camera_card"], delay=3)
tap_text("实时视频", after_delay=8)
# 实时视频页全自绘,dump 不到底部 Tab 文字。
# 但当前 Activity = VideoPlayActivityNewVersion,跟首页 MainTabActivity 不同。
# 用 Activity 校验保护坐标点击,避免误触首页商城。
print("→ 切换到录像 Tab(先校验在实时视频页)")
tap_safe(*COORD["record_tab"], expect_activity_keyword="VideoPlayActivity",
hint="未进入实时视频页,可能上一步点击实时视频按钮失败")
print("→ 切换到昨天(像素验证:切换后日期标签 (180,1130) 出现深色文字)")
assert_activity("VideoPlayActivity", "未在实时视频/录像页")
# "今日" 状态下 (180,1130) 是浅蓝灰背景 (244,246,252)
# "06-XX" 状态下该位置是数字深色文字(R~44)
tap_until_pixel_changed(
*COORD["prev_day_arrow"],
sample_x=180, sample_y=1130,
condition_fn=lambda rgb: rgb[0] < 100,
)
# 日期切换后工具栏不会自动切换。需要点一个缩略图把"录像选中状态"打开,
# 工具栏才从"延时摄影/清晰度/对讲/..." 切到"收藏/下载/剪辑/..."
# 工具栏第 3 位 (540,900) 由浅蓝(对讲圆圈)变白即为切换完成
print("→ 点一个缩略图触发工具栏切换")
adb("shell", "input", "tap", "200", "1400") # 第一行第一列缩略图
for _ in range(15):
time.sleep(1)
rgb = sample_pixel_rgb(540, 900)
if rgb[0] > 240 and rgb[1] > 240 and rgb[2] > 240:
print(f" ✓ 工具栏已切换到含'下载'按钮(rgb={rgb}")
break
else:
raise RuntimeError("点缩略图后工具栏未切换到下载模式")
# 等视频开始稳定播放再操作(否则 tap 会被加载过程吸收)
time.sleep(3)
# 下载按钮可能因为视频还在加载而被吸收,循环重点击直到菜单弹出
print("→ 点击下载按钮 + 等批量下载菜单出现(最多 4 次重试)")
assert_activity("VideoPlayActivity", "未在录像页")
for trial in range(4):
adb("shell", "input", "tap", str(COORD["download_btn"][0]), str(COORD["download_btn"][1]))
time.sleep(3)
pos = find_by_text("批量下载", retries=3, delay=1.5)
if pos:
print(f" ✓ 菜单弹出,'批量下载' at {pos}")
cx, cy = pos
adb("shell", "input", "tap", str(cx), str(cy))
time.sleep(3)
break
print(f" ! 第 {trial+1} 次点下载未弹出菜单,重试")
else:
raise RuntimeError("反复点下载按钮仍未弹出批量下载菜单")
def dump_thumbnail_bounds():
"""通过 uiautomator dump 拿到当前批量下载界面可见缩略图的 bounds 列表。
返回 [(x1,y1,x2,y2), ...] 按 y 升序。
过滤:宽 > 300,高 > 150y1 在 [1000, 2300] 区间。
"""
import re
adb("shell", "uiautomator", "dump", "/sdcard/uib.xml")
adb("pull", "/sdcard/uib.xml", "/tmp/uib.xml")
try:
with open("/tmp/uib.xml") as f:
content = f.read()
except FileNotFoundError:
return []
pat = re.compile(r'clickable="true"[^/]*bounds="\[(\d+),(\d+)\]\[(\d+),(\d+)\]"')
out = []
for m in pat.finditer(content):
x1, y1, x2, y2 = map(int, m.groups())
if 1000 <= y1 <= 2300 and (x2 - x1) > 300 and (y2 - y1) > 150:
out.append((x1, y1, x2, y2))
out.sort(key=lambda b: (b[1], b[0]))
return out
def dismiss_dialog_if_any():
"""检测并关闭"使用手机网络下载将消耗流量"等萤石弹窗。
点击 text="确定" 按钮。
"""
pos = find_by_text("确定", retries=1)
if pos:
print(f" ⚠ 检测到弹窗,点击'确定' at {pos}")
adb("shell", "input", "tap", str(pos[0]), str(pos[1]))
time.sleep(2)
return True
return False
def tap_all_thumbnails(max_rounds=100, batch_sleep=35):
"""循环:dump → 点击 9 个 → 等下载 → 滚动 3 行。
每轮点 3 整行(9 个)避免触发 10 并发限制。
滚动距离 = 3 行 = ~609 像素,保证下轮可见行与本轮不重叠。
检测到底:连续 2 轮 bounds 列表相同则结束。
"""
print(f"→ 开始批量下载循环(最多 {max_rounds} 轮)")
# 进入批量模式后会有"使用手机网络流量"弹窗,先关掉
dismiss_dialog_if_any()
last_signature = None
same_count = 0
for r in range(max_rounds):
bounds = dump_thumbnail_bounds()
if not bounds:
print(f"{r+1}: 没拿到缩略图 bounds,结束")
break
# 取完整可见行(高度 > 195)
full = [b for b in bounds if (b[3] - b[1]) > 195]
if len(full) < 3:
print(f"{r+1}: 完整行不足 ({len(full)} < 3),结束")
break
# 检测到底:bounds 内容跟上次相同
signature = tuple(full[:9])
if signature == last_signature:
same_count += 1
if same_count >= 2:
print(f"{r+1}: 连续 2 轮内容相同,已到底,结束")
break
else:
same_count = 0
last_signature = signature
to_click = full[:9]
for x1, y1, x2, y2 in to_click:
cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
adb("shell", "input", "tap", str(cx), str(cy))
time.sleep(0.3)
print(f"{r+1}: 点击 {len(to_click)} 个,等 {batch_sleep}s 下载...")
time.sleep(batch_sleep)
# 向下滚动 3 行(约 609 像素)
adb("shell", "input", "swipe", "540", "1820", "540", "1211", "400")
time.sleep(1.5)
print("→ 批量下载循环结束")
def wait_for_downloads(baseline_files, timeout=600):
"""等待下载目录新增文件稳定
文件名使用下载时刻时间戳,固定存到今天的目录。
baseline_files 为下载前已有的文件集合,用于计算新增数量。
"""
today = datetime.now().strftime("%Y%m%d")
remote_dir = f"/sdcard/Android/data/{PKG}/files/{today}"
print(f"→ 等待下载完成(目录 {today},起始已有 {len(baseline_files)} 个)...")
last_new = -1
stable_rounds = 0
deadline = time.time() + timeout
while time.time() < deadline:
result = adb("shell", "find", remote_dir, "-name", "*_BH*", "-type", "f")
all_files = set(l for l in result.stdout.strip().splitlines() if l)
new_files = all_files - baseline_files
count = len(new_files)
print(f" 新下载 {count} 个文件...")
if count > 0 and count == last_new:
stable_rounds += 1
if stable_rounds >= 3:
print(f"→ 下载稳定,共新增 {count} 个文件")
return list(new_files)
else:
stable_rounds = 0
last_new = count
time.sleep(10)
print("警告:等待超时")
# 返回已下载的新文件(即便未完全稳定)
result = adb("shell", "find", remote_dir, "-name", "*_BH*", "-type", "f")
all_files = set(l for l in result.stdout.strip().splitlines() if l)
return list(all_files - baseline_files)
def pull_files(remote_files):
"""把下载好的文件 adb pull 到本地"""
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
# 本地目录按录像日期(昨天)命名,方便归档
local_dir = OUTPUT_DIR / "BH5455783" / yesterday
local_dir.mkdir(parents=True, exist_ok=True)
print(f"→ 拉取 {len(remote_files)} 个文件到 {local_dir}")
for remote_path in remote_files:
fname = Path(remote_path).name + ".mp4"
local_path = local_dir / fname
if local_path.exists():
print(f" 跳过(已存在): {fname}")
continue
adb("pull", remote_path, str(local_path))
print(f" 已拉取: {fname}")
print(f"→ 完成,文件保存在 {local_dir}")
def get_baseline_files():
"""获取下载开始前已有的文件列表"""
today = datetime.now().strftime("%Y%m%d")
remote_dir = f"/sdcard/Android/data/{PKG}/files/{today}"
result = adb("shell", "find", remote_dir, "-name", "*_BH*", "-type", "f")
return set(l for l in result.stdout.strip().splitlines() if l)
def sync():
print("===== 萤石录像自动同步开始 =====")
launch_app()
# 记录下载前的基准文件集合
baseline = get_baseline_files()
print(f"→ 基准文件数: {len(baseline)}")
navigate_to_yesterday_batch()
tap_all_thumbnails() # 默认 max_rounds=100,跑到滚到底为止
files = wait_for_downloads(baseline)
if files:
pull_files(files)
else:
print("未找到新下载文件")
print("===== 同步完成 =====")
if __name__ == "__main__":
sync()