对讲功能自动化测试方案与实现
一、背景与目标
1.1 为什么要进行自动化测试?
对讲功能的测试主要面临两个核心问题:
| 问题类型 | 描述 | 挑战 |
|---|---|---|
| 通不通 | 对讲能否成功建立并传输音视频 | 相对简单,可通过接口响应和设备日志快速验证 |
| 效果好 | 音视频质量是否达到业务标准 | 主观性强,引发争议 |
效果争议的典型场景:
- 坐席人员为保护嗓子将麦克风贴近嘴边,可能导致设备端炸麦、出现爆声。
- 研发人员认为是麦克风使用不当(测试方法问题),而产品和客服认为是程序处理问题。
- 双方各执一词,缺乏客观量化数据支撑。
自动化测试的价值:
- 排除主观因素:通过网络指标、波形分析等量化手段客观评价质量。
- 降低测试成本:减少人工参与,支持长时间、高频次回归测试。
- 建立质量基线:为后续音视频优化提供可比对的历史数据。
1.2 对讲业务流程概览
一次完整的对讲涉及上位机(平台/坐席)、业务服务器、调度服务器、媒体代理及终端设备五个角色的协同。下图完整展示了从呼叫发起到挂断的全过程(单向):
sequenceDiagramparticipant Platform as 上位机/测试脚本participant Server as 业务服务器 (ZSJSCC)participant Device as 终端设备 (JSMG)participant Dispatcher as 调度服务器participant MediaProxy as 媒体代理/WS服务 (127.0.0.1)Note over Device, Dispatcher: 【背景】心跳与注册维持Device->>Dispatcher: 发送心跳Device->>Dispatcher: 发送注册Device->>Device: callStat: 2 (空闲)Note over Platform, Device: 【阶段一】呼叫发起Platform->>Platform: 生成 SessionID & EventIDPlatform->>Server: POST /connectMediaServer-->>Platform: 200 OKNote over Platform, MediaProxy: 【阶段二】媒体通道建立Platform->>MediaProxy: WebSocket 连接MediaProxy-->>Platform: 101 Switching ProtocolsNote over Device, Platform: 【阶段三】设备对讲激活Device->>Device: 打开设备对讲状态回调 (200)Device->>Dispatcher: send TalkStat: callStat=3Note over Platform, Device: 【阶段四】音视频数据流转Platform->>MediaProxy: 发送 WebRTC 媒体数据MediaProxy->>Device: 转发音视频流Device->>Device: RecvAudioData / RecvVedioData 日志记录Note over Platform, Device: 【阶段五】结束对讲Platform->>MediaProxy: 关闭 WebSocketDevice->>Device: 收到 bye 消息,删除会话Device->>Dispatcher: send TalkStat: callStat=2Platform->>Platform: 收集 SSH 日志,生成报告
关键动作说明:
connectMedia接口:上位机通过该接口向业务服务器下发对讲指令,关联坐席(fromMediaId)与设备(toMediaId),并分配唯一sessionId。- 通话状态切换:
callStat: 2:设备空闲。callStat: 3:通话中。
- 媒体流验证:设备端日志中的
RecvAudioData和RecvVedioData是验证对讲“通了”的核心指标。 - 自动化监控:测试脚本通过 SSH 实时
tail设备日志,正则匹配关键事件(200 建立、bye 挂断、错误重连)判定测试结果。
二、阶段一:解决“通不通”的问题
2.1 测试目标
- 验证
connectMedia接口调用成功率。 - 验证 WebSocket 媒体通道建立成功率。
- 验证设备端是否收到音视频数据。
- 自动生成包含关键指标(帧数、错误次数)的测试报告。
2.2 实现代码
以下是伪端到端测试脚本(ps:设备端只是通过音视频模块日志判断,不可信)。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
对讲功能端到端测试(阶段一:连通性验证)
- 调用 connectMedia 接口,建立 WebSocket 连接
- SSH 实时解析设备日志,统计音视频帧、通话状态
- 自动生成 PASS/FAIL 报告
"""import asyncio
import threading
import time
import uuid
import re
import logging
import requests
import websockets
import paramiko
from packaging import version
from datetime import datetime# ---------- 日志配置 ----------
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',handlers=[logging.StreamHandler(),logging.FileHandler('talk_test.log', encoding='utf-8')]
)
logger = logging.getLogger(__name__)# ---------- 配置 ----------
CONFIG = {"base_url_zs": "https://zsjscc.jslife.com.cn","token": "JSc975a434134e408b9fab3d3e162188e2", # 调用平台登入脚本获取 token"user_id": "LW_5647","from_media_id": "svmediaCtrl70B5E875FECE","to_media_id": "111350001899232EDD001011","project_no": "p241075647","ssh": {"host": "192.168.8.54","username": "root","password": "Jsst_***","log_path": "/opt/jsst/target/product/JSM1612-Linux-5.0.3/log/JsstMg.log"},"test": {"call_duration": 60, # 通话保持秒数"connect_timeout": 10,"post_call_wait": 5}
}# ---------- 工具函数 ----------
def generate_session_id():return f"{uuid.uuid4().hex[:32]}_webrtc"def generate_event_id():return uuid.uuid4().hex# ---------- 设备日志监听 ----------
class DeviceLogMonitor:def __init__(self, host, username, password, log_path):self.host = hostself.username = usernameself.password = passwordself.log_path = log_pathself.client = Noneself.session_id = Noneself.metrics = {}self.stop_event = threading.Event()self.thread = Nonedef connect(self):self.client = paramiko.SSHClient()self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())self.client.connect(self.host, username=self.username, password=self.password, timeout=10)logger.info("SSH connected to %s", self.host)def start_monitoring(self, session_id):self.session_id = session_idself.metrics = {"established": False,"audio_frames": 0,"video_frames": 0,"call_stat_3_count": 0,"rtsp_errors": 0,"errors": []}self.stop_event.clear()self.thread = threading.Thread(target=self._tail_log, daemon=True)self.thread.start()logger.info("Monitoring started for session %s", session_id)def _tail_log(self):cmd = f"tail -n 0 -f {self.log_path}"stdin, stdout, stderr = self.client.exec_command(cmd)escaped = re.escape(self.session_id)patterns = {"established": re.compile(rf"打开设备对讲状态回调\s+{escaped},200"),"bye": re.compile(rf"由于\[bye\]消息,删掉会话\s+\[{escaped}\]"),"audio": re.compile(r"RecvAudioData\[(\d+)\]"),"video": re.compile(r"RecvVedioData\[(\d+)\]"),"call_stat_3": re.compile(rf"{escaped}.*?send TalkStat:.*?\"callStat\":3"),"rtsp_error": re.compile(r"av_read_frame Error, reOpen")}for line in iter(stdout.readline, ""):if self.stop_event.is_set():breakline = line.strip()if not line:continueif not self.metrics["established"] and patterns["established"].search(line):self.metrics["established"] = Truelogger.info("Talk established")if patterns["bye"].search(line):logger.info("Talk ended by bye")if patterns["audio"].search(line):self.metrics["audio_frames"] += 1if patterns["video"].search(line):self.metrics["video_frames"] += 1if patterns["call_stat_3"].search(line):self.metrics["call_stat_3_count"] += 1if patterns["rtsp_error"].search(line):self.metrics["rtsp_errors"] += 1self.metrics["errors"].append(line[:100])def stop_monitoring(self):self.stop_event.set()if self.thread and self.thread.is_alive():self.thread.join(timeout=5)if self.client:self.client.close()logger.info("Monitoring stopped. Audio: %d, Video: %d",self.metrics['audio_frames'], self.metrics['video_frames'])return self.metrics# ---------- 上位机对讲控制 ----------
def connect_media(session_id, event_id):url = f"{CONFIG['base_url_zs']}/jscc/video/connectMedia?env=g"headers = {"Content-Type": "application/json","token": CONFIG["token"],"userId": CONFIG["user_id"],"env": "g","roletype": "510081","Origin": CONFIG["base_url_zs"],"Referer": f"{CONFIG['base_url_zs']}/jscc-seat/index.html"}payload = {"fromMediaId": CONFIG["from_media_id"],"toMediaId": CONFIG["to_media_id"],"sessionId": session_id,"eventId": event_id,"projectNo": CONFIG["project_no"]}try:resp = requests.post(url, headers=headers, json=payload, timeout=CONFIG["test"]["connect_timeout"])resp.raise_for_status()data = resp.json()if data.get("success") and data["data"][0].get("resultCode") == 200:logger.info("connectMedia succeeded")return Truelogger.error("connectMedia failed: %s", data)return Falseexcept Exception as e:logger.error("connectMedia exception: %s", e)return Falseasync def websocket_call(session_id, duration):ws_url = f"ws://127.0.0.1:19002/videoDeviceId/{session_id}"headers = {"Origin": CONFIG["base_url_zs"], "User-Agent": "Mozilla/5.0"}try:if version.parse(websockets.__version__) >= version.parse("9.0"):ws = await websockets.connect(ws_url, additional_headers=headers,open_timeout=CONFIG["test"]["connect_timeout"])else:ws = await websockets.connect(ws_url, extra_headers=headers,timeout=CONFIG["test"]["connect_timeout"])logger.info("WebSocket connected")await asyncio.sleep(duration)await ws.close()logger.info("WebSocket closed")return Trueexcept Exception as e:logger.error("WebSocket exception: %s", e)return False# ---------- 报告生成 ----------
def generate_report(metrics, session_id):status = "PASS" if metrics["established"] and metrics["audio_frames"] > 0 else "FAIL"report = f"""
=================================================
Talk Test Report
Session ID: {session_id}
Time: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
Result: {status}Device Metrics:Established: {metrics['established']}Audio frames: {metrics['audio_frames']}Video frames: {metrics['video_frames']}callStat=3 count: {metrics['call_stat_3_count']}RTSP errors: {metrics['rtsp_errors']}
"""if metrics["errors"]:report += "\nErrors:\n"for err in metrics["errors"]:report += f" - {err}\n"report += "=================================================\n"return report# ---------- 主流程 ----------
async def main():logger.info("=== Talk End-to-End Test ===")monitor = DeviceLogMonitor(CONFIG["ssh"]["host"],CONFIG["ssh"]["username"],CONFIG["ssh"]["password"],CONFIG["ssh"]["log_path"])try:monitor.connect()except Exception as e:logger.error("SSH connection failed: %s", e)returnsession_id = generate_session_id()monitor.start_monitoring(session_id)event_id = generate_event_id()if not connect_media(session_id, event_id):monitor.stop_monitoring()returnif not await websocket_call(session_id, CONFIG["test"]["call_duration"]):monitor.stop_monitoring()returnawait asyncio.sleep(CONFIG["test"]["post_call_wait"])metrics = monitor.stop_monitoring()report = generate_report(metrics, session_id)print(report)with open("talk_report.txt", "w", encoding="utf-8") as f:f.write(report)logger.info("Report saved to talk_report.txt")if __name__ == "__main__":asyncio.run(main())
2.3 报告示例
=================================================
Talk Test Report
Session ID: b0bc06da0ffe4f77a66969d_webrtc
Time: 2026-04-17 11:53:00
Result: PASSDevice Metrics:Established: TrueAudio frames: 125Video frames: 47callStat=3 count: 12RTSP errors: 0
=================================================
三、后续规划
| 阶段 | 目标 | 核心方法 |
|---|---|---|
| 阶段二 | 量化音频传输质量 | 分析音频帧抖动、码率衰减;引入 PESQ/POLQA 客观音质评分 |
| 阶段三 | 量化视频传输质量 | 检测视频卡顿、黑屏;计算 SSIM/VMAF 相似度 |
| 阶段四 | 端到端延迟测量 | 通过二维码/数字时钟 + OCR 精确计算玻璃到玻璃延迟 |
通过分阶段推进,最终构建覆盖连通性 → 传输质量 → 主观体验的完整自动化测试体系。
