当前位置: 首页 > news >正文

【vllm】(二)vLLM v1 Engine — 模块超深度逐行分析之三

3.10 core.py - 引擎核心

文件职责: 实现vLLM推理的"内循环"——调度→执行→更新,这是GPU推理的真正驱动者。

3.10.1 EngineCore.init() 初始化流程

逐行解析

  1. 加载插件:load_general_plugins()— 允许第三方插件注册
  2. 创建ModelExecutor:executor_class(vllm_config)— 启动GPU workers
  3. 注册失败回调:executor.register_failure_callback()
  4. KV Cache初始化:_initialize_kv_caches()— 最关键的初始化步骤
  5. 创建StructuredOutputManager: 约束输出管理(JSON grammar等)
  6. 创建Scheduler:vllm_config.scheduler_config.get_scheduler_cls()
  7. KV Connector握手: 收集所有worker的KV传输元数据
  8. Batch Queue: 如果PP>1,启用批处理队列消除pipeline气泡
  9. 前缀缓存: 如果启用,初始化request_block_hasher
  10. GC优化:freeze_gc_heap()+enable_envs_cache()

3.10.2 _initialize_kv_caches() 完整流程

@instrument(span_name="Prepare model")def_initialize_kv_caches(self,vllm_config):
  1. 获取KV Cache规格:model_executor.get_kv_cache_specs()
  2. GPU内存Profile:model_executor.determine_available_memory()
    • 运行dummy forward pass测量峰值内存
    • 可用GPU内存 = 总内存 - 模型权重 - 激活内存
  3. 计算KV Cache配置:get_kv_cache_configs()
    • 根据可用内存计算num_gpu_blocks
    • 可能自动降低max_model_len以适配内存
  4. 同步max_model_len: 如果auto-fit降低了max_model_len
    • 调用collective_rpc("update_max_model_len")同步到所有worker
  5. 生成Scheduler KV Cache配置:generate_scheduler_kv_cache_config()
  6. 初始化KV Cache:model_executor.initialize_from_config()
    • 分配GPU内存给KV Cache blocks
    • 预热模型执行

3.10.3 step() 核心循环

defstep(self)->tuple[dict[int,EngineCoreOutputs],bool]:

完整执行流程(逐行):

  1. 检查是否有待处理请求:if not self.scheduler.has_requests(): return {}, False
  2. 调度:scheduler_output = self.scheduler.schedule()
    • 决定哪些请求prefill、哪些decode
    • 分配KV Cache blocks
    • 生成SchedulerOutput
  3. 异步执行模型:future = model_executor.execute_model(scheduler_output, non_block=True)
    • non_block=True返回Future,不等待执行完成
  4. 获取grammar bitmask:grammar_output = scheduler.get_grammar_bitmask(scheduler_output)
    • 用于约束输出(JSON grammar等)
  5. 等待模型执行:model_output = future.result()
    • 阻塞直到GPU计算完成
  6. 条件采样: 如果model_output为None,调用model_executor.sample_tokens(grammar_output)
    • pooling模型不需要采样
  7. 处理中止请求:_process_aborts_queue()
    • 检查是否有用户在中止请求
  8. 更新调度器:scheduler.update_from_output(scheduler_output, model_output)
    • 更新请求状态(完成/继续/抢占)
    • 释放已完成的KV Cache blocks
    • 返回EngineCoreOutputs
  9. 返回:(engine_core_outputs, total_num_scheduled_tokens > 0)

3.10.4 step_with_batch_queue() Pipeline并行变体

与step()的区别

  1. 异步重叠调度和执行
    • 如果batch_queue未满,立即调度新batch并放入队列
    • 同时,之前的batch在GPU上执行
  2. 延迟grammar采样
    • 当有pending_structured_output_tokens时,延迟采样
    • 先处理上一轮输出,再对当前轮进行grammar约束采样
  3. 消除pipeline气泡
    • 多个batch流水线执行,前一个batch的PP stage 2与下一个batch的PP stage 1重叠
  4. 返回None
    • 如果本step只是填充队列而没有输出可返回,返回None

3.10.5 post_step() 推测解码处理

defpost_step(self,model_executed:bool):ifnotself.async_schedulingandself.use_spec_decodeandmodel_executed:draft_token_ids=self.model_executor.take_draft_token_ids()ifdraft_token_idsisnotNone:self.scheduler.update_draft_token_ids(draft_token_ids)
  • 仅在非异步调度 + 推测解码启用时执行
  • 获取draft model的token预测
  • 更新scheduler以验证draft tokens

3.10.6 EngineCoreProc - 进程包装器

classEngineCoreProc:@staticmethoddefrun_engine_core(vllm_config,executor_class,handshake_address,...):

ZMQ busy loop

  1. 创建EngineCore实例
  2. 完成就绪握手(ZMQ DEALER → frontend ROUTER)
  3. 进入busy loop:
    • 从input_socket接收请求(ZMQ DEALER)
    • 根据EngineCoreRequestType分派
    • ADD →engine_core.add_request()
    • ABORT →engine_core.abort_requests()
    • UTILITY → 执行utility操作
  4. 调用engine_core.step_fn()执行推理
  5. 将结果编码发送到output_socket(ZMQ PUB/PUSH)
  6. 循环直到收到shutdown信号

DEAD信号: 当EngineCore异常退出时,发送预定义的ENGINE_CORE_DEAD字节序列,frontend检测到此信号后设置engine_dead标志。

3.10.7 EngineCoreActor - Ray Actor变体

classEngineCoreActor:"""Ray actor version of EngineCoreProc for multi-node deployment."""
  • 使用Ray remote actor替代multiprocessing.Process
  • 适用于多节点部署场景
  • 通过Ray的placement group管理GPU资源分配

3.11 core_client.py - 引擎核心客户端

文件职责: 提供前后端进程之间的通信桥梁,封装ZMQ消息传递和零拷贝tensor传输。

3.11.1 EngineCoreClient (ABC)

classEngineCoreClient(ABC):@staticmethoddefmake_client(multiprocess_mode,asyncio_mode,vllm_config,...):ifasyncio_modeandnotmultiprocess_mode:raiseNotImplementedError# 暂不支持async inprocifmultiprocess_modeandasyncio_mode:returnmake_async_mp_client(...)ifmultiprocess_modeandnotasyncio_mode:returnSyncMPClient(...)returnInprocClient(...)

工厂模式:根据运行模式自动选择客户端实现。

3.11.2 InprocClient

最简单的实现,直接调用EngineCore方法,无IPC开销:

classInprocClient(EngineCoreClient):def__init__(self,*args):self.engine_core=EngineCore(*args)defget_output(self):outputs,model_executed=self.engine_core.step_fn()self.engine_core.post_step(model_executed)returnoutputs.get(0)orEngineCoreOutputs()defadd_request(self,request):req,wave=self.engine_core.preprocess_add_request(request)self.engine_core.add_request(req,wave)

3.11.3 MPClient 基类

classMPClient(EngineCoreClient):def__init__(self,asyncio_mode,vllm_config,...):self.ctx=zmq.asyncio.Context()ifasyncio_modeelsezmq.Context()self.resources=BackgroundResources(ctx=sync_ctx)self._finalizer=weakref.finalize(self,self.resources)

关键设计

  • weakref.finalize: 确保进程退出时清理ZMQ资源
  • BackgroundResources: 持有所有需要清理的资源引用
  • ZMQ context使用2个IO线程(io_threads=2)提升吞吐

3.11.4 BackgroundResources

@dataclassclassBackgroundResources:ctx:zmq.Context engine_manager:CoreEngineProcManager|CoreEngineActorManager coordinator:DPCoordinator|Noneoutput_socket:zmq.Socket|zmq.asyncio.Socket input_socket:zmq.Socket|zmq.asyncio.Socket...engine_dead:bool=False

call() 清理逻辑

  1. 设置engine_dead = True
  2. 关闭engine_manager(停止所有core进程)
  3. 关闭coordinator
  4. 关闭所有ZMQ socket
  5. 取消asyncio tasks(async模式)
  6. 发送shutdown信号到sync output线程(sync模式)

3.11.5 AsyncMPClient

classAsyncMPClient(MPClient):asyncdefget_output_async(self):frames=awaitself.output_socket.recv_multipart()self.resources.validate_alive(frames)outputs=self.decoder.decode(frames[0])returnoutputsasyncdefadd_request_async(self,request):data=self.encoder.encode(EngineCoreRequestType.ADD,request)awaitself.input_socket.send_multipart(data)# 同时发送tensor数据self._send_tensors(request)

核心异步操作

  • get_output_async(): await ZMQ PULL接收
  • add_request_async(): await ZMQ DEALER发送
  • Tensor发送是同步的(MPQueue.put不阻塞事件循环太久)

3.11.6 SyncMPClient

classSyncMPClient(MPClient):defget_output(self):# 后台线程从ZMQ PULL接收,put到output_queuereturnself.output_queue.get()defadd_request(self,request):data=self.encoder.encode(EngineCoreRequestType.ADD,request)self.input_socket.send_multipart(data)
  • 使用后台线程_output_queue_loop从ZMQ拉取输出
  • 主线程通过标准queue.Queue.get()同步获取输出
  • 适配LLMEngine的同步step()模式

3.11.7 DPLBAsyncMPClient - 内部DP负载均衡

classDPLBAsyncMPClient(AsyncMPClient):"""Internal load balancer: client balances requests to all DP ranks."""
  • 维护所有DP rank的连接
  • 使用round-robin或基于队列长度的策略分配请求
  • 聚合所有rank的输出
  • 处理wave_complete同步

3.11.8 DPAsyncMPClient - 外部DP负载均衡

  • 每个client只连接一个特定的DP rank
  • 外部负载均衡器决定请求路由
  • 通过dp_rank参数指定目标rank

3.12 llm_engine.py - LLM引擎(同步)

文件职责: 提供同步的推理API,适配V0风格的add_request()/step()使用模式。

3.12.1 LLMEngine 类

classLLMEngine:def__init__(self,vllm_config,executor_class,...):self.input_processor=InputProcessor(...)self.output_processor=OutputProcessor(...)self.engine_core=EngineCoreClient.make_client(multiprocess_mode=notvllm_config.model_config.enable_lora,asyncio_mode=False,...)

注意: LoRA启用时使用InprocClient(同进程),因为LoRA管理需要直接访问engine_core。

3.12.2 add_request()

defadd_request(self,request_id,prompt,params,...):processed_inputs=self.input_processor.process_inputs(...)self.output_processor.add_request(...)self.engine_core.add_request(processed_inputs)

3.12.3 step()

defstep(self)->list[RequestOutput]:outputs=self.engine_core.get_output()request_outputs=self.output_processor.process_outputs(outputs)returnrequest_outputs
  • 同步调用get_output()阻塞等待Core输出
  • 处理输出后返回给调用者
  • 典型用法:while has_requests: outputs = engine.step()

3.13 async_llm.py - 异步LLM引擎

文件职责: 提供基于asyncio的异步推理API,支持高并发流式输出。

3.13.1 AsyncLLM 类

classAsyncLLM:def__init__(self,vllm_config,executor_class,...):self.input_processor=InputProcessor(...)self.output_processor=OutputProcessor(...)self.engine_core=EngineCoreClient.make_async_mp_client(...)self.output_handler:asyncio.Task|None=None

3.13.2 generate() 异步生成器

asyncdefgenerate(self,prompt,params,request_id,...):# 1. 处理输入processed_inputs=self.input_processor.process_inputs(...)# 2. 注册输出追踪queue=self.output_processor.add_request(...)# 3. 发送到Coreawaitself.engine_core.add_request_async(processed_inputs)# 4. 启动output_handler(如果未启动)self._ensure_output_handler()# 5. 返回异步生成器returnself._generate_stream(queue,request_id)

_generate_stream() 逐行逻辑

asyncdef_generate_stream(self,queue,request_id):whileTrue:output=awaitqueue.get()ifisinstance(output,EngineDeadError):raiseoutputyieldoutputifoutput.finished:break

3.13.3 _output_handler() 后台任务

asyncdef_output_handler(self):whileTrue:# 1. 从Core获取输出outputs=awaitself.engine_core.get_output_async()# 2. 处理输出request_outputs=self.output_processor.process_outputs(outputs)# 3. 分发到per-request Queueforoutputinrequest_outputs:state=self.output_processor.get_state(output.request_id)state.queue.put(output)

3.13.4 _ensure_output_handler()

def_ensure_output_handler(self):ifself.output_handlerisNoneorself.output_handler.done():self.output_handler=asyncio.create_task(self._output_handler())
  • 懒启动:第一个请求到来时才创建后台任务
  • 如果任务异常退出,自动重启

3.13.5 scale_elastic_ep()

asyncdefscale_elastic_ep(self,new_data_parallel_size):set_scaling_elastic_ep(True)try:awaitself.engine_core.scale_elastic_ep(new_data_parallel_size)self.vllm_config.parallel_config.data_parallel_size=new_data_parallel_sizefinally:set_scaling_elastic_ep(False)
  • 动态调整DP大小(弹性扩展)
  • 使用set_scaling_elastic_ep保护配置更新期间的并发访问

3.13.6 属性方法

@propertydefis_running(self)->bool:returnself.output_handlerisNoneornotself.output_handler.done()@propertydefis_stopped(self)->bool:returnself.errored@propertydeferrored(self)->bool:returnself.engine_core.resources.engine_deadornotself.is_running

3.14 utils.py - 工具模块

文件职责: 提供引擎进程管理、设备控制、ZMQ地址分配等基础设施。

3.14.1 CoreEngineState 枚举

classCoreEngineState(Enum):NEW=auto()# 刚创建,等待连接CONNECTED=auto()# 已连接到frontend,等待就绪READY=auto()# 初始化完成,可以接收请求

3.14.2 CoreEngine

classCoreEngine:def__init__(self,index=0,local=True):self.local=local self.identity=index.to_bytes(2,"little")# ZMQ ROUTER identityself.state=CoreEngineState.NEW
  • 每个DP rank对应一个CoreEngine
  • identity用于ZMQ ROUTER/DEALER模式的消息路由

3.14.3 EngineZmqAddresses

@dataclassclassEngineZmqAddresses:inputs:list[str]# 前端→Core的输入地址outputs:list[str]# Core→前端的输出地址coordinator_input:str|None# Coordinator输入地址coordinator_output:str|None# Coordinator输出地址frontend_stats_publish_address:str|None# 统计发布地址

3.14.4 CoreEngineProcManager

classCoreEngineProcManager:def__init__(self,local_engine_count,start_index,...):# 1. 创建multiprocessing.Process列表# 2. 设置设备控制环境变量(DP场景)# 3. 配置NUMA亲和性# 4. 启动所有进程# 5. 如果任何进程启动失败,停止所有进程

设备控制set_device_control_env_var()

  • 在DP场景下,每个engine进程只能看到自己的GPU
  • 临时修改CUDA_VISIBLE_DEVICES或等效环境变量
  • 使用unittest.mock.patch.dict安全地修改os.environ

NUMA亲和性numa_utils.configure_subprocess()

  • 将engine进程绑定到特定NUMA节点
  • 减少跨NUMA内存访问延迟

3.14.5 CoreEngineActorManager

Ray Actor版本的进程管理器,支持多节点部署:

  • 使用Ray placement group管理GPU资源
  • 支持MoE模型的DPMoEEngineCoreActor
  • 自动创建placement groups或使用传入的

3.14.6 SignalCallback

classSignalCallback:def__init__(self,callback):self._event=threading.Event()self._thread=threading.Thread(target=self._run,daemon=True)self._thread.start()deftrigger(self):self._event.set()# 安全地从信号处理器调用
  • 信号处理器中只能调用async-signal-safe函数
  • Event.set()是async-signal-safe
  • 实际回调在专用线程中执行

附录

A. 关键配置参数

参数默认值说明
data_parallel_size1DP并行度
data_parallel_external_lbFalse外部DP负载均衡
enable_elastic_epFalse弹性EP
enable_chunked_prefillTrue分块prefill
enable_prefix_cachingFalse前缀缓存
max_model_lenauto最大序列长度(可能被auto-fit降低)
block_size16KV Cache block大小

B. ZMQ Socket类型总结

Socket类型用途
input_socketDEALER (client) / ROUTER (core)请求输入
output_socketPULL (client) / PUSH (core)输出拉取
stats_socketSUB (client) / PUB (coordinator)统计订阅
first_req_socketPAIRfirst-request-per-wave
handshake_socketDEALER/ROUTER就绪握手

C. 模块依赖关系图

D. 完整请求生命周期总结

  1. 客户端调用generate(prompt, params)
  2. InputProcessor验证和转换请求
  3. OutputProcessor注册RequestState
  4. EngineCoreClient通过ZMQ发送EngineCoreRequest
  5. TensorIpcSender传输多模态/LoRA张量
  6. EngineCore.step(): schedule → execute_model → update_from_output
  7. EngineCoreOutput通过ZMQ返回Frontend
  8. OutputProcessor: detokenize → stop_check → logprobs → build_output
  9. Per-request Queue分发到客户端生成器
  10. 客户端yield RequestOutput,直到finished=True
http://www.jsqmd.com/news/707015/

相关文章:

  • 【Linux从入门到精通】第20篇:性能监控工具大盘点
  • RWKV7-1.5B-G1A模型效果展示:对比传统LSTM在文本生成上的优势
  • CAPIO架构:基于CHERI的细粒度安全用户空间驱动
  • 2026成都诚信净水系统供应商:家用地暖供应商、家用格力空调供应商、新风系统中央空调、格力中央空调供应商、格力中央空调总代理选择指南 - 优质品牌商家
  • ESP32-S3-BOX-3开发套件:智能语音与物联网应用实战
  • 机器学习数据快速分析:实战方法与关键洞察
  • 大语言模型幻觉现象解析与应对策略
  • 工业级Wi-Fi 7接入点EKI-6333BE-4GD技术解析与应用
  • AAEON GENE-EHL5工业级单板计算机解析与应用
  • 从新回看《道德经》第二十二章的炊者不立,发现了权力熵增定律的底层逻辑
  • 【Linux从入门到精通】第21篇:Shell脚本开篇——什么是Shell?写第一个Hello World
  • API版本管理:向后兼容与平滑升级的企业级方案
  • Docker AI Toolkit 2026隐藏模式曝光:仅限docker ai enable --stealth启动的联邦学习协调器(附实测吞吐对比表)
  • 2026年Q2四川民宿规划设计标杆名录及核心参数对比:成都商业规划设计公司/成都太空舱民宿公司/成都景区规划推荐/选择指南 - 优质品牌商家
  • GLM-4.1V-9B-Base与C语言交互:通过本地API实现轻量级集成
  • 不止于展示:用3D WebView for Windows在Unity里打造可交互的Web AR/VR应用原型
  • 那些“无用”的书,成就一个“有趣”的人
  • OpenAI发表Nature论文:揭开AI模型总“说谎”的真相,人类对AI准确性的评估促使其产生幻觉
  • Copilot Next 工作流自动化配置到底难在哪?92%开发者卡在第3步——资深架构师逐行调试实录
  • Ryujinx模拟器完全指南:跨平台Switch游戏体验与深度优化策略
  • 自由程序员越全能,越赚不到钱?别再死撑着当“全能工具人”了。
  • 机器学习随机性评估:重复实验次数计算与实践
  • 第二周.系统管理相关的操作总结
  • DTVM:融合EVM生态与Wasm性能的下一代确定性虚拟机
  • 嵌入式AI新选择:将Phi-4-mini-flash-reasoning推理集成到STM32开发流程
  • dij免费问题
  • SystemC Export API参数管理机制与硬件仿真实践
  • ARM与Thumb指令集详解:寄存器使用与性能优化
  • LiuJuan20260223Zimage作品展示:看看这个模型生成的图片效果
  • 机器学习算法清单构建与应用实践指南