当前位置: 首页 > news >正文

CANN 异步推理:隐藏推理延迟提升吞吐量的完整方案

一、同步 vs 异步推理

1.1 执行模型对比

同步推理 (Synchronous): 请求 → 等待推理 → 返回结果 延迟 = 预处理 + 推理 + 后处理 特点: 简单,但 CPU 空闲等待 NPU 异步推理 (Asynchronous): 请求 → 提交推理 → 立即返回 结果就绪 → 回调通知 / 轮询获取 延迟感知 ≈ 预处理 (推理在后台) 特点: 复杂,但 CPU/NPU 并行工作 ┌─────────────────────────────────┐ │ 同步: [CPU][NPU][CPU][NPU] │ │ 异步: [CPU][CPU][CPU][CPU] │ │ [NPU ][NPU ][NPU ] │ └─────────────────────────────────┘

1.2 适用场景

同步推理适用: - 单次推理,对延迟敏感 - 简单应用,不需要高吞吐 - 调试阶段 异步推理适用: - 高并发服务 - 流水线推理 - 多模型串行执行 - CPU/NPU 异构协同

二、CANN 异步 API

2.1 基础异步推理

importtorch.npuimportthreadingclassAsyncInferenceEngine:"""异步推理引擎"""def__init__(self,model):self.model=model self.stream=torch.npu.Stream()self.lock=threading.Lock()definfer_async(self,input_data,callback=None):"""异步推理"""withtorch.npu.stream(self.stream):output=self.model(input_data)ifcallback:# 注册回调 (Stream 完成后执行)event=torch.npu.Event()event.record(self.stream)callback_thread=threading.Thread(target=self._wait_and_callback,args=(event,callback,output))callback_thread.start()returnoutputdef_wait_and_callback(self,event,callback,output):"""等待完成并执行回调"""event.synchronize()callback(output)# 使用示例engine=AsyncInferenceEngine(model)defon_complete(output):print(f"推理完成:{output.shape}")# 异步推理output=engine.infer_async(input_data,callback=on_complete)# CPU 继续其他工作process_other_tasks()

2.2 Future 模式

importconcurrent.futuresclassFutureInferenceEngine:"""Future 模式异步推理"""def__init__(self,model):self.model=model self.executor=concurrent.futures.ThreadPoolExecutor(max_workers=4)self.stream=torch.npu.Stream()definfer(self,input_data):"""提交异步推理任务"""future=self.executor.submit(self._run_inference,input_data)returnfuturedef_run_inference(self,input_data):"""实际推理执行"""withtorch.npu.stream(self.stream):output=self.model(input_data)returnoutputdefinfer_batch(self,input_list):"""批量异步推理"""futures=[self.infer(inp)forinpininput_list]results=[f.result()forfinfutures]returnresults# 使用示例engine=FutureInferenceEngine(model)# 提交多个推理任务future1=engine.infer(input1)future2=engine.infer(input2)future3=engine.infer(input3)# 处理其他任务process_other_tasks()# 获取结果result1=future1.result(timeout=5.0)result2=future2.result(timeout=5.0)result3=future3.result(timeout=5.0)print(f"结果:{result1.shape},{result2.shape},{result3.shape}")

三、生产者-消费者模型

3.1 异步推理队列

importqueueimportthreadingclassAsyncInferenceQueue:"""异步推理队列"""def__init__(self,model,max_queue_size=100):self.model=model self.request_queue=queue.Queue(maxsize=max_queue_size)self.result_store={}self.lock=threading.Lock()self.running=Falseself.worker_thread=Nonedefstart(self):"""启动推理工作线程"""self.running=Trueself.worker_thread=threading.Thread(target=self._worker,daemon=True)self.worker_thread.start()print("异步推理队列已启动")defstop(self):"""停止推理工作线程"""self.running=Falseifself.worker_thread:self.worker_thread.join(timeout=5.0)print("异步推理队列已停止")defsubmit(self,request_id,input_data):"""提交推理请求"""self.request_queue.put({'id':request_id,'input':input_data,'submitted_at':time.time()})withself.lock:self.result_store[request_id]={'status':'pending','submitted_at':time.time()}returnrequest_iddefget_result(self,request_id,timeout=10.0):"""获取推理结果"""start_time=time.time()whiletime.time()-start_time<timeout:withself.lock:ifrequest_idinself.result_store:result=self.result_store[request_id]ifresult['status']=='completed':returnresult['output']elifresult['status']=='failed':raiseRuntimeError(f"推理失败:{result.get('error')}")time.sleep(0.01)raiseTimeoutError(f"推理超时:{request_id}")def_worker(self):"""推理工作线程"""whileself.running:try:request=self.request_queue.get(timeout=1.0)request_id=request['id']input_data=request['input']try:# 执行推理output=self.model(input_data)# 存储结果withself.lock:self.result_store[request_id]={'status':'completed','output':output,'completed_at':time.time()}exceptExceptionase:withself.lock:self.result_store[request_id]={'status':'failed','error':str(e)}exceptqueue.Empty:continue# 使用示例queue_engine=AsyncInferenceQueue(model)queue_engine.start()# 提交请求req_id=queue_engine.submit("req_001",input_data)# 处理其他任务process_other_tasks()# 获取结果result=queue_engine.get_result(req_id)print(f"结果:{result.shape}")# 停止queue_engine.stop()

3.2 多消费者模型

classMultiConsumerInference:"""多消费者异步推理"""def__init__(self,model,num_consumers=4):self.model=model self.request_queue=queue.Queue(maxsize=1000)self.result_store={}self.lock=threading.Lock()self.consumers=[]self.num_consumers=num_consumersdefstart(self):"""启动多个消费者"""foriinrange(self.num_consumers):consumer=threading.Thread(target=self._consumer_worker,args=(i,),daemon=True)self.consumers.append(consumer)consumer.start()print(f"已启动{self.num_consumers}个消费者")def_consumer_worker(self,consumer_id):"""消费者工作线程"""stream=torch.npu.Stream()whileTrue:try:request=self.request_queue.get(timeout=1.0)request_id=request['id']input_data=request['input']try:withtorch.npu.stream(stream):output=self.model(input_data)withself.lock:self.result_store[request_id]={'status':'completed','output':output,'consumer_id':consumer_id}exceptExceptionase:withself.lock:self.result_store[request_id]={'status':'failed','error':str(e),'consumer_id':consumer_id}exceptqueue.Empty:continuedefsubmit(self,request_id,input_data):"""提交请求"""self.request_queue.put({'id':request_id,'input':input_data})withself.lock:self.result_store[request_id]={'status':'pending'}returnrequest_iddefget_result(self,request_id,timeout=10.0):"""获取结果"""start_time=time.time()whiletime.time()-start_time<timeout:withself.lock:ifrequest_idinself.result_store:result=self.result_store[request_id]ifresult['status']in['completed','failed']:returnresult time.sleep(0.01)raiseTimeoutError(f"推理超时:{request_id}")# 使用示例multi_consumer=MultiConsumerInference(model,num_consumers=4)multi_consumer.start()# 提交多个请求foriinrange(100):multi_consumer.submit(f"req_{i}",input_data)# 获取结果foriinrange(100):result=multi_consumer.get_result(f"req_{i}")print(f"req_{i}:{result['status']}")

四、流水线推理架构

4.1 三阶段流水线

classInferencePipeline:"""三阶段推理流水线"""def__init__(self,preprocessor,model,postprocessor):self.preprocessor=preprocessor self.model=model self.postprocessor=postprocessor# 三个阶段各自的 Streamself.preprocess_stream=torch.npu.Stream()self.inference_stream=torch.npu.Stream()self.postprocess_stream=torch.npu.Stream()# 事件同步self.preprocess_done=torch.npu.Event()self.inference_done=torch.npu.Event()definfer(self,raw_data):"""流水线推理"""# 阶段 1: 预处理withtorch.npu.stream(self.preprocess_stream):preprocessed=self.preprocessor(raw_data)self.preprocess_done.record(self.preprocess_stream)# 阶段 2: 推理 (等待预处理完成)withtorch.npu.stream(self.inference_stream):self.inference_stream.wait_event(self.preprocess_done)output=self.model(preprocessed)self.inference_done.record(self.inference_stream)# 阶段 3: 后处理 (等待推理完成)withtorch.npu.stream(self.postprocess_stream):self.postprocess_stream.wait_event(self.inference_done)result=self.postprocessor(output)returnresultdefinfer_batch(self,raw_data_list):"""批量流水线推理"""results=[]forraw_datainraw_data_list:result=self.infer(raw_data)results.append(result)torch.npu.synchronize()returnresults# 使用示例pipeline=InferencePipeline(preprocessor,model,postprocessor)results=pipeline.infer_batch(raw_data_list)

五、错误处理与超时控制

5.1 超时控制

classTimeoutInferenceEngine:"""带超时控制的异步推理"""def__init__(self,model,default_timeout=5.0):self.model=model self.default_timeout=default_timeout self.stream=torch.npu.Stream()definfer_with_timeout(self,input_data,timeout=None):"""带超时的推理"""timeout=timeoutorself.default_timeout future=self._submit_inference(input_data)try:result=future.result(timeout=timeout)returnresultexceptconcurrent.futures.TimeoutError:# 超时处理print(f"推理超时 ({timeout}s)")returnNonedef_submit_inference(self,input_data):"""提交推理任务"""executor=concurrent.futures.ThreadPoolExecutor(max_workers=1)future=executor.submit(self._run_inference,input_data)returnfuturedef_run_inference(self,input_data):"""实际推理"""withtorch.npu.stream(self.stream):output=self.model(input_data)returnoutput# 使用示例engine=TimeoutInferenceEngine(model,default_timeout=5.0)result=engine.infer_with_timeout(input_data,timeout=3.0)

5.2 重试机制

classRetryInferenceEngine:"""带重试机制的异步推理"""def__init__(self,model,max_retries=3,retry_delay=1.0):self.model=model self.max_retries=max_retries self.retry_delay=retry_delay self.stream=torch.npu.Stream()definfer_with_retry(self,input_data):"""带重试的推理"""forattemptinrange(self.max_retries):try:withtorch.npu.stream(self.stream):output=self.model(input_data)returnoutputexceptExceptionase:ifattempt<self.max_retries-1:print(f"推理失败 (尝试{attempt+1}/{self.max_retries}):{e}")time.sleep(self.retry_delay)else:raiseRuntimeError(f"推理失败,已重试{self.max_retries}次:{e}")# 使用示例engine=RetryInferenceEngine(model,max_retries=3)result=engine.infer_with_retry(input_data)

六、常见问题

问题原因解决方案
异步结果获取失败Stream 未同步使用 Event 同步
内存泄漏异步任务未清理及时清理过期任务
推理顺序错乱未使用请求 ID使用请求 ID 跟踪
超时不生效超时设置太长调整超时参数
重试风暴重试间隔太短增加退避策略

相关仓库

  • ascend-cl- 异步推理接口 https://gitee.com/ascend/ascend-cl
  • torch_npu- Stream 管理 https://gitee.com/ascend/torch_npu
  • torch_npu- Event 同步 https://gitee.com/ascend/torch_npu
http://www.jsqmd.com/news/870955/

相关文章:

  • ncmdump工具终极指南:3步解锁网易云音乐NCM格式限制
  • 80集短剧,3天拍完:当电影人下场做Agent,影视生产迎来了“最懂行”的解法
  • RocketMQ Dledger 集群与 Raft 协议
  • 黄金回收白银回收铂金回收彩金回收店铺推荐织金县2026最新五家靠谱回收门店TOP5排行榜及联系方式推荐 - 前途无量YY
  • 终极指南:5步解决Cursor AI试用限制,永久免费使用Pro功能
  • 抖音无水印视频下载终极指南:免费快速获取高清素材
  • 3个关键步骤掌握Hugo-PaperMod主题部署
  • 3分钟搞定!在Mac上直接运行Windows应用的终极指南
  • VR-Reversal:无需VR设备,3D视频转换工具让你的普通显示器变身沉浸式影院
  • 在PC上解锁Switch游戏体验:Ryujinx模拟器深度配置手册
  • 终极电视盒子管理方案:TVBoxOSC让你的客厅影院更智能
  • 如何快速部署i茅台智能预约系统:面向初学者的完整指南
  • 黄金回收白银回收铂金回收彩金回收店铺推荐志丹县2026最新五家靠谱回收门店TOP5排行榜及联系方式推荐 - 前途无量YY
  • 免费多平台资源下载终极指南:如何一键获取视频号、抖音无水印内容
  • 黄金回收白银回收铂金回收彩金回收店铺推荐中方县2026最新五家靠谱回收门店TOP5排行榜及联系方式推荐 - 前途无量YY
  • 我为什么会把 555电影 当成“工具站”来看
  • 如何高效实现STL到STEP格式转换:stltostp工具的完整解决方案
  • ZMK开源键盘固件:从零打造你的终极定制化机械键盘
  • Windows 11安卓子系统WSA终极指南:开发者必知的完整解决方案
  • FlashAttention 的“加速玄学”:为什么 A100 能快 2 倍,910 却只能快 1.5 倍?
  • Spring-Ai-Alibaba [03] multiple-llm-client-demo
  • 如何让工艺工程师主导TVA应用开发
  • 革命性macOS窗口管理:Topit智能窗口置顶工具的深度解析与实战指南
  • STM32F103C8T6+TJA1042+UTA0403:一个CAN通讯新手踩过的所有坑(附完整接线图与代码)
  • 黄金回收白银回收铂金回收彩金回收店铺推荐中江县2026最新五家靠谱回收门店TOP5排行榜及联系方式推荐 - 前途无量YY
  • 黄金回收白银回收铂金回收彩金回收店铺推荐株洲县2026最新五家靠谱回收门店TOP5排行榜及联系方式推荐 - 前途无量YY
  • LingJing靶场+burp-labs:闭环式渗透实战教学系统
  • 三大核心优势打造离线版游戏王:YgoMaster免费畅玩指南
  • 三步轻松搞定B站视频下载:跨平台免费工具BilibiliDown完整指南
  • 手把手教你用ESP32C3驱动WS2812灯带:从RMT底层配置到彩虹灯效实现