基于Agora与AssemblyAI构建高精度实时语音转录机器人
1. 项目概述:构建一个高精度、低延迟的实时转录机器人
在构建实时语音交互应用时,一个常见的需求是:如何在不干扰主会话的情况下,高质量地记录并转写会议或直播中每位参与者的发言?传统的方案要么精度不够,要么延迟太高,要么无法区分说话人。今天,我将分享一个我们团队在实际项目中打磨出来的解决方案:利用 Agora 的 Python Server SDK 和 AssemblyAI 的 Universal-3 Pro 流式 API,构建一个“静默观察者”式的实时转录机器人。
这个机器人的核心工作流程非常清晰:它像一个隐形的会议秘书,以“观众”身份加入 Agora 的音视频频道,订阅并接收每个参会者的原始音频流(PCM 格式),然后实时地将这些音频流推送给 AssemblyAI 进行转写。AssemblyAI 的 Universal-3 Pro 模型不仅转写准确率高,还自带说话人分离(Speaker Diarization)功能,能自动区分“谁在什么时候说了什么”。最终,结构清晰的逐句转录文本会实时输出,你可以轻松地将它们送入数据库、大语言模型或触发其他业务流程。
为什么选择这个技术栈?关键在于“对齐”与“专精”。Agora Server SDK 让服务器端程序能直接获取纯净的、未经压缩的原始 PCM 音频帧,这恰好是 AssemblyAI 流式 API 所期望的输入格式。这种“端到端”的原始数据对接,避免了不必要的音频编解码和格式转换,是实现超低延迟转录的基石。相比之下,许多内置或通用的语音转文本方案在实时性、准确率和多语言支持上往往难以兼顾。
2. 核心架构与选型逻辑解析
2.1 为什么是 Agora + AssemblyAI 的组合?
在决定技术栈时,我们对比了多种方案。最终选择 Agora Python Server SDK 与 AssemblyAI Universal-3 Pro Streaming 的组合,是基于以下几个核心考量:
音频数据管道的纯净性:Agora 的subscribe_all_audio配合set_playback_audio_frame_before_mixing_parameters方法,可以直接获取到每个用户的、未经混音的原始 PCM 音频流。这意味着机器人听到的是每个参会者独立的“干声”,没有房间混响,也没有与其他说话人的声音混合。这对于后续的语音识别和说话人分离至关重要,能极大提升准确率。
协议与格式的天然对齐:AssemblyAI 的流式 WebSocket API 接收的是单声道、16kHz 采样率、16位深的 PCM 数据。而 Agora SDK 可以精确地按此规格输出音频帧。这种“开箱即用”的兼容性省去了我们使用 FFmpeg 等工具进行实时转码的复杂性和额外延迟。每一帧从 Agora 接收后,几乎可以直接塞进 WebSocket 发送出去。
性能指标的显著优势:根据官方基准测试和我们自己的实测,AssemblyAI Universal-3 Pro 在关键指标上表现突出:
| 指标 | AssemblyAI Universal-3 Pro | 典型内置/通用 STT 方案 |
|---|---|---|
| P50 延迟 | ~307 毫秒 | ~600-900 毫秒 |
| 词错误率 (WER) | 8.9% | 14%-18% |
| 实时说话人分离 | ✅ 支持 | ❌ 通常不支持或非实时 |
| 支持语言 | 99+ 种 | 有限(通常<20种) |
对于需要实时字幕、会议纪要或内容审核的场景,低于 500 毫秒的延迟意味着字幕与语音几乎是同步的。而低于 10% 的词错误率,使得转录结果基本无需大量人工修正即可使用。实时说话人分离更是锦上添花,它自动为每段话打上了“说话人 ID”的标签,让后续的分析和处理(如区分不同客户的发言)变得非常简单。
2.2 系统组件与数据流
整个系统的运行依赖于几个核心组件的高效协作:
- Agora 服务端连接:作为机器人本体,在服务端初始化 Agora 服务,并以“观众”角色加入目标频道。它不发布任何音视频流,只进行订阅。
- 音频帧捕获与分发器:监听频道内的用户加入/离开事件。每当有新用户加入,就为其创建一个独立的音频帧捕获流和一个对应的 AssemblyAI WebSocket 连接。这是一个典型的一对一映射关系。
- AssemblyAI 流式转录引擎:每个用户的音频流通过一个独立的、持久的 WebSocket 连接推送给 AssemblyAI。该服务实时处理音频流,并回传包含说话人标签、标点符号和逐句结束标记的 JSON 格式转录结果。
- 结果处理器:接收并解析 AssemblyAI 返回的
Turn事件(代表一个完整的说话轮次)。这个节点是系统的输出口,你可以在这里编写逻辑,将结构化的转录文本发送到任何你需要的地方——写入数据库、推送给 LLM 生成摘要、或触发业务工作流。
注意:这里采用“每个用户一个 WebSocket 连接”的设计,而非将所有用户音频混音后用一个连接处理。这样做虽然连接数增多,但保证了每个音频流的独立性和纯净度,使得 AssemblyAI 的说话人分离模型能发挥最佳效果,也避免了混音可能带来的语音质量下降和说话人混淆问题。
3. 环境准备与项目初始化
3.1 前置条件与账号配置
在开始写代码之前,你需要准备好以下三样东西,这就像厨师下锅前的“备菜”环节:
- Python 环境:确保你的系统安装了 Python 3.9 或更高版本。我推荐使用
pyenv或conda来管理 Python 版本,为这个项目创建一个独立的虚拟环境,避免依赖冲突。 - Agora 开发者账号:前往 Agora 控制台注册并创建一个项目。创建成功后,你会获得两个关键凭证:
- App ID:项目的唯一标识符。
- App Certificate:用于生成安全令牌(Token)的密钥。请务必像保管密码一样保管它,不要泄露到客户端代码中。
- AssemblyAI 账号:前往 AssemblyAI 官网注册,并在控制台获取你的API Key。Universal-3 Pro 模型是他们的旗舰产品,你需要确保账户有足够的额度或已开通相应套餐。
3.2 项目骨架搭建与依赖安装
拿到所有“食材”后,我们开始搭建厨房。首先从 GitHub 克隆项目模板,这是一个已经搭好基础框架的起点,能让我们避开很多初始配置的坑。
# 克隆项目仓库到本地 git clone https://github.com/kelseyefoster/voice-agent-agora-universal-3-pro # 进入项目目录 cd voice-agent-agora-universal-3-pro # 安装所有必需的 Python 依赖包 pip install -r requirements.txt接下来是关键的一步:配置环境变量。项目根目录下有一个.env.example文件,我们需要复制它并填入自己的凭证。
# 复制环境变量示例文件 cp .env.example .env然后用文本编辑器打开新创建的.env文件,内容如下,你需要替换掉your_xxx部分:
# .env 文件内容 AGORA_APP_ID=your_agora_app_id AGORA_APP_CERT=your_agora_certificate ASSEMBLYAI_API_KEY=your_assemblyai_api_key AGORA_CHANNEL=my-channel AGORA_BOT_UID=9999AGORA_APP_ID和AGORA_APP_CERT:填入从 Agora 控制台获取的值。ASSEMBLYAI_API_KEY:填入从 AssemblyAI 控制台获取的 API 密钥。AGORA_CHANNEL:你的机器人将要加入的频道名称。频道是 Agora 中音视频会话的逻辑隔离单元,所有加入同一频道名的用户才能互通。AGORA_BOT_UID:为你的机器人指定一个数字用户 ID。确保这个 ID 在频道内是唯一的,不要与其他真实用户冲突。通常可以使用一个较大的数字,如 9999。
实操心得:我强烈建议在开发阶段使用临时的频道名(如
test-channel-123)和固定的 Bot UID。将.env文件添加到.gitignore中,确保密钥不会意外提交到代码仓库。对于团队协作,可以使用类似dotenv的方式在部署时注入这些变量。
4. 核心代码实现与逐行解析
完成环境配置后,我们深入到bot.py的核心代码中。我将分模块拆解,并解释每一段代码的意图和关键细节。
4.1 初始化 Agora 服务并加入频道
机器人的第一步是“进入房间”。我们使用 Agora 的 Python Server SDK 来初始化一个服务实例,并以“观众”身份加入指定的频道。
import asyncio import json import os from dotenv import load_dotenv import websockets from agora.rtc.agora_service import AgoraService, AgoraServiceConfig from agora.rtc.rtc_connection import RTCConnConfig from agora.rtc.agora_base import ClientRoleType, ChannelProfileType, AudioScenarioType # 加载环境变量 load_dotenv() async def main(): # 从环境变量读取配置 AGORA_APP_ID = os.getenv('AGORA_APP_ID') AGORA_APP_CERT = os.getenv('AGORA_APP_CERT') ASSEMBLYAI_API_KEY = os.getenv('ASSEMBLYAI_API_KEY') AGORA_CHANNEL = os.getenv('AGORA_CHANNEL', 'default-channel') AGORA_BOT_UID = int(os.getenv('AGORA_BOT_UID', 9999)) # 1. 配置并初始化 Agora 服务 cfg = AgoraServiceConfig() cfg.appid = AGORA_APP_ID cfg.enable_audio_processor = True # 启用音频处理模块 cfg.audio_scenario = AudioScenarioType.AUDIO_SCENARIO_CHORUS # 适用于语音场景 service = AgoraService() service.initialize(cfg) # 2. 创建连接配置:以观众角色加入直播频道 conn_cfg = RTCConnConfig( client_role_type=ClientRoleType.CLIENT_ROLE_AUDIENCE, channel_profile=ChannelProfileType.CHANNEL_PROFILE_LIVE_BROADCASTING, ) connection = service.create_rtc_connection(conn_cfg) # 3. 生成 Token 并连接频道 (生产环境需使用动态Token,此处为演示简化) # 注意:在安全的生产环境中,Token应在服务端动态生成,此处直接使用无Token连接仅用于测试。 token = None # 对于未开启鉴权的项目,可暂时为None。生产环境务必使用下面章节的方法生成。 connection.connect(token, AGORA_CHANNEL, str(AGORA_BOT_UID)) print(f"Bot (UID:{AGORA_BOT_UID}) joined channel: {AGORA_CHANNEL}")关键点解析:
ClientRoleType.CLIENT_ROLE_AUDIENCE:这是机器人的关键身份。设置为“观众”意味着它只接收流,不发送流,因此不会占用上行带宽,也不会被其他用户看到或听到。ChannelProfileType.CHANNEL_PROFILE_LIVE_BROADCASTING:频道模式设为“直播”。在这种模式下,观众角色是明确被支持的,且通信模式更符合我们“一对多收听”的场景。AudioScenarioType.AUDIO_SCENARIO_CHORUS:音频场景设置为“合唱”。这个场景针对多人语音交谈进行了优化,能提供更清晰的语音质量,非常适合会议转录。token:在测试或未开启频道鉴权的项目中,可以暂时为None。但在任何生产环境中,都必须使用有效的、有时效性的 Token 来连接,这是安全保障的第一道门。我们会在后面章节详细讲如何生成 Token。
4.2 配置音频输出与订阅所有音频
成功加入频道后,我们需要告诉 Agora SDK:“请把所有人的原始音频,以我指定的格式送给我。”
# 4. 获取本地用户(即机器人自身)的频道对象 agora_channel = connection.get_local_user() # 5. 关键步骤:在订阅音频前,设置期望的原始音频帧参数 # 这步至关重要,它避免了SDK内部的二次重采样,减少延迟和性能开销。 agora_channel.set_playback_audio_frame_before_mixing_parameters( num_of_channels=1, # 单声道,AssemblyAI要求 sample_rate=16000, # 16kHz采样率,AssemblyAI要求 ) # 6. 订阅频道内所有用户的音频流 agora_channel.subscribe_all_audio() print("Subscribed to all audio in the channel.")为什么要在订阅前设置参数?set_playback_audio_frame_before_mixing_parameters这个方法的名字很长,但作用很明确:它在你调用subscribe_all_audio之前,预先定义了你希望收到的“原始音频帧”的格式。如果你在订阅后才设置,SDK 可能已经以默认格式(例如 48kHz 立体声)开始向你传递数据了,这时你再修改格式,SDK 内部就需要进行实时重采样,这会增加不必要的 CPU 消耗和微小的延迟。提前声明“我要 16kHz 单声道”,SDK 就会在音频处理流水线的最初环节直接按此格式输出,效率最高。
执行完这一步后,你就可以通过agora_channel.get_audio_frames(uid)这个异步生成器,按帧获取指定用户纯净的 PCM 数据了。每一帧都将是 16 位深度、小端序、16kHz 采样率的单声道 PCM,与 AssemblyAI 的输入要求严丝合缝。
4.3 为每位用户建立独立的 AssemblyAI 转录流
这是系统的核心桥梁。我们为频道内的每个用户创建一个独立的任务,该任务负责两件事:1) 从 Agora 拉取该用户的音频帧;2) 通过一个专属的 WebSocket 连接推送给 AssemblyAI 并接收转录结果。
# AssemblyAI 流式 WebSocket 端点 AAI_WS_URL = ( "wss://streaming.assemblyai.com/v3/ws" f"?sample_rate=16000" "&speech_model=u3-rt-pro" # 指定使用 Universal-3 Pro 实时模型 "&format_turns=true" # 请求以“说话轮次”的格式返回结果,便于处理 ) async def stream_participant(agora_channel, uid: int, api_key: str): """处理单个用户的音频流转录""" headers = {"Authorization": api_key} try: async with websockets.connect(AAI_WS_URL, additional_headers=headers) as ws: # 连接建立后,AssemblyAI 会发送一个 Session Begins 消息 session_start = json.loads(await ws.recv()) print(f"[UID:{uid}] Transcription session started: {session_start['id']}") async def send_audio(): """从Agora读取音频帧并发送到AssemblyAI""" async for frame in agora_channel.get_audio_frames(uid): # frame.data 就是符合格式的 PCM 字节数据 if ws.open: await ws.send(frame.data) else: break async def recv_transcripts(): """接收并处理AssemblyAI返回的转录消息""" async for message in ws: event = json.loads(message) # 我们关注 'Turn' 类型的事件,且一个完整的说话轮次结束时会有 'end_of_turn' 标记 if event["type"] == "Turn" and event.get("end_of_turn"): speaker = event.get("speaker", "Unknown") transcript_text = event["transcript"] # 打印到控制台,这里就是你的输出点 print(f"[UID:{uid}, Speaker:{speaker}] {transcript_text}") # !!! 在此处可以添加自定义逻辑:存入数据库、调用LLM、触发Webhook等 !!! # 并行执行发送和接收任务 await asyncio.gather(send_audio(), recv_transcripts()) except websockets.exceptions.ConnectionClosedOK: print(f"[UID:{uid}] WebSocket connection closed normally.") except Exception as e: print(f"[UID:{uid}] Error in transcription stream: {e}")代码逻辑深度解读:
- WebSocket 连接参数:
speech_model=u3-rt-pro明确指定使用 Universal-3 Pro 实时模型,这是高准确率和低延迟的保证。format_turns=true参数让 AssemblyAI 以“轮次”为单位返回结果,每个Turn事件代表一个相对完整的说话片段(通常以自然停顿为界),并且end_of_turn为True时表示这个片段结束。这种结构比原始的流式字词(word-by-word)输出更易于后续处理。 - 双任务并行:
asyncio.gather(send_audio(), recv_transcripts())是异步编程的经典模式。send_audio是一个永不停止的循环(直到连接断开),不断从 Agora 拉取音频帧并发送;recv_transcripts也是一个永不停止的循环,持续监听 WebSocket 返回的消息。这两个 IO 密集型任务被asyncio高效地调度,互不阻塞。 - 错误处理:WebSocket 连接可能因为网络问题、服务端重启或主动终止而关闭。用
try...except包裹连接和循环,可以保证当一个用户的转录流出现故障时,不会导致整个机器人崩溃,只会影响该用户。
4.4 动态管理用户:加入与离开
一个真实的频道,用户是动态进出的。我们需要监听这些事件,并相应地创建或销毁转录流。
# 用于存储活跃用户及其转录任务的字典 active_streams: dict[int, asyncio.Task] = {} def on_user_joined(uid: int): """当新用户加入频道时调用""" print(f"User joined: {uid}") if uid == AGORA_BOT_UID: return # 忽略机器人自己 # 为这个新用户创建一个独立的转录任务 task = asyncio.create_task(stream_participant(agora_channel, uid, ASSEMBLYAI_API_KEY)) active_streams[uid] = task def on_user_left(uid: int, reason: int): """当用户离开频道时调用""" print(f"User left: {uid}, reason: {reason}") if uid in active_streams: # 取消该用户的转录任务 active_streams[uid].cancel() try: # 等待任务被正式取消,避免资源泄露 asyncio.create_task(active_streams[uid]) except asyncio.CancelledError: pass del active_streams[uid] # 向Agora连接注册事件回调函数 connection.register_observer_callback("on_user_joined", on_user_joined) connection.register_observer_callback("on_user_offline", on_user_left) # 注意:SDK中可能叫on_user_offline注意事项:
- 过滤自身:在
on_user_joined中,一定要判断加入的用户 UID 是否等于机器人自身的AGORA_BOT_UID。如果不过滤,机器人会尝试订阅自己的音频流,这通常会导致错误或无意义的数据。 - 资源清理:当用户离开时,除了从字典中移除任务引用,更重要的是调用
task.cancel()来通知异步框架终止这个任务。这会让stream_participant函数中的websockets.connect上下文管理器正常退出,关闭 WebSocket 连接,避免连接泄露。 - 回调函数名:不同版本的 Agora SDK 中,用户离开事件的回调函数名可能略有差异,如
on_user_offline或on_user_left。需要查阅你所用版本的 SDK 文档来确定准确的名称。
4.5 优雅地关闭与清理
当我们需要停止机器人时(例如收到终止信号),不能直接退出,必须进行优雅的清理,关闭所有连接,取消所有任务。
import signal shutdown_event = asyncio.Event() def signal_handler(): print("\nShutdown signal received.") shutdown_event.set() # 注册信号处理(如Ctrl+C) loop = asyncio.get_running_loop() for sig in (signal.SIGINT, signal.SIGTERM): loop.add_signal_handler(sig, signal_handler) print("Bot is running. Press Ctrl+C to stop.") try: # 主循环,等待关机事件 await shutdown_event.wait() finally: print("Shutting down...") # 1. 取消所有用户的转录任务 for uid, task in active_streams.items(): task.cancel() if active_streams: await asyncio.gather(*active_streams.values(), return_exceptions=True) # 2. 断开Agora连接 connection.disconnect() # 3. 释放Agora服务资源 service.release() print("Bot shutdown complete.")优雅关闭的重要性:直接杀死进程可能导致 AssemblyAI 那边的 WebSocket 连接没有发送终止信号,服务器端可能认为连接异常中断,未处理完的音频数据可能无法生成最终的转录片段。虽然我们的简单示例没有在关闭时向 AssemblyAI 发送Terminate消息(如原始代码片段所示),但在生产环境中,为每个 WebSocket 连接发送一个{"type": "Terminate"}的 JSON 消息是一个好习惯,这能通知 AssemblyAI 最终化处理并返回一个包含总处理时长等信息的Termination事件。
5. 生产环境关键配置与安全实践
将代码从本地测试推向生产环境,有几个关键步骤不能忽视,它们关系到系统的稳定性和安全性。
5.1 安全地生成与使用 Agora Token
在测试时,我们可能使用了未开启鉴权的 App ID。在生产中,必须开启频道鉴权,并为每次连接动态生成 Token。Token 是一种临时的、可撤销的访问凭证,包含了 App ID、频道名、用户 ID、过期时间等信息,并使用 App Certificate 进行签名。
# 安装 Token 生成工具包 # pip install agora-token-builder from agora_token_builder import RtcTokenBuilder, Role_Subscriber import time def generate_bot_token(app_id: str, app_certificate: str, channel_name: str, uid: int, expire_in_seconds: int = 3600) -> str: """ 生成用于机器人加入频道的 Token。 参数: app_id: Agora 项目的 App ID。 app_certificate: Agora 项目的 App Certificate。 channel_name: 要加入的频道名称。 uid: 机器人的用户 ID。 expire_in_seconds: Token 的有效期,默认1小时。根据安全要求调整。 返回: 生成的 Token 字符串。 """ # 计算 Token 过期的时间戳(当前时间 + 有效期) expire_time = int(time.time()) + expire_in_seconds # 注意:对于观众角色,我们使用 Role_Subscriber token = RtcTokenBuilder.buildTokenWithUid( app_id=app_id, app_certificate=app_certificate, channel_name=channel_name, uid=uid, role=Role_Subscriber, # 关键:订阅者角色对应 CLIENT_ROLE_AUDIENCE privilege_expired_ts=expire_time ) return token # 在 main 函数中替换掉 token = None token = generate_bot_token(AGORA_APP_ID, AGORA_APP_CERT, AGORA_CHANNEL, AGORA_BOT_UID) connection.connect(token, AGORA_CHANNEL, str(AGORA_BOT_UID))安全要点:
- Token 必须在服务端生成:绝对不要将
App Certificate存放在客户端(如网页、移动端 App)代码中。Token 的生成必须在你的服务器后端进行。 - 设置合理的有效期:对于长时间运行的机器人,Token 有效期可以设置得较长(如 24 小时),但同时你需要在代码中实现 Token 的轮换机制,在 Token 过期前重新生成并调用
connection.renew_token()方法更新连接。 - 使用正确的角色:
Role_Subscriber对应的是CLIENT_ROLE_AUDIENCE,确保 Token 的权限与连接配置一致。
5.2 处理网络波动与重连
生产环境的网络是不稳定的。WebSocket 连接可能断开,Agora 连接也可能出现闪断。健壮的代码必须具备重连能力。
async def resilient_stream_participant(agora_channel, uid: int, api_key: str, max_retries=5): """具备重连能力的用户转录流处理函数""" retry_count = 0 base_delay = 1 # 初始重连延迟1秒 while retry_count < max_retries: try: await stream_participant(agora_channel, uid, api_key) # 调用之前的核心函数 # 如果 stream_participant 正常退出(如用户离开),则跳出循环 break except (websockets.exceptions.ConnectionClosedError, ConnectionResetError) as e: retry_count += 1 if retry_count >= max_retries: print(f"[UID:{uid}] Max retries ({max_retries}) reached. Giving up.") break delay = base_delay * (2 ** (retry_count - 1)) # 指数退避 print(f"[UID:{uid}] Connection lost. Retrying ({retry_count}/{max_retries}) in {delay}s...") await asyncio.sleep(delay) except asyncio.CancelledError: # 任务被外部取消(如用户离开),正常退出 print(f"[UID:{uid}] Transcription task cancelled.") break except Exception as e: print(f"[UID:{uid}] Unexpected error: {e}. Stopping transcription for this user.") break指数退避策略:这是网络编程中标准的重连策略。每次重连失败后,等待时间翻倍(1s, 2s, 4s...),避免在服务短暂故障时发起海量重连请求,给服务器造成压力。
5.3 性能优化与资源管理
当频道内有数十甚至上百个用户时,为每个人维持一个 WebSocket 连接和两个异步任务可能会消耗大量内存和网络资源。
- 连接池管理:考虑实现一个 AssemblyAI WebSocket 连接池。对于说话不频繁的场景(如大型讲座),可以设计一种机制,在用户静默一段时间后暂停其音频流推送,但保持 WebSocket 连接存活以复用,减少频繁建连的开销。这需要更精细的状态管理。
- 异步任务限制:使用
asyncio.Semaphore来限制同时运行的最大转录任务数量,防止突发大量用户加入时耗尽系统资源。 - 日志与监控:集成像
structlog或loguru这样的日志库,为不同级别(INFO, WARNING, ERROR)和不同用户(UID)输出结构化日志。同时,可以定期向监控系统(如 Prometheus)上报活跃连接数、音频帧处理延迟、转录错误率等指标。
6. 扩展应用场景与下游集成
转录文本的实时输出只是一个开始。stream_participant函数中打印转录结果的那一行(print(f"[UID:{uid}, Speaker:{speaker}] {transcript_text}"))是整个系统的价值溢出点。你可以在这里插入任何业务逻辑。
6.1 实时会议纪要生成
将转录文本实时存入数据库(如 PostgreSQL, MongoDB),并附上时间戳、说话人、频道 ID 等元数据。这样,会议一结束,一份完整的、带说话人标签的逐字稿就自动生成了。
# 示例:使用异步数据库驱动存入 PostgreSQL import asyncpg async def save_to_database(uid, speaker, transcript, channel, turn_start, turn_end): conn = await asyncpg.connect(DATABASE_URL) await conn.execute(''' INSERT INTO meeting_transcripts (channel_id, user_id, speaker_label, content, start_time, end_time) VALUES ($1, $2, $3, $4, $5, $6) ''', channel, uid, speaker, transcript, turn_start, turn_end) await conn.close() # 在收到 Turn 事件时调用 # event 中包含 'start', 'end' 时间戳(毫秒) await save_to_database(uid, speaker, transcript_text, AGORA_CHANNEL, event['start'], event['end'])6.2 驱动实时 AI 助手与互动
将每个用户说完的一句话(即一个end_of_turn为 True 的片段),实时发送给大语言模型(如通过 OpenAI GPT, Claude, 或本地部署的 Llama 接口),可以实现强大的实时交互功能。
- 实时问答:在培训场景中,学员提问,AI 助手实时解析问题并从知识库中生成答案,再由主持人或 TTS 播报出来。
- 实时摘要:在长时间会议中,每 5 分钟或每当一个议题结束时,将期间的对话文本发送给 LLM,生成阶段性摘要,并投射到共享屏幕上。
- 实时翻译:将转录的英文文本实时发送给翻译 API,并输出目标语言的字幕。
- 合规与安全监控:实时分析转录文本,检测是否包含敏感词、违规内容或特定关键词,并即时告警。
# 示例:调用 OpenAI API 进行实时摘要 import openai openai.api_key = os.getenv("OPENAI_API_KEY") async def generate_summary(transcript_segment): response = await openai.ChatCompletion.acreate( model="gpt-3.5-turbo", messages=[ {"role": "system", "content": "你是一个会议助理,请用一句话总结以下发言的核心内容。"}, {"role": "user", "content": transcript_segment} ], max_tokens=100 ) return response.choices[0].message.content # 在收到 Turn 事件后调用 summary = await generate_summary(transcript_text) print(f"实时摘要: {summary}")6.3 触发外部工作流
将转录结果作为触发器,通过 Webhook 推送给其他系统,可以无缝集成到现有的工作流中。
- 客服系统:将客户与客服的对话实时转录并推送给 CRM,自动生成工单或更新客户信息。
- 协作工具:将团队站会内容实时转录并发送到 Slack 或钉钉的特定频道,形成异步记录。
- 内容生产:将直播或播客的音频实时转录为文字稿,并自动发布到内容管理系统(CMS)的草稿箱。
# 示例:通过 Webhook 发送转录结果 import aiohttp async def post_webhook(webhook_url, data): async with aiohttp.ClientSession() as session: async with session.post(webhook_url, json=data) as resp: if resp.status != 200: print(f"Webhook failed: {resp.status}") # 在收到 Turn 事件后调用 webhook_data = { "event": "transcription_turn", "channel": AGORA_CHANNEL, "uid": uid, "speaker": speaker, "text": transcript_text, "timestamp": time.time() } await post_webhook("https://your-workflow-server/webhook", webhook_data)7. 常见问题排查与调试技巧
在实际部署和运行中,你可能会遇到一些问题。这里记录了一些我们踩过的坑和解决方法。
7.1 音频相关问题
问题:机器人加入频道后收不到任何音频,或者转录结果一直是空的。
- 检查用户角色:确认机器人的
client_role_type是CLIENT_ROLE_AUDIENCE(观众)。如果是CLIENT_ROLE_BROADCASTER(主播),但在频道中没有发布音频流,可能会被其他用户忽略。 - 检查频道模式:确认
channel_profile是CHANNEL_PROFILE_LIVE_BROADCASTING。在通信模式(CHANNEL_PROFILE_COMMUNICATION)下,所有用户默认都是主播,且没有明确的观众角色,订阅逻辑可能不同。 - 验证真实用户是否在发言:确保频道内有其他用户,并且他们的麦克风权限已开启,正在正常说话。可以通过 Agora 控制台的“水晶球”监控工具查看频道状态。
- 检查音频帧回调:在
stream_participant函数的send_audio循环内添加调试日志,打印len(frame.data)。正常情况下,每个帧的数据长度应该是固定的(对于 16kHz 单声道,一帧 20ms 的音频数据是 640 字节)。如果一直是 0 或没有进入循环,说明没有收到音频帧。
问题:转录结果乱码或全是无意义的单词。
- 确认音频格式:这是最常见的原因。双重检查
set_playback_audio_frame_before_mixing_parameters的参数是否为sample_rate=16000和num_of_channels=1。任何不匹配都会导致 AssemblyAI 接收到错误格式的音频,从而产生垃圾输出。 - 检查音频数据内容:写一小段调试代码,将接收到的前几帧 PCM 数据保存为
.raw文件,然后用 Audacity 等音频软件以“16-bit PCM, 单声道,16000Hz”的格式导入播放,听一下是否是正常的语音。如果不是,说明音频流源头或处理链路有问题。
7.2 网络与连接问题
问题:WebSocket 连接频繁断开。
- 检查防火墙和网络策略:确保运行机器人的服务器可以访问
wss://streaming.assemblyai.com:443。有些企业网络会限制出站连接。 - 检查 AssemblyAI API Key 配额和状态:登录 AssemblyAI 控制台,确认 API Key 有效且未超出用量限制。过期的 Key 会导致连接被拒绝。
- 实施重连机制:如前文所述,务必实现带有指数退避的重连逻辑。网络瞬时抖动是常态。
问题:高并发下,部分用户的转录流延迟变高或不稳定。
- 监控系统资源:使用
top,htop或asyncpg等工具监控机器人进程的 CPU 和内存使用情况。每个 WebSocket 连接和异步任务都有开销。如果用户数很多(如 >50),可能需要升级服务器配置。 - 优化异步循环:确保你没有在异步任务中执行阻塞性的 CPU 密集型操作(如复杂的同步计算)。如果需要进行大量计算,考虑使用
asyncio.to_thread或run_in_executor将其放到单独的线程池中执行,避免阻塞事件循环。 - 分批处理:如果所有用户同时说话,瞬间的音频帧处理和网络发送压力会很大。可以考虑在
send_audio循环中增加轻微的asyncio.sleep(0.001)来平滑发送速率,但这会增加整体延迟,需要权衡。
7.3 部署与运维问题
问题:在 Docker 容器中运行失败,提示 Agora SDK 相关错误。
- 依赖库与系统环境:Agora Python Server SDK 可能依赖某些特定的系统库(如音频处理库)。确保你的 Docker 镜像基于一个完整的 Linux 发行版(如
ubuntu:22.04),并在 Dockerfile 中安装必要的系统包。参考 Agora 官方文档的安装要求。 - 权限问题:某些 SDK 可能需要访问
/dev下的设备或具有特定的 Linux 能力(capabilities)。检查容器是否以特权模式运行,或者是否缺少必要的挂载。
问题:如何监控机器人的健康状态?
- 心跳与健康检查:为机器人程序添加一个简单的 HTTP 健康检查端点(例如使用
aiohttp创建一个简单的 web server),返回当前活跃用户数、任务状态等。这样容器编排平台(如 Kubernetes)可以通过探针检查其存活状态。 - 结构化日志:将所有日志输出为 JSON 格式,并包含
uid,channel,session_id,error_type等字段。这样可以通过 ELK(Elasticsearch, Logstash, Kibana)或 Loki 等日志聚合系统进行集中查询和告警。 - 自定义指标:使用
prometheus_client库暴露一些指标,如transcription_active_connections,audio_frame_receive_latency_ms,transcription_error_count。这些指标可以被 Prometheus 抓取,并在 Grafana 中绘制成仪表盘,直观展示系统运行状态。
