#!/usr/bin/env python3 """ 萤石CB60 SD卡录像每日同步脚本 每天凌晨运行,下载前24小时内的所有SD卡录像到本地目录。 """ import json, os, re, subprocess, logging from datetime import datetime from pathlib import Path import requests from config import * # ====== 路径配置 ====== OUTPUT_DIR = Path(OUTPUT_DIR) TOKEN_CACHE = Path(TOKEN_CACHE) LOG_FILE = Path("./sync.log") # NAS上部署时改为 "ffmpeg"(Docker镜像内) FFMPEG = "/opt/homebrew/bin/ffmpeg" BASE = "https://open.ys7.com/api/lapp" # ====== 日志 ====== logging.basicConfig( level=logging.INFO, format="%(asctime)s %(message)s", handlers=[ logging.FileHandler(LOG_FILE, encoding="utf-8"), logging.StreamHandler() ] ) log = logging.getLogger(__name__) # ====== Token 管理 ====== def get_token(): if TOKEN_CACHE.exists(): cache = json.loads(TOKEN_CACHE.read_text()) if datetime.now().timestamp() < cache["expire_ts"] - 3600: log.info(f"使用缓存 Token,有效期至 {cache['expire_str']}") return cache["token"] r = requests.post(f"{BASE}/token/get", data={"appKey": APP_KEY, "appSecret": APP_SECRET}) d = r.json() if d["code"] != "200": raise Exception(f"Token 获取失败: {d}") token = d["data"]["accessToken"] expire_ts = d["data"]["expireTime"] / 1000 expire_str = datetime.fromtimestamp(expire_ts).strftime("%Y-%m-%d %H:%M:%S") TOKEN_CACHE.write_text(json.dumps({ "token": token, "expire_ts": expire_ts, "expire_str": expire_str })) log.info(f"Token 已刷新,有效期至 {expire_str}") return token # ====== 查询录像列表 ====== def get_recording_list(token, start_ms, end_ms): r = requests.post(f"{BASE}/video/by/time", data={ "accessToken": token, "deviceSerial": DEVICE_SERIAL, "channelNo": CHANNEL_NO, "startTime": start_ms, "endTime": end_ms, }) d = r.json() if d["code"] != "200": log.warning(f"录像列表查询失败: {d}") return [] return d.get("data", []) # ====== 获取 FLV 回放流地址 ====== def get_flv_url(token): """ type=2:SD卡回放流,覆盖过去24小时内所有录像片段(服务器拼接成一个 FLV) protocol=4:HTTP FLV,ffmpeg 可直接下载 """ r = requests.post(f"{BASE}/v2/live/address/get", data={ "accessToken": token, "deviceSerial": DEVICE_SERIAL, "channelNo": CHANNEL_NO, "quality": 1, "type": 2, "protocol": 4, }) d = r.json() if d["code"] != "200": raise Exception(f"FLV 地址获取失败: {d}") return d["data"]["url"], d["data"]["expireTime"] # ====== ffmpeg 下载 ====== def download_flv(url, output_path): output_path.parent.mkdir(parents=True, exist_ok=True) cmd = [ FFMPEG, "-i", url, "-c", "copy", "-y", str(output_path), "-v", "warning", ] log.info(f"开始下载 → {output_path}") result = subprocess.run(cmd, capture_output=True, text=True) if not output_path.exists() or output_path.stat().st_size < 1024: log.error(f"下载失败或文件过小\n{result.stderr[:400]}") return False size_kb = output_path.stat().st_size / 1024 log.info(f"下载完成,大小 {size_kb:.0f} KB") return True # ====== 主流程 ====== def sync(): log.info("===== 开始同步 =====") token = get_token() # 1. 查过去24小时是否有录像 now_ms = int(datetime.now().timestamp() * 1000) start_ms = now_ms - 86_400_000 recordings = get_recording_list(token, start_ms, now_ms) if not recordings: log.info("过去24小时无SD卡录像,退出") return log.info(f"发现 {len(recordings)} 段录像") # 2. 获取 FLV 流(服务器把24小时内所有录像拼接好) url, expire_time = get_flv_url(token) log.info(f"FLV URL 有效期至 {expire_time}") # 3. 文件命名:按 begin 时间戳 begin_match = re.search(r'begin=(\d{14})', url) begin_str = begin_match.group(1) if begin_match else datetime.now().strftime("%Y%m%d%H%M%S") date_str = begin_str[:8] # e.g. 20260613 output_path = OUTPUT_DIR / DEVICE_SERIAL / date_str / f"{begin_str}.mp4" # 4. 已下载则跳过 if output_path.exists() and output_path.stat().st_size > 1024: log.info(f"已存在 {output_path.name},跳过") return # 5. 下载 download_flv(url, output_path) log.info("===== 同步完成 =====") if __name__ == "__main__": sync()