一、同步 vs 异步推理
1.1 执行模型对比
同步推理 (Synchronous):
  请求 → 等待推理 → 返回结果
  延迟 = 预处理 + 推理 + 后处理
  特点: 简单,但 CPU 空闲等待 NPU

异步推理 (Asynchronous):
  请求 → 提交推理 → 立即返回
  结果就绪 → 回调通知 / 轮询获取
  感知延迟 ≈ 预处理 (推理在后台)
  特点: 复杂,但 CPU/NPU 并行工作

┌─────────────────────────────────┐
│ 同步: [CPU][NPU][CPU][NPU]      │
│ 异步: [CPU][CPU][CPU][CPU]      │
│       [NPU ][NPU ][NPU ]        │
└─────────────────────────────────┘
1.2 适用场景
同步推理适用:
- 单次推理,对延迟敏感
- 简单应用,不需要高吞吐
- 调试阶段

异步推理适用:
- 高并发服务
- 流水线推理
- 多模型串行执行
- CPU/NPU 异构协同
二、CANN 异步 API
2.1 基础异步推理
import threading

import torch.npu


class AsyncInferenceEngine:
    """Run a model inside a dedicated NPU stream, with optional completion callback."""

    def __init__(self, model):
        self.model = model
        self.stream = torch.npu.Stream()
        self.lock = threading.Lock()

    def infer_async(self, input_data, callback=None):
        """Invoke the model under the engine's stream and return its output.

        The stream is not synchronized before returning. When ``callback`` is
        truthy, an event is recorded on the stream and a helper thread waits
        on that event, then calls ``callback(output)``.

        Args:
            input_data: Value passed straight to ``self.model``.
            callback: Optional callable taking the model output.

        Returns:
            Whatever ``self.model(input_data)`` returned.
        """
        with torch.npu.stream(self.stream):
            output = self.model(input_data)

        if not callback:
            return output

        # Mark the current tail of the stream; the waiter thread blocks on it.
        done = torch.npu.Event()
        done.record(self.stream)
        threading.Thread(
            target=self._wait_and_callback,
            args=(done, callback, output),
        ).start()
        return output

    def _wait_and_callback(self, event, callback, output):
        """Block until ``event`` has completed, then hand ``output`` to ``callback``."""
        event.synchronize()
        callback(output)


# Usage example
engine = AsyncInferenceEngine(model)


def on_complete(output):
    print(f"推理完成:{output.shape}")


# Start the inference without waiting for it
output = engine.infer_async(input_data, callback=on_complete)

# The CPU carries on with other work
process_other_tasks()
2.2 Future 模式
import concurrent.futures


class FutureInferenceEngine:
    """Future-based asynchronous inference backed by a small thread pool.

    All worker threads issue their work on the single stream created in
    ``__init__``.
    """

    def __init__(self, model):
        self.model = model
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
        self.stream = torch.npu.Stream()

    def infer(self, input_data):
        """Submit one inference task.

        Args:
            input_data: Value passed to ``self.model`` on a worker thread.

        Returns:
            A ``concurrent.futures.Future`` whose result is the model output.
        """
        return self.executor.submit(self._run_inference, input_data)

    def _run_inference(self, input_data):
        """Run the model on the engine's stream and wait for the stream to drain."""
        with torch.npu.stream(self.stream):
            output = self.model(input_data)
        # Without this wait the Future would resolve as soon as the work has
        # been queued, and a caller reading the output from another stream
        # could observe it before the computation has finished.
        self.stream.synchronize()
        return output

    def infer_batch(self, input_list):
        """Submit every input, then collect the results in input order.

        Args:
            input_list: Iterable of model inputs.

        Returns:
            List of model outputs, one per input, in the same order.
        """
        futures = [self.infer(inp) for inp in input_list]
        return [f.result() for f in futures]

    def shutdown(self, wait=True):
        """Release the worker threads; call once the engine is no longer needed."""
        self.executor.shutdown(wait=wait)


# Usage example
engine = FutureInferenceEngine(model)

# Submit several inference tasks
future1 = engine.infer(input1)
future2 = engine.infer(input2)
future3 = engine.infer(input3)

# Do other work in the meantime
process_other_tasks()

# Collect the results
result1 = future1.result(timeout=5.0)
result2 = future2.result(timeout=5.0)
result3 = future3.result(timeout=5.0)

print(f"结果:{result1.shape},{result2.shape},{result3.shape}")
三、生产者-消费者模型
3.1 异步推理队列
import queue
import threading
import time


class AsyncInferenceQueue:
    """Single-worker inference queue whose results are looked up by request ID.

    Requests go onto a bounded queue, one background thread runs the model on
    them in order, and callers poll ``get_result`` for the outcome.
    """

    def __init__(self, model, max_queue_size=100):
        self.model = model
        self.request_queue = queue.Queue(maxsize=max_queue_size)
        # request_id -> {'status': 'pending' | 'completed' | 'failed', ...}
        self.result_store = {}
        self.lock = threading.Lock()  # guards result_store
        self.running = False
        self.worker_thread = None

    def start(self):
        """Start the background worker thread."""
        self.running = True
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()
        print("异步推理队列已启动")

    def stop(self):
        """Ask the worker to exit and wait up to five seconds for it."""
        self.running = False
        if self.worker_thread:
            self.worker_thread.join(timeout=5.0)
        print("异步推理队列已停止")

    def submit(self, request_id, input_data):
        """Enqueue an inference request.

        Blocks while the request queue is full.

        Args:
            request_id: Key under which the result will be stored.
            input_data: Value passed to the model.

        Returns:
            ``request_id``, unchanged.
        """
        submitted_at = time.time()
        # Register the pending entry BEFORE enqueueing. In the other order the
        # worker can finish first and have its 'completed' entry overwritten
        # by 'pending', leaving the request to time out.
        with self.lock:
            self.result_store[request_id] = {
                'status': 'pending',
                'submitted_at': submitted_at,
            }
        self.request_queue.put({
            'id': request_id,
            'input': input_data,
            'submitted_at': submitted_at,
        })
        return request_id

    def get_result(self, request_id, timeout=10.0):
        """Poll for the result of a request.

        Args:
            request_id: ID previously passed to ``submit``.
            timeout: Maximum number of seconds to wait.

        Returns:
            The model output stored for the request.

        Raises:
            RuntimeError: The model raised while processing the request.
            TimeoutError: No final result appeared within ``timeout``.
        """
        # The monotonic clock keeps the deadline correct if wall time jumps.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            with self.lock:
                entry = self.result_store.get(request_id)
            # Entries are replaced wholesale, never mutated, so the snapshot
            # can be inspected after the lock is released.
            if entry is not None:
                status = entry['status']
                if status == 'completed':
                    return entry['output']
                if status == 'failed':
                    raise RuntimeError(f"推理失败:{entry.get('error')}")
            time.sleep(0.01)
        raise TimeoutError(f"推理超时:{request_id}")

    def _worker(self):
        """Worker loop: take requests, run the model, store the outcome."""
        while self.running:
            try:
                # The one-second timeout lets the loop re-check self.running.
                request = self.request_queue.get(timeout=1.0)
            except queue.Empty:
                continue

            request_id = request['id']
            try:
                output = self.model(request['input'])
            except Exception as e:
                # Worker boundary: record the failure instead of dying.
                record = {'status': 'failed', 'error': str(e)}
            else:
                record = {
                    'status': 'completed',
                    'output': output,
                    'completed_at': time.time(),
                }
            with self.lock:
                self.result_store[request_id] = record


# Usage example
queue_engine = AsyncInferenceQueue(model)
queue_engine.start()

# Submit a request
req_id = queue_engine.submit("req_001", input_data)

# Do other work in the meantime
process_other_tasks()

# Fetch the result
result = queue_engine.get_result(req_id)
print(f"结果:{result.shape}")

# Shut down
queue_engine.stop()
3.2 多消费者模型
import time


class MultiConsumerInference:
    """Asynchronous inference with several consumer threads sharing one queue.

    Each consumer thread creates its own NPU stream and runs the model under
    it; results are stored by request ID.
    """

    def __init__(self, model, num_consumers=4):
        self.model = model
        self.request_queue = queue.Queue(maxsize=1000)
        # request_id -> {'status': 'pending' | 'completed' | 'failed', ...}
        self.result_store = {}
        self.lock = threading.Lock()  # guards result_store
        self.consumers = []
        self.num_consumers = num_consumers
        self.running = False

    def start(self):
        """Start ``num_consumers`` daemon consumer threads."""
        self.running = True
        for i in range(self.num_consumers):
            consumer = threading.Thread(
                target=self._consumer_worker,
                args=(i,),
                daemon=True,
            )
            self.consumers.append(consumer)
            consumer.start()
        print(f"已启动{self.num_consumers}个消费者")

    def stop(self, timeout=5.0):
        """Ask every consumer to exit and wait up to ``timeout`` seconds for each."""
        self.running = False
        for consumer in self.consumers:
            consumer.join(timeout=timeout)
        self.consumers = []

    def _consumer_worker(self, consumer_id):
        """Consumer loop: take requests, run the model, store the outcome."""
        stream = torch.npu.Stream()

        while self.running:
            try:
                # The one-second timeout lets the loop re-check self.running.
                request = self.request_queue.get(timeout=1.0)
            except queue.Empty:
                continue

            request_id = request['id']
            try:
                with torch.npu.stream(stream):
                    output = self.model(request['input'])
                # Wait for this consumer's stream before publishing, so a
                # 'completed' entry is not handed to another thread while the
                # queued work may still be running.
                stream.synchronize()
            except Exception as e:
                # Worker boundary: record the failure instead of dying.
                record = {
                    'status': 'failed',
                    'error': str(e),
                    'consumer_id': consumer_id,
                }
            else:
                record = {
                    'status': 'completed',
                    'output': output,
                    'consumer_id': consumer_id,
                }
            with self.lock:
                self.result_store[request_id] = record

    def submit(self, request_id, input_data):
        """Enqueue a request; blocks while the queue is full.

        Args:
            request_id: Key under which the result will be stored.
            input_data: Value passed to the model.

        Returns:
            ``request_id``, unchanged.
        """
        # Register the pending entry BEFORE enqueueing. In the other order a
        # consumer can finish first and have its final entry overwritten by
        # 'pending', leaving the request to time out.
        with self.lock:
            self.result_store[request_id] = {'status': 'pending'}
        self.request_queue.put({'id': request_id, 'input': input_data})
        return request_id

    def get_result(self, request_id, timeout=10.0):
        """Poll until the request has a final entry.

        Args:
            request_id: ID previously passed to ``submit``.
            timeout: Maximum number of seconds to wait.

        Returns:
            The stored result dict; its ``'status'`` is ``'completed'`` or
            ``'failed'``. Failures are returned, not raised.

        Raises:
            TimeoutError: No final entry appeared within ``timeout``.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            with self.lock:
                entry = self.result_store.get(request_id)
            if entry is not None and entry['status'] in ('completed', 'failed'):
                return entry
            time.sleep(0.01)
        raise TimeoutError(f"推理超时:{request_id}")


# Usage example
multi_consumer = MultiConsumerInference(model, num_consumers=4)
multi_consumer.start()

# Submit many requests
for i in range(100):
    multi_consumer.submit(f"req_{i}", input_data)

# Collect the results
for i in range(100):
    result = multi_consumer.get_result(f"req_{i}")
    print(f"req_{i}:{result['status']}")
四、流水线推理架构
4.1 三阶段流水线
class InferencePipeline:
    """Chain preprocessing, inference and postprocessing over three NPU streams."""

    def __init__(self, preprocessor, model, postprocessor):
        self.preprocessor = preprocessor
        self.model = model
        self.postprocessor = postprocessor

        # One stream per stage
        self.preprocess_stream = torch.npu.Stream()
        self.inference_stream = torch.npu.Stream()
        self.postprocess_stream = torch.npu.Stream()

        # Events that order one stage after the previous one
        self.preprocess_done = torch.npu.Event()
        self.inference_done = torch.npu.Event()

    def infer(self, raw_data):
        """Push one item through the three stages.

        Each stage runs under its own stream; the next stage's stream waits on
        an event recorded after the previous stage. No synchronization is done
        here before returning.

        Args:
            raw_data: Value passed to ``self.preprocessor``.

        Returns:
            Whatever ``self.postprocessor`` returned.
        """
        pre_stream = self.preprocess_stream
        model_stream = self.inference_stream
        post_stream = self.postprocess_stream

        # Stage 1: preprocessing
        with torch.npu.stream(pre_stream):
            preprocessed = self.preprocessor(raw_data)
        self.preprocess_done.record(pre_stream)

        # Stage 2: inference, ordered after preprocessing
        model_stream.wait_event(self.preprocess_done)
        with torch.npu.stream(model_stream):
            output = self.model(preprocessed)
        self.inference_done.record(model_stream)

        # Stage 3: postprocessing, ordered after inference
        post_stream.wait_event(self.inference_done)
        with torch.npu.stream(post_stream):
            result = self.postprocessor(output)

        return result

    def infer_batch(self, raw_data_list):
        """Run ``infer`` on every item, then synchronize the device once.

        Args:
            raw_data_list: Iterable of raw inputs.

        Returns:
            List of per-item results, in input order.
        """
        results = [self.infer(item) for item in raw_data_list]
        torch.npu.synchronize()
        return results


# Usage example
pipeline = InferencePipeline(preprocessor, model, postprocessor)
results = pipeline.infer_batch(raw_data_list)
五、错误处理与超时控制
5.1 超时控制
class TimeoutInferenceEngine:
    """Asynchronous inference with a per-call timeout."""

    def __init__(self, model, default_timeout=5.0):
        self.model = model
        self.default_timeout = default_timeout
        self.stream = torch.npu.Stream()

    def infer_with_timeout(self, input_data, timeout=None):
        """Run one inference and give up waiting after ``timeout`` seconds.

        Args:
            input_data: Value passed to ``self.model``.
            timeout: Seconds to wait; ``None`` selects ``default_timeout``.
                An explicit ``0`` is honoured and means "do not wait".

        Returns:
            The model output, or ``None`` when the wait timed out. A timed-out
            inference is not interrupted; its worker thread keeps running
            until the model call returns.
        """
        # `timeout or default` would silently turn an explicit 0 into the default.
        if timeout is None:
            timeout = self.default_timeout

        future = self._submit_inference(input_data)
        try:
            return future.result(timeout=timeout)
        except concurrent.futures.TimeoutError:
            print(f"推理超时 ({timeout}s)")
            return None

    def _submit_inference(self, input_data):
        """Start the inference on a one-off worker thread and return its Future."""
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        try:
            return executor.submit(self._run_inference, input_data)
        finally:
            # Non-blocking: the submitted task still runs, and the executor's
            # resources are released once it finishes instead of lingering.
            executor.shutdown(wait=False)

    def _run_inference(self, input_data):
        """Run the model on the engine's stream and wait for the stream to drain."""
        with torch.npu.stream(self.stream):
            output = self.model(input_data)
        # Without this wait the Future would resolve once the work is queued,
        # so the timeout would not cover the actual execution time.
        self.stream.synchronize()
        return output


# Usage example
engine = TimeoutInferenceEngine(model, default_timeout=5.0)
result = engine.infer_with_timeout(input_data, timeout=3.0)
5.2 重试机制
import time


class RetryInferenceEngine:
    """Inference with a bounded number of attempts and a delay between them."""

    def __init__(self, model, max_retries=3, retry_delay=1.0, backoff_factor=1.0):
        """Create the engine.

        Args:
            model: Callable invoked with the input data.
            max_retries: Total number of attempts; must be at least 1.
            retry_delay: Seconds to sleep after the first failed attempt.
            backoff_factor: Multiplier applied to the delay after each failure.
                The default of 1.0 keeps the delay constant.

        Raises:
            ValueError: ``max_retries`` is smaller than 1.
        """
        # With zero attempts the retry loop would never run and the caller
        # would get None back without any inference having been tried.
        if max_retries < 1:
            raise ValueError("max_retries must be at least 1")
        self.model = model
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.backoff_factor = backoff_factor
        self.stream = torch.npu.Stream()

    def infer_with_retry(self, input_data):
        """Run the model, retrying on any exception.

        Args:
            input_data: Value passed to ``self.model``.

        Returns:
            The model output from the first successful attempt.

        Raises:
            RuntimeError: Every attempt failed; the last error is chained as
                the cause.
        """
        delay = self.retry_delay
        for attempt in range(1, self.max_retries + 1):
            try:
                with torch.npu.stream(self.stream):
                    output = self.model(input_data)
                # Wait inside the try so a failure raised while the queued
                # work executes is caught here and counts as a failed attempt.
                self.stream.synchronize()
                return output
            except Exception as e:
                if attempt == self.max_retries:
                    raise RuntimeError(
                        f"推理失败,已重试{self.max_retries}次:{e}"
                    ) from e
                print(f"推理失败 (尝试{attempt}/{self.max_retries}):{e}")
                time.sleep(delay)
                delay *= self.backoff_factor


# Usage example
engine = RetryInferenceEngine(model, max_retries=3)
result = engine.infer_with_retry(input_data)
六、常见问题
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 异步结果获取失败 | Stream 未同步 | 使用 Event 同步 |
| 内存泄漏 | 异步任务未清理 | 及时清理过期任务 |
| 推理顺序错乱 | 未使用请求 ID | 使用请求 ID 跟踪 |
| 超时不生效 | 超时设置太长 | 调整超时参数 |
| 重试风暴 | 重试间隔太短 | 增加退避策略 |
相关仓库
- [ascend-cl](https://gitee.com/ascend/ascend-cl) - 异步推理接口
- [torch_npu](https://gitee.com/ascend/torch_npu) - Stream 管理
- [torch_npu](https://gitee.com/ascend/torch_npu) - Event 同步