CANN-NPU 显存回收策略:内存碎片整理与显存池化机制实战
一、显存碎片从哪来
1.1 碎片的两种形态
外部碎片——总空闲内存够用,但不连续。比如有 4 块 128MB 空闲,但需要一块 512MB 的连续内存,分配失败。
内部碎片——分配器按固定大小的块分配,实际使用的比分配的小。比如分配 400KB,分配器给 512KB,浪费了 112KB。
1.2 为什么 NPU 显存碎片更严重
- 批量推理:不同 batch size 需要不同大小的 buffer,反复分配释放
- 动态 Shape:每次推理的中间 tensor 大小不同,无法复用
- 多模型:多个模型交替执行,显存需求波动大
以一个典型推理服务为例:
时间线: 0s: 分配模型A权重 200MB → 空闲 600MB (连续) 1s: 分配激活缓冲 100MB → 空闲 500MB (连续) 2s: 释放缓冲 → 空闲 500MB (连续) 3s: 分配模型B权重 300MB → 空闲 200MB (连续) 4s: 分配激活缓冲 150MB → 空闲 50MB 5s: 释放模型B权重 → 空闲 350MB (碎片: 150MB + 200MB) 6s: 分配 250MB → 失败! (碎片无法满足)二、池化分配器设计
2.1 核心思想
预分配一大块显存,自己管理分配和释放,避免频繁调用系统 API。同时维护不同大小的空闲块列表,减少碎片。
importthreadingfromcollectionsimportdefaultdictclassNPUAllocator:"""NPU 显存池化分配器 设计要点: 1. 预分配: 启动时一次性申请整块显存 2. 大小类: 按 2 的幂分级管理,减少内部碎片 3. 空闲链表: 每个大小类维护空闲块链表,O(1) 分配 4. 线程安全: 推理服务多线程调用,需要加锁 为什么用 2 的幂分级? - 分配 400KB → 给 512KB 块,内部碎片率约 22% - 如果用 1KB 粒度 → 碎片率 < 0.5%,但管理开销大 - 2 的幂是折中:碎片率可控,管理高效 """def__init__(self,total_size=8*1024*1024*1024):# 默认 8GBself.total_size=total_size self.used_size=0self.lock=threading.Lock()# 按大小类管理空闲块# key = 块大小 (2 的幂), value = 空闲块列表self.free_blocks=defaultdict(list)# 地址 → 大小 映射(用于释放时查找)self.block_map={}# 预分配的整块内存(模拟)self._pool=bytearray(total_size)self._base_addr=id(self._pool)# 初始化: 把整块内存加入空闲列表self._add_free_block(self._base_addr,total_size)def_next_power_of_2(self,size):"""将大小向上取到 2 的幂"""power=1whilepower<size:power<<=1returnpowerdef_add_free_block(self,addr,size):self.free_blocks[size].append(addr)defallocate(self,size):"""分配内存 策略: 1. 找到 >= size 的最小 2 的幂大小类 2. 从该类的空闲链表取一块 3. 如果没有,尝试拆分更大的块 4. 如果都没有,触发碎片整理 """withself.lock:aligned_size=self._next_power_of_2(size)# 尝试从对应大小类分配block=self._try_allocate(aligned_size)ifblockisnotNone:addr,block_size=block self.block_map[addr]=block_size self.used_size+=block_sizereturnaddr# 尝试从更大的块拆分block=self._split_allocate(aligned_size)ifblockisnotNone:addr,block_size=block self.block_map[addr]=block_size self.used_size+=block_sizereturnaddr# 内存不足raiseMemoryError(f"显存分配失败: 需要{size}bytes, "f"已用{self.used_size}/{self.total_size}")def_try_allocate(self,aligned_size):"""从指定大小类尝试分配"""ifself.free_blocks[aligned_size]:addr=self.free_blocks[aligned_size].pop()returnaddr,aligned_sizereturnNonedef_split_allocate(self,aligned_size):"""从更大的空闲块拆分"""forblock_sizeinsorted(self.free_blocks.keys()):ifblock_size>aligned_sizeandself.free_blocks[block_size]:# 拆分: 一大变两小addr=self.free_blocks[block_size].pop()remaining_size=block_size-aligned_size# 剩余部分放回空闲列表self._add_free_block(addr+aligned_size,remaining_size)returnaddr,aligned_sizereturnNonedeffree(self,addr):"""释放内存 关键操作: 尝试合并相邻的空闲块,减少外部碎片。 """withself.lock:ifaddrnotinself.block_map:returnblock_size=self.block_map.pop(addr)self.used_size-=block_size# 尝试合并相邻空闲块merged_addr,merged_size=self._try_merge(addr,block_size)self._add_free_block(merged_addr,merged_size)def_try_merge(self,addr,size):"""尝试与相邻空闲块合并 这是减少外部碎片的核心。 每次释放时检查左右两侧是否有空闲块,有则合并。 """merged_addr=addr merged_size=size# 检查右侧相邻块right_addr=addr+sizeforblock_size,addrsinlist(self.free_blocks.items()):forainaddrs:ifa==right_addr:merged_size+=block_size addrs.remove(a)break# 检查左侧相邻块left_addr=addr-merged_sizeforblock_size,addrsinlist(self.free_blocks.items()):forainaddrs:ifa+block_size==left_addr:merged_addr=a merged_size+=block_size addrs.remove(a)breakreturnmerged_addr,merged_sizedefstats(self):"""返回显存使用统计"""withself.lock:free_size=self.total_size-self.used_sizereturn{'total':self.total_size,'used':self.used_size,'free':free_size,'usage_rate':f"{self.used_size/self.total_size*100:.1f}%",'fragment_count':len(self.block_map),}2.2 使用示例
# 初始化 8GB 显存池allocator=NPUAllocator(total_size=8*1024*1024*1024)# 分配模型权重weight_buf=allocator.allocate(200*1024*1024)# 200MBinput_buf=allocator.allocate(100*1024*1024)# 100MB# 使用后释放allocator.free(input_buf)# 查看统计print(allocator.stats())# {'total': 8589934592, 'used': 209715200, 'free': 8380219392, ...}三、碎片整理策略
3.1 紧凑式碎片整理
把所有已分配的块移到一端,空闲空间连成一片。需要暂停所有推理任务。
defcompact(self):"""紧凑式碎片整理 步骤: 1. 暂停推理(阻止新分配) 2. 计算所有已分配块的新地址(连续排列) 3. 搬移数据到新地址 4. 更新地址映射 5. 恢复推理 代价: - 暂停时间与已分配块数量成正比 - 数据搬移消耗带宽 - 适合在低峰期执行 """withself.lock:# 收集所有已分配块allocated=sorted(self.block_map.items(),key=lambdax:x[0])# 计算新地址new_addr=self._base_addr addr_mapping={}# 旧地址 → 新地址forold_addr,block_sizeinallocated:addr_mapping[old_addr]=new_addr new_addr+=block_size# 搬移数据(模拟)forold_addr,new_addrinaddr_mapping.items():# 实际中需要用 memcpy 或 DMA 搬移pass# 更新映射self.block_map.clear()forold_addr,new_addrinaddr_mapping.items():self.block_map[new_addr]=self.block_map.get(old_addr,0)# 重置空闲列表self.free_blocks.clear()free_start=new_addr free_size=self._base_addr+self.total_size-free_startiffree_size>0:self._add_free_block(free_start,free_size)3.2 增量式碎片整理
不需要暂停,每次释放时主动合并。这是日常策略。
defincremental_compact(self):"""增量式碎片整理 在每次 free() 时自动执行,不需要暂停。 通过合并相邻空闲块,逐步减少碎片。 这是日常策略,大部分场景够用。 """# 已经在 free() 的 _try_merge 中实现了pass3.3 策略对比
| 策略 | 适用场景 | 暂停时间 | 碎片消除效果 |
|---|---|---|---|
| 增量合并 | 日常运行 | 无 | 中等 |
| 紧凑整理 | 低峰期 | 高 | 最佳 |
| 重新分配 | 模型切换 | 中等 | 好 |
四、针对推理场景的优化
4.1 预分配策略
推理服务启动时就预估好需要多少显存,一次性分配到位。
defpreallocate_for_inference(model_configs):"""根据模型配置预分配显存 推理服务的显存需求相对可预测: - 模型权重: 固定大小,启动时确定 - KV Cache: 最大序列长度 × batch size × 层数 - 激活缓冲: 最大输入 shape × 中间层最大宽度 """allocator=NPUAllocator()forconfiginmodel_configs:# 预分配权重缓冲weight_buf=allocator.allocate(config['weight_size'])# 预分配 KV Cache(按最大序列长度)kv_cache_size=(config['max_seq_len']*config['max_batch']*config['num_layers']*config['hidden_dim']*2# K 和 V)kv_buf=allocator.allocate(kv_cache_size)returnallocator4.2 多级缓存
把显存分成几个区域,优先用小的、快的区域。
classMultiLevelCache:"""多级显存缓存 L1: 寄存器文件 — 最快,容量最小(几十 KB) L2: SRAM 缓存 — 快,容量中等(几百 KB) L3: HBM 显存 — 慢,容量大(GB 级) 策略: 频繁访问的数据放 L1/L2,不频繁的放 L3。 """def__init__(self):self.l1={}# 寄存器级缓存self.l2={}# SRAM 级缓存self.l3={}# HBM 级缓存defget(self,key):ifkeyinself.l1:returnself.l1[key]ifkeyinself.l2:val=self.l2.pop(key)self.l1[key]=valreturnvalifkeyinself.l3:val=self.l3.pop(key)self.l2[key]=valreturnvalreturnNonedefput(self,key,value,level='l3'):iflevel=='l1':self.l1[key]=valueeliflevel=='l2':self.l2[key]=valueelse:self.l3[key]=value五、监控与诊断
classMemoryMonitor:"""显存使用监控 持续跟踪显存使用情况,发现异常及时告警。 """def__init__(self,allocator,threshold=0.9):self.allocator=allocator self.threshold=threshold self.history=[]defcheck(self):stats=self.allocator.stats()self.history.append(stats)usage_rate=stats['used']/stats['total']ifusage_rate>self.threshold:print(f"[WARN] 显存使用率{usage_rate:.1%}超过阈值{self.threshold:.0%}")returnstatsdefreport(self):"""生成显存使用报告"""ifnotself.history:return"无数据"latest=self.history[-1]peak_used=max(h['used']forhinself.history)report=f""" 显存使用报告: 总量:{latest['total']/1024**3:.1f}GB 当前使用:{latest['used']/1024**3:.1f}GB ({latest['usage_rate']}) 峰值使用:{peak_used/1024**3:.1f}GB ({peak_used/latest['total']:.1%}) 碎片数:{latest['fragment_count']}"""returnreport六、常见问题
| 问题 | 原因 | 解决方案 |
|---|---|---|
| OOM 但总空闲够用 | 外部碎片严重 | 执行紧凑式碎片整理 |
| 分配速度变慢 | 空闲链表太长 | 调整大小类粒度,减少链表长度 |
| 显存泄漏 | 忘记释放 buffer | 用 MemoryMonitor 监控,定期检查 |
| 推理延迟抖动 | 碎片整理暂停了推理 | 用增量式整理,避免暂停 |
相关仓库
- CANN- 昇腾计算架构 https://gitee.com/ascend/cann
- jemalloc- 高性能内存分配器 https://github.com/jemalloc/jemalloc
- tcmalloc- Google 线程级内存分配器 https://github.com/google/tcmalloc
- PoolAllocator- 虚拟显存池化分配器 https://github.com/vllm-project/vllm
