GPU 调度与 AI 推理优化:从独占模式到分时复用,算力资源的极致压榨
GPU 调度与 AI 推理优化:从独占模式到分时复用,算力资源的极致压榨
一、GPU 资源的浪费困境:独占模式的低利用率
AI 推理服务的 GPU 利用率普遍偏低。在线推理场景中,请求量在白天高峰和夜间低谷之间波动剧烈,低谷期 GPU 利用率可能低于 10%。但 GPU 实例通常以独占模式分配——一个推理服务独占一整块 GPU,即使只使用了 10% 的算力,其余 90% 也在空转。
更深层的问题是 GPU 资源的碎片化。不同模型对 GPU 显存的需求差异巨大——7B 模型需要 14GB,70B 模型需要 140GB。当集群中同时运行多种模型时,显存分配容易产生碎片,导致大模型无法找到足够的连续显存空间启动,而小模型又无法利用碎片显存。
二、GPU 分时复用与虚拟化架构
flowchart TD
    A[GPU 硬件] --> B[GPU 虚拟化层]
    B --> B1[MPS: 多进程服务]
    B --> B2[MIG: 多实例 GPU]
    B --> B3[时间片: vGPU]
    B1 --> C[调度策略层]
    B2 --> C
    B3 --> C
    C --> C1[推理任务: 优先级高/延迟敏感]
    C --> C2[微调任务: 优先级中/可中断]
    C --> C3[训练任务: 优先级低/可抢占]
    C1 --> D[显存管理]
    C2 --> D
    C3 --> D
    D --> D1[KV Cache 分配]
    D --> D2[模型权重共享]
    D --> D3[显存池化]

2.1 GPU 显存池化管理
# gpu_memory_pool.py — GPU memory pool management
# Design intent: model a GPU's memory as an allocatable pool so that several
# inference tasks can share one GPU and raise memory utilization.
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional


class AllocationStatus(Enum):
    """State of a memory block tracked by a pool."""

    ALLOCATED = "allocated"
    FREED = "freed"
    PENDING = "pending"


@dataclass
class MemoryBlock:
    """A contiguous range of GPU memory tracked by ``GPUMemoryPool``."""

    block_id: str
    start_offset: int  # offset of the block within the GPU memory, in MB
    size: int  # block size, in MB
    status: AllocationStatus
    owner: Optional[str] = None  # ID of the task holding the block, if any
    allocated_at: float = 0


@dataclass
class GPUMemoryPool:
    """First-fit allocator over the memory of a single GPU.

    The pool is an ordered list of ``MemoryBlock`` objects that together
    cover ``[0, total_memory_mb)``. Allocation splits a free block; freeing
    marks blocks as free and merges adjacent free blocks.
    """

    gpu_id: str
    total_memory_mb: int
    blocks: list[MemoryBlock] = field(default_factory=list)

    def __post_init__(self):
        # The pool always starts as one free block spanning the whole GPU;
        # any ``blocks`` value passed to the constructor is replaced.
        self.blocks = [MemoryBlock(
            block_id="initial",
            start_offset=0,
            size=self.total_memory_mb,
            status=AllocationStatus.FREED,
        )]

    def allocate(self, task_id: str, size_mb: int) -> Optional[MemoryBlock]:
        """Allocate ``size_mb`` MB for ``task_id`` using first fit.

        Args:
            task_id: ID of the task that will own the block.
            size_mb: Requested size in MB; must be positive.

        Returns:
            The newly allocated block, or ``None`` when no contiguous free
            block is large enough even after merging adjacent free blocks.

        Raises:
            ValueError: If ``size_mb`` is not positive.
        """
        if size_mb <= 0:
            raise ValueError(f"size_mb must be positive, got {size_mb}")

        block = self._first_fit(task_id, size_mb)
        if block is None:
            # Merge adjacent free blocks and retry exactly once. Retrying
            # without a bound would recurse forever when the request simply
            # does not fit.
            self._defragment()
            block = self._first_fit(task_id, size_mb)
        return block

    def _first_fit(self, task_id: str, size_mb: int) -> Optional[MemoryBlock]:
        """Carve ``size_mb`` out of the first free block that is large enough."""
        for i, block in enumerate(self.blocks):
            if block.status != AllocationStatus.FREED or block.size < size_mb:
                continue

            allocated = MemoryBlock(
                block_id=f"{task_id}-{int(time.time())}",
                start_offset=block.start_offset,
                size=size_mb,
                status=AllocationStatus.ALLOCATED,
                owner=task_id,
                allocated_at=time.time(),
            )
            remaining_size = block.size - size_mb
            if remaining_size > 0:
                # Split: the allocation takes the front of the free block and
                # the remainder stays free right behind it.
                self.blocks[i] = MemoryBlock(
                    block_id=f"free-{i}",
                    start_offset=block.start_offset + size_mb,
                    size=remaining_size,
                    status=AllocationStatus.FREED,
                )
                self.blocks.insert(i, allocated)
            else:
                # Exact fit: the free block is replaced by the allocation.
                self.blocks[i] = allocated
            return allocated
        return None

    def free(self, task_id: str) -> int:
        """Release every block owned by ``task_id``.

        Returns:
            The total number of MB released (0 if the task owns nothing).
        """
        freed = 0
        for block in self.blocks:
            if block.owner == task_id:
                block.status = AllocationStatus.FREED
                block.owner = None
                freed += block.size
        # Merge adjacent free blocks so later allocations see the largest
        # possible contiguous ranges.
        self._merge_free_blocks()
        return freed

    def get_utilization(self) -> float:
        """Return the allocated fraction of the pool, in ``[0, 1]``.

        A pool with no memory reports 0.0 instead of dividing by zero.
        """
        if self.total_memory_mb <= 0:
            return 0.0
        used = sum(
            b.size for b in self.blocks
            if b.status == AllocationStatus.ALLOCATED
        )
        return used / self.total_memory_mb

    def _defragment(self):
        """Merge adjacent free blocks.

        Allocated blocks are not moved, so free ranges separated by an
        allocated block stay separate.
        """
        self._merge_free_blocks()

    def _merge_free_blocks(self):
        """Sort blocks by offset and merge neighbouring free blocks."""
        merged = []
        for block in sorted(self.blocks, key=lambda b: b.start_offset):
            previous = merged[-1] if merged else None
            if (previous is not None
                    and previous.status == AllocationStatus.FREED
                    and block.status == AllocationStatus.FREED
                    and previous.start_offset + previous.size == block.start_offset):
                # Grow the previous free block in place to absorb this one.
                previous.size += block.size
            else:
                merged.append(block)
        self.blocks = merged


# Article heading fused onto the end of this listing in the source:
# "2.2 推理任务调度器"
# inference_gpu_scheduler.py — GPU inference task scheduler
# Design intent: schedule GPU memory between several inference tasks, with
# priority preemption and elastic allocation.
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional

# NOTE(review): GPUMemoryPool and AllocationStatus are used below but not
# imported in this listing; they are defined in the gpu_memory_pool.py listing
# (section 2.1). If the listings live in separate files, import them from
# there — confirm the module path.


class TaskType(Enum):
    """Kind of workload a task represents."""

    ONLINE_INFERENCE = "online"  # online inference, latency sensitive
    BATCH_INFERENCE = "batch"  # batch inference, can be delayed
    FINE_TUNING = "finetuning"  # fine-tuning, can be interrupted


@dataclass
class InferenceTask:
    """A workload that needs GPU memory for each of its replicas."""

    task_id: str
    task_type: TaskType
    model_name: str
    memory_required_mb: int
    min_replicas: int = 1
    max_replicas: int = 4
    current_replicas: int = 0
    avg_latency_ms: float = 0
    qps: float = 0
    created_at: float = field(default_factory=time.time)


@dataclass
class SchedulingDecision:
    """Outcome of one scheduling attempt for a task."""

    task_id: str
    action: str  # scale_up / scale_down / migrate / hold
    target_replicas: int
    target_gpu: Optional[str] = None
    reason: str = ""


class InferenceGPUScheduler:
    """Places inference tasks on registered GPUs, preempting batch work for online tasks."""

    def __init__(self, usable_memory_fraction: float = 0.8):
        """Create an empty scheduler.

        Args:
            usable_memory_fraction: Share of a GPU's free memory that regular
                scheduling may hand out; the rest is held back for KV cache.
                Must be in ``(0, 1]``. The default keeps 20% in reserve.

        Raises:
            ValueError: If ``usable_memory_fraction`` is outside ``(0, 1]``.
        """
        if not 0 < usable_memory_fraction <= 1:
            raise ValueError(
                f"usable_memory_fraction must be in (0, 1], got {usable_memory_fraction}"
            )
        self.usable_memory_fraction = usable_memory_fraction
        self.gpus: dict[str, GPUMemoryPool] = {}
        self.tasks: dict[str, InferenceTask] = {}
        self.task_assignments: dict[str, list[str]] = {}  # task_id -> [gpu_ids]

    def register_gpu(self, gpu_id: str, memory_mb: int):
        """Register a GPU with ``memory_mb`` MB of memory under ``gpu_id``."""
        self.gpus[gpu_id] = GPUMemoryPool(gpu_id=gpu_id, total_memory_mb=memory_mb)

    def submit_task(self, task: InferenceTask) -> SchedulingDecision:
        """Record ``task`` and try to place one replica of it."""
        self.tasks[task.task_id] = task
        return self._schedule_task(task)

    def _schedule_task(self, task: InferenceTask) -> SchedulingDecision:
        """Place one replica of ``task`` on the first GPU with enough memory.

        Online tasks fall back to preempting batch tasks when no GPU has
        room; all other tasks get a "hold" decision.
        """
        for gpu_id, pool in self.gpus.items():
            utilization = pool.get_utilization()
            # Only part of the free memory is offered; the rest is reserved
            # for KV cache.
            available = pool.total_memory_mb * (1 - utilization) * self.usable_memory_fraction
            if available < task.memory_required_mb:
                continue
            # Free memory may be fragmented, so the pool can still refuse.
            block = pool.allocate(task.task_id, task.memory_required_mb)
            if block:
                task.current_replicas += 1
                self.task_assignments.setdefault(task.task_id, []).append(gpu_id)
                return SchedulingDecision(
                    task_id=task.task_id,
                    action="scale_up",
                    target_replicas=task.current_replicas,
                    target_gpu=gpu_id,
                    reason=f"分配到 {gpu_id},显存利用率 {utilization:.0%}"
                )

        # No GPU has room: only online inference may preempt other work.
        if task.task_type == TaskType.ONLINE_INFERENCE:
            return self._preempt_for_online_task(task)

        return SchedulingDecision(
            task_id=task.task_id,
            action="hold",
            # "hold" keeps whatever replicas the task already has.
            target_replicas=task.current_replicas,
            reason="无可用 GPU 资源,任务排队等待"
        )

    def _preempt_for_online_task(self, task: InferenceTask) -> SchedulingDecision:
        """Evict batch-inference replicas until ``task`` fits on some GPU.

        Batch tasks are visited in submission order. For each one, the GPU of
        its most recent assignment is reclaimed and the online task is tried
        there. Returns a "hold" decision if no eviction makes room.
        """
        for tid, victim in self.tasks.items():
            if victim.task_type != TaskType.BATCH_INFERENCE or victim.current_replicas <= 0:
                continue
            assignments = self.task_assignments.get(tid)
            if not assignments:
                # Replica count without a recorded placement: nothing to reclaim.
                continue
            gpu_id = assignments[-1]
            pool = self.gpus.get(gpu_id)
            if pool is None:
                continue

            # free() releases every block the victim owns on this GPU, so all
            # of its replicas placed there are gone, not just the last one.
            pool.free(tid)
            evicted = assignments.count(gpu_id)
            self.task_assignments[tid] = [g for g in assignments if g != gpu_id]
            victim.current_replicas = max(0, victim.current_replicas - evicted)

            block = pool.allocate(task.task_id, task.memory_required_mb)
            if block:
                task.current_replicas += 1
                self.task_assignments.setdefault(task.task_id, []).append(gpu_id)
                return SchedulingDecision(
                    task_id=task.task_id,
                    action="scale_up",
                    target_replicas=task.current_replicas,
                    target_gpu=gpu_id,
                    reason=f"抢占批量任务 {tid} 的资源"
                )

        return SchedulingDecision(
            task_id=task.task_id,
            action="hold",
            target_replicas=task.current_replicas,
            reason="无可抢占的资源"
        )

    def get_cluster_stats(self) -> dict:
        """Return GPU count, total/used memory in MB, utilization and task count."""
        total_memory = sum(g.total_memory_mb for g in self.gpus.values())
        used_memory = sum(
            sum(b.size for b in g.blocks if b.status == AllocationStatus.ALLOCATED)
            for g in self.gpus.values()
        )
        return {
            "gpu_count": len(self.gpus),
            "total_memory_mb": total_memory,
            "used_memory_mb": used_memory,
            "utilization": used_memory / total_memory if total_memory > 0 else 0,
            "task_count": len(self.tasks),
        }


# Article heading fused onto the end of this listing in the source:
# "三、模型权重共享与显存优化"
3.1 模型权重共享
# weight_sharing.py — model weight sharing
# Design intent: let several inference instances share one copy of a model's
# weights to cut memory use and support multi-replica deployment of a model.
from dataclasses import dataclass
from typing import Optional
import hashlib


@dataclass
class SharedWeights:
    """One loaded copy of a model version on a specific GPU."""

    model_name: str
    version: str
    memory_mb: int
    checksum: str
    ref_count: int = 0  # number of instances currently using this copy
    gpu_id: Optional[str] = None


class WeightSharingManager:
    """Reference-counted registry of model weights, tracked per GPU."""

    def __init__(self):
        # key: "model_name:version@gpu_id". A copy can only be shared by
        # instances on the GPU that holds it, so the GPU is part of the key.
        self.weights: dict[str, SharedWeights] = {}

    @staticmethod
    def _key(model_name: str, version: str, gpu_id: Optional[str]) -> str:
        """Build the registry key for one model version on one GPU."""
        return f"{model_name}:{version}@{gpu_id}"

    def load_weights(self, model_name: str, version: str,
                     memory_mb: int, gpu_id: str) -> SharedWeights:
        """Return the weights of ``model_name:version`` on ``gpu_id``.

        If that GPU already holds a copy, its reference count is increased
        and the existing entry is returned; otherwise a new entry with a
        reference count of 1 is registered.
        """
        key = self._key(model_name, version, gpu_id)
        existing = self.weights.get(key)
        if existing is not None:
            existing.ref_count += 1
            return existing

        weights = SharedWeights(
            model_name=model_name,
            version=version,
            memory_mb=memory_mb,
            # Fingerprint of the model identifier only; the weight data
            # itself is not hashed here.
            checksum=hashlib.md5(f"{model_name}:{version}".encode()).hexdigest()[:8],
            ref_count=1,
            gpu_id=gpu_id,
        )
        self.weights[key] = weights
        return weights

    def release_weights(self, model_name: str, version: str,
                        gpu_id: Optional[str] = None) -> bool:
        """Drop one reference to ``model_name:version``.

        Args:
            model_name: Model name used when loading.
            version: Model version used when loading.
            gpu_id: GPU whose copy is released. May be omitted when the
                model version is loaded on exactly one GPU.

        Returns:
            ``True`` when the last reference was dropped and the entry was
            removed (the caller should free the memory); ``False`` when
            references remain or no matching entry exists.

        Raises:
            ValueError: If ``gpu_id`` is omitted while the model version is
                loaded on more than one GPU.
        """
        if gpu_id is not None:
            key = self._key(model_name, version, gpu_id)
            if key not in self.weights:
                return False
        else:
            matches = [
                k for k, w in self.weights.items()
                if w.model_name == model_name and w.version == version
            ]
            if not matches:
                return False
            if len(matches) > 1:
                raise ValueError(
                    f"{model_name}:{version} is loaded on several GPUs; "
                    "pass gpu_id to choose which copy to release"
                )
            key = matches[0]

        weights = self.weights[key]
        weights.ref_count -= 1
        if weights.ref_count <= 0:
            # Last user gone: forget the entry and tell the caller to free it.
            del self.weights[key]
            return True
        return False

    def get_sharing_stats(self) -> dict:
        """Summarize sharing: shared entry count, MB saved, per-entry details."""
        shared = [w for w in self.weights.values() if w.ref_count > 1]
        # Every reference beyond the first reuses an existing copy.
        total_savings = sum(w.memory_mb * (w.ref_count - 1) for w in shared)
        return {
            "shared_models": len(shared),
            "total_savings_mb": total_savings,
            "weights": [
                {"model": w.model_name, "version": w.version,
                 "refs": w.ref_count, "memory_mb": w.memory_mb,
                 "gpu_id": w.gpu_id}
                for w in self.weights.values()
            ],
        }


# Article heading fused onto the end of this listing in the source:
# "四、边界分析与架构权衡"
显存碎片的整理开销:上文示例中的 _defragment 只合并相邻的空闲块,不移动任何数据,因此无法消除被已分配块隔开的碎片。真正的碎片整理(compaction)需要移动已分配的块,这涉及 GPU 显存的数据拷贝,开销不可忽视。频繁整理会影响推理延迟。需要在碎片率和整理频率之间平衡——碎片率超过阈值时才触发整理。
优先级抢占的恢复延迟:被抢占的批量任务需要重新调度和加载模型权重,恢复延迟可能达到分钟级。频繁抢占会严重影响批量任务的完成时间。需要设置抢占冷却期,避免同一任务被反复抢占。
权重共享的一致性:多个推理实例共享同一份权重,意味着权重不可变。如果需要热更新模型版本,必须先加载新权重,再逐步切换流量,最后卸载旧权重。这增加了部署流程的复杂度。
MPS 的隔离性限制:NVIDIA MPS 允许多个进程共享 GPU,但缺乏完整的故障隔离——Volta 及之后的架构虽然为每个客户端提供独立的 GPU 地址空间,一个进程触发的致命错误仍可能波及共享同一 GPU 的其他进程。MIG 提供了硬件级隔离,但只支持 A100/H100 等数据中心级 GPU,且分片数量有限(单卡最多 7 个实例)。
五、总结
GPU 调度与 AI 推理优化通过显存池化、优先级调度和权重共享三层机制,将 GPU 利用率从独占模式的 10-30% 提升到共享模式的 60-80%。核心机制包括:显存池化管理支持多任务共享 GPU,优先级调度确保在线推理优先获取资源,权重共享减少同模型多副本的显存占用。但碎片整理开销、抢占恢复延迟、权重共享一致性和隔离性限制是需要权衡的边界条件。落地建议:从显存池化开始验证共享模式;在线推理和批量推理分优先级调度;同模型多副本启用权重共享;MIG 用于强隔离需求,MPS 用于高密度场景。
补充落地建议:围绕“GPU 调度与 AI 推理优化:从独占模式到分时复用,算力资源的极致压榨”继续推进时,应把验证标准写成可执行清单,而不是停留在经验判断。性能类方案要给出基准数据,架构类方案要给出故障隔离方式,AI 类方案要给出输出质量和人工兜底策略。每一次迭代都应回答三个问题:收益是否可量化,失败是否可回滚,维护成本是否被团队接受。
如果短期资源有限,可以先保留最关键的观测指标,包括处理耗时、失败率、资源占用和人工介入次数。等这些指标稳定后,再扩展自动化能力。这样的节奏更慢,但风险更低,也更符合生产级技术文章强调的工程可验证性。
