当前位置: 首页 > news >正文

完整教程:npu_moe_distribute_combine算子代码分析

前言

 本篇博客是《MoeDistributeDispatch算子代码阅读》的姊妹篇。
 在EP并行场景中,每张卡部署了不同的专家。npu_moe_distribute_dispatch根据expert_ids,将hidden states发送到对应专家所在的rank上。
单个专家的运算,用代码语言描述:

class Expert(nn.Module):
def __init__(self, dim: int, inter_dim: int):
super().__init__()
self.w1 = Linear(dim, inter_dim)
self.w2 = Linear(inter_dim, dim)
self.w3 = Linear(dim, inter_dim)
def forward(self, x: torch.Tensor) -> torch.Tensor:
return self.w2(F.silu(self.w1(x)) * self.w3(x))

 source: DeepSeek-V3 的 MoE 架构解析:细粒度专家与高效模型扩展
 推理平台上,使用FusedMoE,输出多个专家的计算结果。例如vllm-ascend中的实现逻辑:

def unquant_apply_mlp(hidden_states: torch.Tensor,
w1: torch.Tensor,
w2: torch.Tensor,
group_list: torch.Tensor,
group_list_type: int = 1,
topk_scales: Optional[torch.Tensor] = None,
need_trans: bool = True) -> torch.Tensor:
if need_trans:
w1 = w1.transpose(1, 2)
w2 = w2.transpose(1, 2)
gate_up_out = torch_npu.npu_grouped_matmul(
x=[hidden_states],
weight=[w1],
split_item=2,
group_list_type=group_list_type,
group_type=0,
group_list=group_list,
)[0]
if is_310p():
gate_up_out = torch_npu.npu_swiglu(gate_up_out.to(torch.float32)).to(
torch.float16)
else:
gate_up_out = torch_npu.npu_swiglu(gate_up_out)
if topk_scales is not None:
gate_up_out *= topk_scales
hidden_states = torch_npu.npu_grouped_matmul(
x=[gate_up_out],
weight=[w2],
split_item=2,
group_list_type=group_list_type,
group_type=0,
group_list=group_list,
)[0]
return hidden_states

 source: moe_mlp.py
 参数对应关系:

算子参数1参数2
npu_moe_distribute_dispatchexpert_token_numsexpert_token_nums_type
npu_grouped_matmulgroup_listgroup_list_type

expert_token_nums_type(group_list_type)为0时候,npu_moe_distribute_dispatch输出cusum型的token计数,来自上一篇的数据示例:
--------expertTokenNumsOutGMTensor------------
3 6 11 16 17 22 27 30
32 34 36 41 44 46 47 50
 专家计算完之后,从哪里来的输入,对应的输出需要返回到对应的rank上,由MoeDistributeCombine算子负责。

MoeDistributeCombine

文档资料

 torch_npu.npu_moe_distribute_combine接口说明:
https://www.hiascend.com/document/detail/zh/Pytorch/700/apiref/apilist/ptaoplist_002363.html
 代码实现:
https://gitcode.com/cann/ops-transformer/tree/master/mc2/moe_distribute_combine
 ep_send_counts (Tensor):必选参数。表示本卡每个专家发给EP(Expert Parallelism)域每个卡的token数(token数以前缀和的形式表示),要求为1维张量。数据类型支持int32,对应torch_npu.npu_moe_distribute_dispatch的ep_recv_counts输出。
 expand_idx (Tensor):必选参数。表示给同一专家发送的token个数,要求为1维张量。数据类型支持int32,对应torch_npu.npu_moe_distribute_dispatch的expand_idx输出。
来自上一篇博客的数据,rank0打印的数据,sendCountsGlobal(ep_send_counts), expand_idx(expandIdxGMTensor):
--------sendCountsGlobal------------
2 3 5 6 9 11 13 16
16 17 18 22 24 27 28 30
31 32 33 34 35 36 39 41
43 44 45 46 47 47 47 50
--------expandIdxGMTensor------------
0 0 0 0 0 0 0 0 0 0 0 0
0 0 1 0 1 0 1 1 0 0 0 0
0 0 2 1 1 1 1 2 1 0 1 1
1 0 0 1 1 2 0 1 2 1 1 2
sendCountsGlobal数字的含义:
rank0上有2个token需要专家0处理。
rank1传输1个token,需要专家0处理: 1= 3 -2
rank0上有2个token需要专家1处理: 2 = 5 - 3
rank1传输1个token,需要专家1处理: 1 = 6 - 5

主要流程

template 
__aicore__ inline void MoeDistributeCombine::Process()
{if constexpr (IsNeedReduceScatter) {ReduceScatterTrans();}BuffInit();SetWaitTpStatusAndDisPatch();AlltoAllBuffInit();SetStatus();WaitDispatch();LocalWindowCopy();
}

SetWaitTpStatusAndDisPatch

 主要看ExpertAlltoAllDispatchCopyAdd的处理。epSendCountLocal_参考上面打印的sendCountsGlobal。

template 
__aicore__ inline void MoeDistributeCombine::ExpertAlltoAllDispatchCopyAdd()
{// 分核是按照卡数去分的,先循环单卡上每个专家,再循环处理当前核处理的卡号,因为网络中一个专家的放在一起处理for (uint32_t expertIdx = 0U; expertIdx < curRankExpertNum; expertIdx++) {for (uint32_t ep = startRankId_ ; ep < endRankId_; ep++) {if ((ep > 0) || (expertIdx > 0)) {preCount = epSendCountLocal_.GetValue(expertIdx * epWorldSize_ + ep - 1);}curTokenNum = epSendCountLocal_.GetValue(expertIdx * epWorldSize_ + ep) - preCount;if (curTokenNum == 0) {continue;}startTokenIdx = preCount * axisH_;ExpertAlltoAllDispatchInnerCopyAdd(curTokenNum, startTokenIdx, ep, expertIdx);}}
}
template 
__aicore__ inline void MoeDistributeCombine::ExpertAlltoAllDispatchInnerCopyAdd(uint32_t tokenNumLoop, uint32_t srcStartTokenIdx, uint32_t ep, uint32_t expertIdx)
{// 获取对应卡上 window 的首地址GM_ADDR rankGM = GetWinAddrByRankId(ep, EP_DOMAIN, expertIdx) + epDataOffsetOnWin_;
}

 根据ep的值,决定是本地拷贝还是RDMA。
 根据epSendCountLocal_中的值,0号专家有3个输出。头2个要拷贝到rank0,最后1个要拷贝到rank1。

SetStatus

不再分析

WaitDispatch

不再分析

LocalWindowCopy

 数据同步完成后,拷贝数据。

template 
__aicore__ inline void MoeDistributeCombine::LocalWindowCopy()
{for (uint32_t tokenIndex = beginIndex; tokenIndex < endIndex; tokenIndex++) {uint32_t index = tokenIndex * axisK_;int32_t moeExpert = 0;float scaleVal = 0.0;GM_ADDR wAddr;SyncFunc(); // 与结果搬出datacopy同tensorDuplicate(sumFloatBufLocal, (float)0, axisH_);LocalTensor tmpUb;for (uint32_t i = 0; i < axisK_; i++) {moeExpert = expertIdsLocal.GetValue(index);scaleVal = expandScalesLocal.GetValue(index);wAddr = (__gm__ uint8_t *)(epWindowGM_) + expertPerSizeOnWin_ * moeExpertPerRankNum_ *sharedExpertRankNum_ + expertPerSizeOnWin_ * moeExpert +indexCountsLocal.GetValue(index) * axisHExpandXTypeSize_ + tokenOffset * sizeof(ExpandXType);rowTmpGlobal_.SetGlobalBuffer((__gm__ ExpandXType *)wAddr);tmpUb = moeSumQueue_.AllocTensor();if constexpr (IsQuant) {DataCopy(tmpUb, rowTmpGlobal_, quantCopyLen);} else {DataCopy(tmpUb, rowTmpGlobal_, processLen);SyncFunc();}moeSumQueue_.EnQue(tmpUb);tmpUb = moeSumQueue_.DeQue();if constexpr (IsQuant) {DequantProcess(tmpUb);}Cast(rowTmpFloatLocal, tmpUb, AscendC::RoundMode::CAST_NONE, processLen);PipeBarrier();AscendC::Muls(mulBufLocal, rowTmpFloatLocal, scaleVal, processLen);PipeBarrier();AscendC::Add(sumFloatBufLocal, sumFloatBufLocal, mulBufLocal, processLen);index++;moeSumQueue_.FreeTensor(tmpUb);}
}

 地址偏移的计算:

expertPerSizeOnWin_ * moeExpertPerRankNum_ *sharedExpertRankNum_ + expertPerSizeOnWin_ * moeExpert +indexCountsLocal.GetValue(index) * axisHExpandXTypeSize_ + tokenOffset * sizeof(ExpandXType);

 moeExpert从expertIdsLocal获取的值,就是本rank推理token产生的expertIds。
 rank=0上的expertIds为:
--------expertIds------------
11 17 29 12 24 23 1 0 16 30 18 8
2 14 11 3 30 21 12 0 10 9 6 31
22 27 30 21 1 24 17 11 3 19 2 29
16 13 7 27 6 29 5 22 24 19 23 2
 indexCountsLocal是从expandIdxGM拷贝的数据。
 rank=0上的expandIdxGM为,就是相同专家id的计数值。参考MoeDistributeDispatch算子中的函数CalTokenSendExpertCnt。
--------expandIdxGMTensor------------
0 0 0 0 0 0 0 0 0 0 0 0
0 0 1 0 1 0 1 1 0 0 0 0
0 0 2 1 1 1 1 2 1 0 1 1
1 0 0 1 1 2 0 1 2 1 1 2

http://www.jsqmd.com/news/120315/

相关文章:

  • 用 .NET MAUI 10 + VS Copilot 从 0 开发一个签到 App(六)登录
  • 信息学奥赛一本通 1616:A 的 B 次方
  • 微信开发者secret和appid获取方法
  • 解锁大模型“能干活“的秘诀:RAG×MoE技术组合深度解析
  • Java毕设选题推荐:基于Java+springboot招投标管理系统设计与实现基于springboot的在线招标系统的设计与实现【附源码、mysql、文档、调试+代码讲解+全bao等】
  • 2025 --【J+S 二十连测】-- 第十二套 总结+题解
  • Java计算机毕设之基于springboot的在线招标系统的设计与实现基于springboot招投标管理系统设计与实现(完整前后端代码+说明文档+LW,调试定制等)
  • 深入解析MySQL事务与锁:构建高并发数据系统的基石
  • android kotlinx.serialization用法和封装全解
  • 系统架构设计师教程资源合集
  • 什么是八股文?Java程序员春招如何提前储备?拿高薪offer?
  • 创新点解读:基于贝叶斯优化PatchTST的时间序列预测算法(附代码实现)
  • 【毕业设计】基于springboot的幼儿园管理系统的设计与实现(源码+文档+远程调试,全bao定制等)
  • AI伦理风险防控与治理体系构建 守护技术向善之路
  • AI应用架构师如何实现高效的上下文理解增强方案?
  • 吐血整理!儿童鞋服宝藏品牌大盘点 - 品牌测评鉴赏家
  • 创新点解读:基于非线性二次分解的Ridge-RF-XGBoost时间序列预测(附代码实现)
  • 大模型微调资源合集
  • 【毕业设计】基于springboot的在线招标系统的设计与实现(源码+文档+远程调试,全bao定制等)
  • I/O多路复用
  • 基于CNN(卷积神经网路)-BiLSTM(双向长短期记忆网络)-Attention(注意力机制)的时间序列预测python代码
  • Vue.js:轻量高效的渐进式前端框架,为何成为开发者首选?
  • EI顶刊复现:基于氨储能技术的电转氨耦合风–光–火综合能源系统双层优化调度附Matlab代码
  • 生成式AI重构内容生态 人机协同定义创作新范式
  • 【课程设计/毕业设计】基于springboot+vue的在线招标系统的设计与实现基于springboot电子招投标系统【附源码、数据库、万字文档】
  • 2025.12.21博客
  • 实用指南:【threejs】材质共享导致的典型问题
  • 2025年儿童鞋服品牌前十名揭晓!哪些品牌靠科技与口碑征服家长? - 品牌测评鉴赏家
  • Vue.js从入门到实战:一站式学习指南
  • 深入解析:NewStar CTF 2025公开赛道-web题目-week4-writeup