当前位置: 首页 > news >正文

异步API开发:轮询与回调的实际应用

异步API开发:轮询与回调的实际应用

前言

最近在测试使用一家做具身智能大脑的感知与决策服务,其中涉及轮询(Polling)和回调(Callback/Webhook)这两种异步API结果获取方式。


文章目录

  • 异步API开发:轮询与回调的实际应用
    • 前言
    • 第一部分:基础概念
      • 1.1 什么是异步API?
      • 1.2 为什么AI服务使用异步API?
    • 第二部分:轮询模式详解
      • 2.1 工作原理
      • 2.2 代码实现
      • 2.3 轮询策略
      • 2.4 优缺点
      • 2.5 适用场景
    • 第三部分:回调模式详解
      • 3.1 工作原理
      • 3.2 代码实现
      • 3.3 公网访问问题
      • 3.4 安全机制
      • 3.5 优缺点
      • 3.6 适用场景
    • 第四部分:感知决策在机器人开发中的应用
      • 4.1 机械臂 + AI平台架构
      • 4.2 完整工作流程
      • 4.3 轮询 vs 回调的选择
      • 4.4 混合模式
    • 第五部分:常见问题
      • Q1: 轮询间隔设置多少合适?
      • Q2: 轮询超时设置多少?
      • Q3: 回调服务器需要HTTPS吗?
      • Q4: 如何处理回调失败?
      • Q5: 如何保证回调的幂等性?
    • 总结

第一部分:基础概念

1.1 什么是异步API?

同步API:发送请求后,必须等待服务器处理完成才能得到响应。

请求 ──────────────────────────────────────▶ 响应 │←────────── 等待时间 ──────────────▶│

异步API:发送请求后,立即返回一个"任务标识",结果稍后通过其他方式获取。

请求 ──▶ 立即返回任务ID │ │ (后台处理中) │ ▼ 通过轮询或回调获取结果

1.2 为什么AI服务使用异步API?

原因说明
处理时间长AI模型推理需要几秒到几十秒
避免超时HTTP请求通常有30秒超时限制
提高并发服务器可以同时处理多个任务
用户体验客户端不会卡住

第二部分:轮询模式详解

2.1 工作原理

轮询(Polling)是客户端主动、定期向服务器查询任务状态的方式。

时间线: 0s ──▶ 提交任务,获得taskId 1s ──▶ 查询状态 → "处理中" 2s ──▶ 查询状态 → "处理中" 3s ──▶ 查询状态 → "处理中" 4s ──▶ 查询状态 → "完成",获得结果

2.2 代码实现

importrequestsimporttimeclassPollingClient:"""轮询模式客户端"""def__init__(self,api_host,token):self.api_host=api_host self.token=tokendefsubmit_task(self,image_url,object_names):"""提交感知任务"""url=f"{self.api_host}/perception/check"headers={"Authorization":f"Bearer{self.token}"}payload={"image_type":"2D","image_url":image_url,"object_names":object_names}resp=requests.post(url,headers=headers,json=payload)returnresp.json()["data"]["taskId"]defpoll_result(self,task_id,timeout=60,interval=1):"""轮询获取结果"""url=f"{self.api_host}/perception/result?task_id={task_id}"start_time=time.time()whiletime.time()-start_time<timeout:resp=requests.get(url)data=resp.json()["data"]status=data["taskStatus"]ifstatus=="DONE":returndata["taskResult"]elifstatusin["RUNNING","SUBMIT_SUCCESS"]:time.sleep(interval)else:raiseException(f"任务失败:{status}")raiseTimeoutError("轮询超时")defdetect(self,image_url,object_names):"""完整的检测流程"""task_id=self.submit_task(image_url,object_names)returnself.poll_result(task_id)

2.3 轮询策略

固定间隔轮询

whilenotdone:check_status()time.sleep(1)# 固定1秒

指数退避轮询(推荐):

interval=0.5# 初始0.5秒whilenotdone:check_status()time.sleep(interval)interval=min(interval*1.5,5)# 逐渐增加,最大5秒

长轮询(Long Polling):

# 服务器支持的话,可以设置较长的超时# 服务器会在有结果时立即返回,或超时后返回resp=requests.get(url,timeout=30)

2.4 优缺点

优点缺点
实现简单浪费带宽(反复请求)
不需要公网IP有延迟(轮询间隔)
不需要额外服务器服务器压力大
防火墙友好不够实时

2.5 适用场景

  • 开发和测试阶段
  • 任务量较少
  • 没有公网IP的环境
  • 对实时性要求不高

第三部分:回调模式详解

3.1 工作原理

回调(Callback/Webhook)是服务器在任务完成后,主动向客户端发送通知的方式。

时间线: 0s ──▶ 提交任务(附带回调地址),获得taskId │ │ (服务器后台处理) │ 4s ◀── 服务器向回调地址发送结果

3.2 代码实现

回调服务器

fromflaskimportFlask,requestimportjsonimporthashlibfromCrypto.CipherimportAESimportbase64 app=Flask(__name__)AES_KEY="your_16_byte_key"# 16字节密钥defverify_signature(timestamp,nonce,encrypted_data,signature):"""验证签名"""sign_str=f"{timestamp}{nonce}{encrypted_data}{AES_KEY}"expected=hashlib.md5(sign_str.encode()).hexdigest().upper()returnexpected==signaturedefdecrypt_data(encrypted_data):"""AES解密"""combined=base64.b64decode(encrypted_data)iv=combined[:16]encrypted=combined[16:]cipher=AES.new(AES_KEY.encode(),AES.MODE_CBC,iv)decrypted=cipher.decrypt(encrypted)# 去除PKCS5填充padding_len=decrypted[-1]returndecrypted[:-padding_len].decode('utf-8')@app.route('/callback',methods=['POST'])defcallback():"""接收回调"""data=request.json# 1. 验证签名ifnotverify_signature(data['timestamp'],data['nonce'],data['encryptedData'],data['signature']):return'',403# 2. 验证时间戳(防止重放攻击)importtimeifdata['timestamp']<time.time()*1000-600000:# 10分钟内有效return'',403# 3. 解密数据result=json.loads(decrypt_data(data['encryptedData']))# 4. 处理结果print(f"收到任务结果:{result}")process_result(result)# 5. 返回key字段(平台要求)returnresult.get('key','')defprocess_result(result):"""处理任务结果"""task_id=result.get('taskId')status=result.get('taskStatus')ifstatus=='DONE':task_result=result.get('taskResult',{})# 处理检测结果boxes=task_result.get('boxes',[])labels=task_result.get('labels',[])scores=task_result.get('scores',[])forlabel,score,boxinzip(labels,scores,boxes):print(f"检测到:{label}, 置信度:{score:.2f}, 位置:{box}")if__name__=='__main__':app.run(host='0.0.0.0',port=8080)

3.3 公网访问问题

回调模式需要AI模型服务器能够访问到你的回调地址,这需要:

方案1:公网服务器

你的服务器(有公网IP) │ │ http://123.45.67.89:8080/callback │ ▼ AI模型服务器可以直接访问

方案2:内网穿透

你的电脑(内网) │ │ localhost:8080 │ ▼ 内网穿透工具(cpolar/frp/ngrok) │ │ https://xxx.cpolar.top/callback │ ▼ AI模型服务器通过公网地址访问

cpolar配置示例

# 安装cpolarcurl-L https://www.cpolar.com/static/downloads/install-release-cpolar.sh|sudobash# 认证cpolar authtoken YOUR_TOKEN# 启动隧道cpolar http8080# 获得公网地址,如:https://abc123.r10.cpolar.top

3.4 安全机制

AI平台的回调安全机制:

  1. 签名验证:防止伪造请求

    signature = MD5(timestamp + nonce + encryptedData + aesKey).toUpperCase()
  2. 时间戳验证:防止重放攻击

    if timestamp < 当前时间 - 10分钟: 拒绝请求
  3. AES加密:保护数据传输

    加密方式: AES/CBC/PKCS5Padding IV: 数据前16字节

3.5 优缺点

优点缺点
实时性好需要公网IP或内网穿透
不浪费带宽需要运行回调服务器
服务器压力小配置相对复杂
效率高需要处理安全问题

3.6 适用场景

  • 生产环境
  • 任务量大
  • 对实时性要求高
  • 有公网IP或可以使用内网穿透

第四部分:感知决策在机器人开发中的应用

4.1 机械臂 + AI平台架构

┌─────────────────────────────────────────────────────────────┐ │ 机器人 │ │ │ │ ┌─────────┐ │ │ │ 摄像头 │──┐ │ │ └─────────┘ │ │ │ ▼ │ │ ┌─────────────────────┐ ┌─────────────────────────┐ │ │ │ 图片采集模块 │────▶│ API客户端 │ │ │ │ (camera.py) │ │ (polling_client.py) │ │ │ └─────────────────────┘ └───────────┬─────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────┐ │ │ │ 云平台 │ │ │ │ - 感知大模型 │ │ │ │ - 决策大模型 │ │ │ └───────────┬─────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────┐ ┌─────────────────────────┐ │ │ │ 机械臂控制 │◀────│ 动作执行器 │ │ │ │ (openarm_ctrl.py) │ │ (embodied_agent.py) │ │ │ └─────────────────────┘ └─────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘

4.2 完整工作流程

classEmbodiedAgent:"""具身智能代理"""def__init__(self):self.camera=Camera()self.qianjue=QianjueClient()self.arm=OpenArmController()defexecute_task(self,command):"""执行任务:感知 → 决策 → 执行"""# 1. 采集图像image=self.camera.capture()# 2. 上传图片image_key=self.qianjue.upload_image(image)# 3. 感知分析(轮询模式)perception_result=self.qianjue.perception(image_key,object_names="易拉罐,瓶子,杯子")# 4. 决策规划decision_result=self.qianjue.decision(image_key,command=command,perception=perception_result)# 5. 执行动作action=decision_result.robot_action self.execute_action(action)defexecute_action(self,action):"""执行机器人动作"""if"Grasp"inaction:target=parse_target(action)position=get_grasp_position(target)self.arm.grasp(position)elif"Release"inaction:self.arm.release()elif"Move"inaction:position=parse_position(action)self.arm.move_to(position)

4.3 轮询 vs 回调的选择

场景推荐方式原因
开发测试轮询简单,不需要额外配置
单次任务轮询任务量少,轮询开销可接受
实时控制回调需要快速响应
生产环境回调效率高,可扩展
无公网IP轮询回调需要公网访问

4.4 混合模式

实际应用中,可以结合两种模式:

classHybridClient:"""混合模式客户端"""def__init__(self,callback_url=None):self.callback_url=callback_url self.results={}# 存储回调结果defget_result(self,task_id,timeout=60):"""获取结果:优先回调,备用轮询"""ifself.callback_url:# 等待回调结果start=time.time()whiletime.time()-start<timeout:iftask_idinself.results:returnself.results.pop(task_id)time.sleep(0.1)# 回调超时或未配置,使用轮询returnself.poll_result(task_id,timeout)

第五部分:常见问题

Q1: 轮询间隔设置多少合适?

A: 建议使用指数退避策略:

  • 初始间隔:0.5秒
  • 最大间隔:5秒
  • 增长因子:1.5

Q2: 轮询超时设置多少?

A: 根据任务类型:

  • 简单检测:30秒
  • 全功能感知:60秒
  • 决策任务:30秒

Q3: 回调服务器需要HTTPS吗?

A: 建议使用HTTPS,但HTTP也可以。使用cpolar等工具可以自动获得HTTPS。

Q4: 如何处理回调失败?

A: 1. 实现重试机制 2. 记录失败日志 3. 备用轮询获取结果

Q5: 如何保证回调的幂等性?

A: 使用taskId作为唯一标识,处理前检查是否已处理过:

processed_tasks=set()defhandle_callback(task_id,result):iftask_idinprocessed_tasks:return# 已处理,跳过processed_tasks.add(task_id)process_result(result)

总结

方面轮询回调
原理客户端主动查询服务器主动推送
实时性较差
复杂度简单较复杂
资源消耗
网络要求需要公网访问
适用场景开发测试生产环境
http://www.jsqmd.com/news/166885/

相关文章:

  • 使用Miniconda创建独立环境,完美复现论文实验结果
  • 解决CondaError: run ‘conda init‘ before ‘conda activate‘的根本方法
  • Zanzibar vs MySQL Permission System - 实证性能对比研究
  • GitHub热门开源项目推荐:基于Miniconda的轻量级AI实验复现环境
  • PyTorch安装失败?可能是Conda环境没配好!解决方案在这里
  • Pyenv与Conda双剑合璧:精细化管理多个Python版本
  • Delphi多线程编程入门:工作线程与主线程的协作
  • 如何在Linux上快速安装PyTorch并启用GPU加速(附Miniconda详细步骤)
  • 用Markdown写AI论文笔记:Jupyter+Miniconda高效组合
  • Anaconda Cloud私有包管理 vs Miniconda本地部署
  • 000
  • Jupyter notebook extensions增强Miniconda交互功能
  • Docker stats监控Miniconda容器资源消耗
  • SSH远程访问Miniconda环境进行PyTorch训练的完整流程
  • 小白也能学会的PyTorch安装教程:基于Miniconda和GPU加速
  • Docker Run参数详解:运行Miniconda-Python3.10镜像的10种方式
  • PyTorch GPU检测失败?检查CUDA与Miniconda环境兼容性
  • Miniconda中安装PyTorch Lightning的最佳方式
  • 2025年拆迁维权律师事务所推荐,专业处理拆迁一定要搬吗、协商阶段需要搬吗等问题全解析 - 工业设备
  • 2025拆迁维权律所TOP5权威推荐:拆迁一定要搬吗?专业法律服务解你优 - 工业推荐榜
  • Docker build-arg传递Miniconda版本参数自动化构建
  • hivesql 字段aa值 如何去掉前面的0
  • Linux下Miniconda环境切换导致PyTorch报错的处理
  • Docker Run参数详解:如何挂载Miniconda-Python3.10镜像运行
  • 深度测评:主治医师听哪个老师的课? - 医考机构品牌测评专家
  • Python描述符协议:揭秘属性访问的魔法背后
  • CUDA安装避坑指南:配合Miniconda-Python3.10实现PyTorch无缝对接
  • 使用Miniconda-Python3.10快速搭建深度学习环境(含PyTorch和TensorFlow)
  • python脚本打包步骤
  • 使用Miniconda打包自己的PyTorch项目依赖