From 25c6d76311326eec641ed60cb12c9b03f1cc2795 Mon Sep 17 00:00:00 2001 From: wyu0-0 Date: Tue, 22 Jul 2025 19:13:52 +0800 Subject: [PATCH 1/6] refactor fused_experts_with_mc2 --- vllm_ascend/ops/fused_moe.py | 382 +++++++++++++++++++++++------------ 1 file changed, 252 insertions(+), 130 deletions(-) diff --git a/vllm_ascend/ops/fused_moe.py b/vllm_ascend/ops/fused_moe.py index 37edb9767a1..f019bb59c13 100644 --- a/vllm_ascend/ops/fused_moe.py +++ b/vllm_ascend/ops/fused_moe.py @@ -140,153 +140,275 @@ def fused_experts_with_mc2( is_torchair: bool = False, mc2_mask: Optional[torch.Tensor] = None, ) -> Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]: - quant_mode = 0 - ep_group = get_mc2_group() - ep_rank_id = ep_group.rank_in_group - ep_world_size = ep_group.world_size - tp_world_size = get_tp_group().world_size + """Fused MoE implementation with MC2 dispatch/combine + + Args: + hidden_states: Input tensor of shape [num_tokens, hidden_size] + w1: Expert gate-up weights of shape [num_experts, hidden_size, intermediate_size] + w2: Expert down weights of shape [num_experts, intermediate_size, hidden_size] + topk_weights: Selected expert weights of shape [num_tokens, top_k] + topk_ids: Selected expert indices of shape [num_tokens, top_k] + expert_map: Expert mapping tensor + moe_all_to_all_group_name: Communication group name + shared_experts: Optional shared experts module + is_torchair: Whether running in TorchAIR mode + mc2_mask: Optional mask tensor for A3 optimization + + Returns: + Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]: Output tensor(s) + """ # NOTE: `global_bs` should be equal to `max_num_tokens_across_dp` * `ep_world_size`, # and `max_num_tokens_across_dp` has been split into `tp_world_size` parts before. - global_bs = ( - math.ceil(get_forward_context().max_tokens_across_dp / tp_world_size) * - ep_world_size) + def _calculate_global_bs(tp_size: int, ep_size: int) -> int: + """Calculate global batch size for MoE distribution""" + max_tokens_dp = get_forward_context().max_tokens_across_dp + return math.ceil(max_tokens_dp / tp_size) * ep_size + + def _build_dispatch_kwargs( + enable_v2: bool, + need_extra: bool, + a3: bool, + mc2_mask: Optional[torch.Tensor], + x: torch.Tensor, + expert_ids: torch.Tensor, + moe_expert_num: int, + moe_all_to_all_group_name: Optional[str], + ep_world_size: int, + ep_rank_id: int, + global_bs: int + ) -> dict: + quant_mode = 0 + """Construct kwargs for MoE dispatch operation""" + base_kwargs = { + "x": x, + "expert_ids": expert_ids, + "expert_shard_type": 0, + "shared_expert_rank_num": 0, + "moe_expert_num": moe_expert_num, + "global_bs": global_bs, + "scales": None, + "quant_mode": quant_mode, + "group_ep": moe_all_to_all_group_name, + "ep_world_size": ep_world_size, + "ep_rank_id": ep_rank_id, + } - # NOTE: Currently, when in A3 or in torchair graph, we need to pass in some extra param into dispatch & combine - need_extra_args = get_ascend_soc_version( - ) == AscendSocVersion.A3 or is_torchair + if need_extra: + base_kwargs.update({ + "group_tp": moe_all_to_all_group_name, + "tp_world_size": 1, + "tp_rank_id": 0, + }) - # NOTE: Currently, when in A3, we need to pass in some extra param into dispatch & combine - a3_need_extra_args = get_ascend_soc_version() == AscendSocVersion.A3 + if a3 and enable_v2: + base_kwargs["x_active_mask"] = mc2_mask - enable_dispatch_v2 = hasattr(torch_npu, "npu_moe_distribute_dispatch_v2") + return base_kwargs - moe_expert_num = len(expert_map) - kwargs_mc2 = { - "x": hidden_states, - "expert_ids": topk_ids, - "expert_shard_type": 0, - "shared_expert_rank_num": 0, - "moe_expert_num": moe_expert_num, - "global_bs": global_bs, - } - - stage1_kwargs = { - "scales": None, - "quant_mode": quant_mode, - "group_ep": moe_all_to_all_group_name, - "ep_world_size": ep_world_size, - "ep_rank_id": ep_rank_id, - } - if need_extra_args: - stage1_kwargs.update({ - "group_tp": moe_all_to_all_group_name, - "tp_world_size": 1, - "tp_rank_id": 0, - }) - if a3_need_extra_args and enable_dispatch_v2: - stage1_kwargs.update({ - "x_active_mask": mc2_mask, - }) - - kwargs_mc2.update(stage1_kwargs) - - output = torch_npu.npu_moe_distribute_dispatch_v2( - **kwargs_mc2 - ) if enable_dispatch_v2 else torch_npu.npu_moe_distribute_dispatch( - **kwargs_mc2) - # comm_stream.wait_stream(torch.npu.current_stream()) - expand_x, dynamic_scale, assist_info_for_combine, expert_token_nums, ep_recv_counts = output[ - 0:5] - - if shared_experts is not None: + def _process_shared_experts( + experts: Any, + h_states: torch.Tensor, + weights: torch.Tensor, + expand_x: torch.Tensor + ) -> torch.Tensor: + """Compute shared expert activations in secondary stream""" with npu_stream_switch("moe_secondary", 0): - npu_wait_tensor(hidden_states, topk_weights) - shared_gate_up, _ = shared_experts.gate_up_proj(hidden_states) + npu_wait_tensor(h_states, weights) + shared_gate_up, _ = experts.gate_up_proj(h_states) npu_wait_tensor(shared_gate_up, expand_x) - shared_act = shared_experts.act_fn(shared_gate_up) + return experts.act_fn(shared_gate_up) - w1 = w1.transpose(1, 2) + def _expert_forward( + expand_x: torch.Tensor, + w1: torch.Tensor, + w2: torch.Tensor, + expert_token_nums: torch.Tensor + ) -> torch.Tensor: + """Execute expert forward computation (gate_up -> SwiGLU -> down)""" + # Gate-up projection + w1 = w1.transpose(1, 2) + group_list = expert_token_nums.to(torch.int64) + gate_up_out_list = torch_npu.npu_grouped_matmul( + x=[expand_x], + weight=[w1], + split_item=2, + # 1 means count mode, to avoid cumulative operation of the group list + group_list_type=1, + group_type=0, + group_list=group_list, + ) + # TODO: Remove this in the future. + gate_up_out = torch.cat(gate_up_out_list, dim=0) + gate_up_out = torch_npu.npu_swiglu(gate_up_out) + + # Down projection + w2 = w2.transpose(1, 2) + down_list = torch_npu.npu_grouped_matmul( + x=[gate_up_out], + weight=[w2], + split_item=2, + group_list_type=1, + group_type=0, + group_list=group_list, + ) + return torch.cat(down_list, dim=0) + + def _build_combine_kwargs( + enable_v2: bool, + need_extra: bool, + a3: bool, + mc2_mask: Optional[torch.Tensor], + x: torch.Tensor, + topk_ids: torch.Tensor, + topk_weights: torch.Tensor, + moe_expert_num: int, + group_name: Optional[str], + ep_world_size: int, + ep_rank_id: int, + ep_recv_counts: torch.Tensor, + tp_recv_counts: torch.Tensor, + assist_info: Any, + global_bs: int + ) -> dict: + """Construct kwargs for MoE combine operation""" + base_kwargs = { + "expand_x": x, + "expert_ids": topk_ids, + "expert_scales": topk_weights.to(torch.float32), + "expert_shard_type": 0, + "shared_expert_rank_num": 0, + "moe_expert_num": moe_expert_num, + "global_bs": global_bs, + "ep_send_counts": ep_recv_counts, + "group_ep": group_name, + "ep_world_size": ep_world_size, + "ep_rank_id": ep_rank_id, + } - group_list = expert_token_nums.to(torch.int64) - gate_up_out_list = torch_npu.npu_grouped_matmul( - x=[expand_x], - weight=[w1], - split_item=2, - # 1 means count mode, to avoid cumulative operation of the group list - group_list_type=1, - group_type=0, - group_list=group_list, - ) + if enable_v2: + base_kwargs["assist_info_for_combine"] = assist_info + else: + base_kwargs["expand_idx"] = assist_info + + if need_extra: + base_kwargs.update({ + "tp_send_counts": tp_recv_counts, + "group_tp": group_name, + "tp_world_size": 1, + "tp_rank_id": 0, + }) + if a3 and enable_v2: + base_kwargs["x_active_mask"] = mc2_mask + + return base_kwargs + + def _prepare_return( + hidden_states: torch.Tensor, + shared_experts: Optional[Any], + shared_act: Optional[torch.Tensor], + down_out_list: torch.Tensor + ) -> Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]: + """Construct final output with optional shared experts""" + if shared_experts is None: + return hidden_states - # TODO: Remove this in the future. - gate_up_out = torch.cat(gate_up_out_list, dim=0) - gate_up_out = torch_npu.npu_swiglu(gate_up_out) + with npu_stream_switch("moe_secondary", 0): + npu_wait_tensor(shared_act, down_out_list) # Correct sync point + shared_hidden_states, _ = shared_experts.down_proj(shared_act) + return hidden_states, shared_hidden_states - w2 = w2.transpose(1, 2) - down_out_list = torch_npu.npu_grouped_matmul( - x=[gate_up_out], - weight=[w2], - split_item=2, - group_list_type=1, - group_type=0, - group_list=group_list, + # 1. Configuration setup + ep_group = get_mc2_group() + ep_rank_id = ep_group.rank_in_group + ep_world_size = ep_group.world_size + tp_world_size = get_tp_group().world_size + moe_expert_num = len(expert_map) + + # 2. Global batch size calculation + global_bs = _calculate_global_bs(tp_world_size, ep_world_size) + + # 3. Environment flags + # NOTE: Currently, when in A3, we need to pass in some extra param into dispatch & combine + # NOTE: Currently, when in A3 or in torchair graph, we need to pass in some extra param into dispatch & combine + a3_enabled = get_ascend_soc_version() == AscendSocVersion.A3 + need_extra_params = a3_enabled or is_torchair + enable_dispatch_v2 = hasattr(torch_npu, "npu_moe_distribute_dispatch_v2") + + # 4. MoE dispatch phase + dispatch_kwargs = _build_dispatch_kwargs( + enable_v2=enable_dispatch_v2, + need_extra=need_extra_params, + a3=a3_enabled, + mc2_mask=mc2_mask, + x=hidden_states, + expert_ids=topk_ids, + moe_expert_num=moe_expert_num, + moe_all_to_all_group_name=moe_all_to_all_group_name, + ep_world_size=ep_world_size, + ep_rank_id=ep_rank_id, + global_bs=global_bs ) - down_out_list = torch.cat(down_out_list, dim=0) + dispatch_func = ( + torch_npu.npu_moe_distribute_dispatch_v2 + if enable_dispatch_v2 + else torch_npu.npu_moe_distribute_dispatch + ) + dispatch_output = dispatch_func(**dispatch_kwargs) + expand_x, _, assist_info, expert_token_nums, ep_recv_counts = dispatch_output[:5] - # moeCombine - kwargs_mc2 = { - "expand_x": down_out_list, - "expert_ids": topk_ids, - "expert_scales": topk_weights.to(torch.float32), - "expert_shard_type": 0, - "shared_expert_rank_num": 0, - "moe_expert_num": moe_expert_num, - "global_bs": global_bs, - } - tp_recv_counts = output[5] - stage3_kwargs = { - "ep_send_counts": ep_recv_counts, - "group_ep": moe_all_to_all_group_name, - "ep_world_size": ep_world_size, - "ep_rank_id": ep_rank_id, - } - if enable_dispatch_v2: - stage3_kwargs.update({ - "assist_info_for_combine": - assist_info_for_combine, - }) - else: - stage3_kwargs.update({ - "expand_idx": assist_info_for_combine, - }) - if need_extra_args: - stage3_kwargs.update({ - "tp_send_counts": tp_recv_counts, - "group_tp": moe_all_to_all_group_name, - "tp_world_size": 1, - "tp_rank_id": 0, - }) - if a3_need_extra_args and enable_dispatch_v2: - stage3_kwargs.update({ - "x_active_mask": mc2_mask, - }) - kwargs_mc2.update(stage3_kwargs) - - hidden_states = torch_npu.npu_moe_distribute_combine_v2( - **kwargs_mc2 - ) if enable_dispatch_v2 else torch_npu.npu_moe_distribute_combine( - **kwargs_mc2) - - if shared_experts is None: - return hidden_states - else: - with npu_stream_switch("moe_secondary", 0): - npu_wait_tensor(shared_act, down_out_list) - shared_hidden_states, _ = shared_experts.down_proj(shared_act) - return hidden_states, shared_hidden_states + # 5. Shared experts computation (if any) + shared_act = None + if shared_experts is not None: + shared_act = _process_shared_experts( + experts=shared_experts, + h_states=hidden_states, + weights=topk_weights, + expand_x=expand_x + ) + + # 6. Expert forward computation + down_out = _expert_forward( + expand_x=expand_x, + w1=w1, + w2=w2, + expert_token_nums=expert_token_nums + ) + # 7. MoE combine phase + combine_kwargs = _build_combine_kwargs( + enable_v2=enable_dispatch_v2, + need_extra=need_extra_params, + a3=a3_enabled, + mc2_mask=mc2_mask, + x=down_out, + topk_ids=topk_ids, + topk_weights=topk_weights, + moe_expert_num=moe_expert_num, + group_name=moe_all_to_all_group_name, + ep_world_size=ep_world_size, + ep_rank_id=ep_rank_id, + ep_recv_counts=ep_recv_counts, + tp_recv_counts=dispatch_output[5], + assist_info=assist_info, + global_bs=global_bs + ) + + combine_func = ( + torch_npu.npu_moe_distribute_combine_v2 + if enable_dispatch_v2 + else torch_npu.npu_moe_distribute_combine + ) + hidden_states = combine_func(**combine_kwargs) + + # 8. Final output preparation + return _prepare_return( + hidden_states=hidden_states, + shared_experts=shared_experts, + shared_act=shared_act, + down_out_list=down_out + ) def apply_mlp( hidden_states: torch.Tensor, From e846a841cd469438b5e1a89b40903484c7bf7ec3 Mon Sep 17 00:00:00 2001 From: wyu0-0 Date: Wed, 23 Jul 2025 14:58:11 +0800 Subject: [PATCH 2/6] single ctx final --- vllm_ascend/ascend_config.py | 2 + vllm_ascend/ops/fused_moe.py | 271 +++++++++++++++++------------------ 2 files changed, 137 insertions(+), 136 deletions(-) diff --git a/vllm_ascend/ascend_config.py b/vllm_ascend/ascend_config.py index 8ea67994ea2..6536eebb2c9 100644 --- a/vllm_ascend/ascend_config.py +++ b/vllm_ascend/ascend_config.py @@ -39,6 +39,8 @@ def __init__(self, vllm_config): self.expert_map_path = additional_config.get("expert_map_path", None) self.chunked_prefill_for_mla = additional_config.get( "chunked_prefill_for_mla", False) + self.fc_dual_batch = additional_config.get( + "fc_dual_batch", False) self.enable_weight_nz_layout = additional_config.get( "enable_weight_nz_layout", False) diff --git a/vllm_ascend/ops/fused_moe.py b/vllm_ascend/ops/fused_moe.py index f019bb59c13..2948178680e 100644 --- a/vllm_ascend/ops/fused_moe.py +++ b/vllm_ascend/ops/fused_moe.py @@ -43,6 +43,7 @@ from vllm_ascend.ascend_config import get_ascend_config from vllm_ascend.ascend_forward_context import FusedMoEState from vllm_ascend.distributed.parallel_state import get_mc2_group +from vllm_ascend.multistream.context import get_multistream_comm_context from vllm_ascend.ops.expert_load_balancer import ExpertLoadBalancer from vllm_ascend.ops.moe_dispatcher.token_dispatcher import ( MoEAlltoAllSeqOverLapDispatcher, MoEDispatcherConfig) @@ -158,51 +159,70 @@ def fused_experts_with_mc2( Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]: Output tensor(s) """ - # NOTE: `global_bs` should be equal to `max_num_tokens_across_dp` * `ep_world_size`, - # and `max_num_tokens_across_dp` has been split into `tp_world_size` parts before. - def _calculate_global_bs(tp_size: int, ep_size: int) -> int: - """Calculate global batch size for MoE distribution""" - max_tokens_dp = get_forward_context().max_tokens_across_dp - return math.ceil(max_tokens_dp / tp_size) * ep_size - + def _build_context() -> dict: + ep_group = get_mc2_group() + tp_group = get_tp_group() + forward_ctx = get_forward_context() + soc_version = get_ascend_soc_version() + + return { + # 基础配置 + 'expert_map': expert_map, + 'moe_all_to_all_group_name': moe_all_to_all_group_name, + 'is_torchair': is_torchair, + 'mc2_mask': mc2_mask, + + 'ep_group': ep_group, + 'ep_rank_id': ep_group.rank_in_group, + 'ep_world_size': ep_group.world_size, + 'tp_world_size': tp_group.world_size, + + # NOTE: `global_bs` should be equal to `max_num_tokens_across_dp` * `ep_world_size`, + # and `max_num_tokens_across_dp` has been split into `tp_world_size` parts before. + 'global_bs': math.ceil(forward_ctx.max_tokens_across_dp / tp_group.world_size) * ep_group.world_size, + 'moe_expert_num': len(expert_map), + + # 功能开关 + # NOTE: Currently, when in A3, we need to pass in some extra param into dispatch & combine + # NOTE: Currently, when in A3 or in torchair graph, we need to pass in some extra param into dispatch & combine + 'dual_move_enabled': get_ascend_config().fc_dual_batch, + 'enable_dispatch_v2': hasattr(torch_npu, "npu_moe_distribute_dispatch_v2"), + 'a3_enabled': soc_version == AscendSocVersion.A3, + 'need_extra_params': (soc_version == AscendSocVersion.A3) or is_torchair, + + # 流控制 + 'ms_metadata': get_multistream_comm_context(), + } def _build_dispatch_kwargs( - enable_v2: bool, - need_extra: bool, - a3: bool, - mc2_mask: Optional[torch.Tensor], - x: torch.Tensor, - expert_ids: torch.Tensor, - moe_expert_num: int, - moe_all_to_all_group_name: Optional[str], - ep_world_size: int, - ep_rank_id: int, - global_bs: int + hidden_states: torch.Tensor, + topk_ids: torch.Tensor, + ctx: dict ) -> dict: quant_mode = 0 """Construct kwargs for MoE dispatch operation""" base_kwargs = { - "x": x, - "expert_ids": expert_ids, + "x": hidden_states, + "expert_ids": topk_ids, "expert_shard_type": 0, "shared_expert_rank_num": 0, - "moe_expert_num": moe_expert_num, - "global_bs": global_bs, + "moe_expert_num": ctx['moe_expert_num'], + "global_bs": ctx['global_bs'], "scales": None, "quant_mode": quant_mode, - "group_ep": moe_all_to_all_group_name, - "ep_world_size": ep_world_size, - "ep_rank_id": ep_rank_id, + "group_ep": ctx['moe_all_to_all_group_name'], + "ep_world_size": ctx['ep_world_size'], + "ep_rank_id": ctx['ep_rank_id'], } - if need_extra: + if ctx['need_extra_params']: base_kwargs.update({ - "group_tp": moe_all_to_all_group_name, + "group_tp": ctx['moe_all_to_all_group_name'], "tp_world_size": 1, "tp_rank_id": 0, }) - if a3 and enable_v2: - base_kwargs["x_active_mask"] = mc2_mask + if ctx['a3_enabled'] and ctx['enable_dispatch_v2']: + base_kwargs["x_active_mask"] = ctx['mc2_mask'] return base_kwargs @@ -255,51 +275,43 @@ def _expert_forward( return torch.cat(down_list, dim=0) def _build_combine_kwargs( - enable_v2: bool, - need_extra: bool, - a3: bool, - mc2_mask: Optional[torch.Tensor], - x: torch.Tensor, topk_ids: torch.Tensor, topk_weights: torch.Tensor, - moe_expert_num: int, - group_name: Optional[str], - ep_world_size: int, - ep_rank_id: int, + down_out: torch.Tensor, ep_recv_counts: torch.Tensor, tp_recv_counts: torch.Tensor, assist_info: Any, - global_bs: int + ctx: dict ) -> dict: """Construct kwargs for MoE combine operation""" base_kwargs = { - "expand_x": x, + "expand_x": down_out, "expert_ids": topk_ids, "expert_scales": topk_weights.to(torch.float32), "expert_shard_type": 0, "shared_expert_rank_num": 0, - "moe_expert_num": moe_expert_num, - "global_bs": global_bs, + "moe_expert_num": ctx['moe_expert_num'], + "global_bs": ctx['global_bs'], "ep_send_counts": ep_recv_counts, - "group_ep": group_name, - "ep_world_size": ep_world_size, - "ep_rank_id": ep_rank_id, + "group_ep": ctx['moe_all_to_all_group_name'], + "ep_world_size": ctx['ep_world_size'], + "ep_rank_id": ctx['ep_rank_id'], } - if enable_v2: + if ctx['enable_dispatch_v2']: base_kwargs["assist_info_for_combine"] = assist_info else: base_kwargs["expand_idx"] = assist_info - if need_extra: + if ctx['need_extra_params']: base_kwargs.update({ "tp_send_counts": tp_recv_counts, - "group_tp": group_name, + "group_tp": ctx['moe_all_to_all_group_name'], "tp_world_size": 1, "tp_rank_id": 0, }) - if a3 and enable_v2: - base_kwargs["x_active_mask"] = mc2_mask + if ctx['a3_enabled'] and ctx['enable_dispatch_v2']: + base_kwargs["x_active_mask"] = ctx['mc2_mask'] return base_kwargs @@ -318,97 +330,84 @@ def _prepare_return( shared_hidden_states, _ = shared_experts.down_proj(shared_act) return hidden_states, shared_hidden_states - # 1. Configuration setup - ep_group = get_mc2_group() - ep_rank_id = ep_group.rank_in_group - ep_world_size = ep_group.world_size - tp_world_size = get_tp_group().world_size - moe_expert_num = len(expert_map) - - # 2. Global batch size calculation - global_bs = _calculate_global_bs(tp_world_size, ep_world_size) - - # 3. Environment flags - # NOTE: Currently, when in A3, we need to pass in some extra param into dispatch & combine - # NOTE: Currently, when in A3 or in torchair graph, we need to pass in some extra param into dispatch & combine - a3_enabled = get_ascend_soc_version() == AscendSocVersion.A3 - need_extra_params = a3_enabled or is_torchair - enable_dispatch_v2 = hasattr(torch_npu, "npu_moe_distribute_dispatch_v2") - - # 4. MoE dispatch phase - dispatch_kwargs = _build_dispatch_kwargs( - enable_v2=enable_dispatch_v2, - need_extra=need_extra_params, - a3=a3_enabled, - mc2_mask=mc2_mask, - x=hidden_states, - expert_ids=topk_ids, - moe_expert_num=moe_expert_num, - moe_all_to_all_group_name=moe_all_to_all_group_name, - ep_world_size=ep_world_size, - ep_rank_id=ep_rank_id, - global_bs=global_bs - ) + def _get_dispatch_func(ctx: dict): + if ctx['enable_dispatch_v2']: + return torch_npu.npu_moe_distribute_dispatch_v2 + else: + return torch_npu.npu_moe_distribute_dispatch - dispatch_func = ( - torch_npu.npu_moe_distribute_dispatch_v2 - if enable_dispatch_v2 - else torch_npu.npu_moe_distribute_dispatch - ) - dispatch_output = dispatch_func(**dispatch_kwargs) - expand_x, _, assist_info, expert_token_nums, ep_recv_counts = dispatch_output[:5] - - # 5. Shared experts computation (if any) - shared_act = None - if shared_experts is not None: - shared_act = _process_shared_experts( - experts=shared_experts, - h_states=hidden_states, - weights=topk_weights, - expand_x=expand_x + def _get_combine_func(ctx: dict): + if ctx['enable_dispatch_v2']: + return torch_npu.npu_moe_distribute_combine_v2 + else: + return torch_npu.npu_moe_distribute_combine + + def _single_stream_execution( + ctx: dict, + hidden_states: torch.Tensor, + topk_ids: torch.Tensor, + topk_weights: torch.Tensor, + w1: torch.Tensor, + w2: torch.Tensor, + shared_experts: Optional[Any] = None + ) -> Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]: + # MoE dispatch phase + dispatch_kwargs = _build_dispatch_kwargs(hidden_states,topk_ids, ctx) + dispatch_output = _get_dispatch_func(ctx)(**dispatch_kwargs) + expand_x, _, assist_info, expert_token_nums, ep_recv_counts = dispatch_output[:5] + # Shared experts computation (if any) + shared_act = None + if shared_experts is not None: + shared_act = _process_shared_experts( + experts=shared_experts, + h_states=hidden_states, + weights=topk_weights, + expand_x=expand_x + ) + # Expert forward computation + down_out = _expert_forward( + expand_x=expand_x, + w1=w1, + w2=w2, + expert_token_nums=expert_token_nums + ) + # MoE combine phase + combine_kwargs = _build_combine_kwargs( + topk_ids=topk_ids, + topk_weights=topk_weights, + down_out=down_out, + ep_recv_counts=ep_recv_counts, + tp_recv_counts=dispatch_output[5], + assist_info=assist_info, + ctx=ctx + ) + hidden_states = _get_combine_func(ctx)(**combine_kwargs) + # Final output preparation + return _prepare_return( + hidden_states=hidden_states, + shared_experts=shared_experts, + shared_act=shared_act, + down_out_list=down_out ) - # 6. Expert forward computation - down_out = _expert_forward( - expand_x=expand_x, - w1=w1, - w2=w2, - expert_token_nums=expert_token_nums - ) + ctx = _build_context() - # 7. MoE combine phase - combine_kwargs = _build_combine_kwargs( - enable_v2=enable_dispatch_v2, - need_extra=need_extra_params, - a3=a3_enabled, - mc2_mask=mc2_mask, - x=down_out, - topk_ids=topk_ids, - topk_weights=topk_weights, - moe_expert_num=moe_expert_num, - group_name=moe_all_to_all_group_name, - ep_world_size=ep_world_size, - ep_rank_id=ep_rank_id, - ep_recv_counts=ep_recv_counts, - tp_recv_counts=dispatch_output[5], - assist_info=assist_info, - global_bs=global_bs - ) + # 双流模式启用条件:配置开启且无共享专家且元数据可用 + use_dual_stream = (get_ascend_config().fc_dual_batch and + ctx['ms_metadata'] is not None and + shared_experts is None) - combine_func = ( - torch_npu.npu_moe_distribute_combine_v2 - if enable_dispatch_v2 - else torch_npu.npu_moe_distribute_combine - ) - hidden_states = combine_func(**combine_kwargs) - - # 8. Final output preparation - return _prepare_return( - hidden_states=hidden_states, - shared_experts=shared_experts, - shared_act=shared_act, - down_out_list=down_out - ) + if use_dual_stream: + return _dual_stream_execution() + else: + return _single_stream_execution( + ctx=ctx, + hidden_states=hidden_states, + topk_ids=topk_ids, + topk_weights=topk_weights, + w1=w1, + w2=w2, + shared_experts=shared_experts) def apply_mlp( hidden_states: torch.Tensor, From d844918280aa78ff6080ff1c51ebeb4a6a45b2b0 Mon Sep 17 00:00:00 2001 From: wyu0-0 Date: Fri, 25 Jul 2025 14:58:08 +0800 Subject: [PATCH 3/6] dual stream --- vllm_ascend/ops/fused_moe.py | 188 ++++++++++++++++++++++++++--------- 1 file changed, 143 insertions(+), 45 deletions(-) diff --git a/vllm_ascend/ops/fused_moe.py b/vllm_ascend/ops/fused_moe.py index 2948178680e..4abcd00d66f 100644 --- a/vllm_ascend/ops/fused_moe.py +++ b/vllm_ascend/ops/fused_moe.py @@ -43,7 +43,6 @@ from vllm_ascend.ascend_config import get_ascend_config from vllm_ascend.ascend_forward_context import FusedMoEState from vllm_ascend.distributed.parallel_state import get_mc2_group -from vllm_ascend.multistream.context import get_multistream_comm_context from vllm_ascend.ops.expert_load_balancer import ExpertLoadBalancer from vllm_ascend.ops.moe_dispatcher.token_dispatcher import ( MoEAlltoAllSeqOverLapDispatcher, MoEDispatcherConfig) @@ -180,7 +179,6 @@ def _build_context() -> dict: # NOTE: `global_bs` should be equal to `max_num_tokens_across_dp` * `ep_world_size`, # and `max_num_tokens_across_dp` has been split into `tp_world_size` parts before. 'global_bs': math.ceil(forward_ctx.max_tokens_across_dp / tp_group.world_size) * ep_group.world_size, - 'moe_expert_num': len(expert_map), # 功能开关 # NOTE: Currently, when in A3, we need to pass in some extra param into dispatch & combine @@ -189,42 +187,7 @@ def _build_context() -> dict: 'enable_dispatch_v2': hasattr(torch_npu, "npu_moe_distribute_dispatch_v2"), 'a3_enabled': soc_version == AscendSocVersion.A3, 'need_extra_params': (soc_version == AscendSocVersion.A3) or is_torchair, - - # 流控制 - 'ms_metadata': get_multistream_comm_context(), } - def _build_dispatch_kwargs( - hidden_states: torch.Tensor, - topk_ids: torch.Tensor, - ctx: dict - ) -> dict: - quant_mode = 0 - """Construct kwargs for MoE dispatch operation""" - base_kwargs = { - "x": hidden_states, - "expert_ids": topk_ids, - "expert_shard_type": 0, - "shared_expert_rank_num": 0, - "moe_expert_num": ctx['moe_expert_num'], - "global_bs": ctx['global_bs'], - "scales": None, - "quant_mode": quant_mode, - "group_ep": ctx['moe_all_to_all_group_name'], - "ep_world_size": ctx['ep_world_size'], - "ep_rank_id": ctx['ep_rank_id'], - } - - if ctx['need_extra_params']: - base_kwargs.update({ - "group_tp": ctx['moe_all_to_all_group_name'], - "tp_world_size": 1, - "tp_rank_id": 0, - }) - - if ctx['a3_enabled'] and ctx['enable_dispatch_v2']: - base_kwargs["x_active_mask"] = ctx['mc2_mask'] - - return base_kwargs def _process_shared_experts( experts: Any, @@ -247,7 +210,6 @@ def _expert_forward( ) -> torch.Tensor: """Execute expert forward computation (gate_up -> SwiGLU -> down)""" # Gate-up projection - w1 = w1.transpose(1, 2) group_list = expert_token_nums.to(torch.int64) gate_up_out_list = torch_npu.npu_grouped_matmul( x=[expand_x], @@ -263,7 +225,6 @@ def _expert_forward( gate_up_out = torch_npu.npu_swiglu(gate_up_out) # Down projection - w2 = w2.transpose(1, 2) down_list = torch_npu.npu_grouped_matmul( x=[gate_up_out], weight=[w2], @@ -274,9 +235,44 @@ def _expert_forward( ) return torch.cat(down_list, dim=0) + def _build_dispatch_kwargs( + hidden_states: torch.Tensor, + topk_ids: torch.Tensor, + moe_expert_num: int, + ctx: dict + ) -> dict: + quant_mode = 0 + """Construct kwargs for MoE dispatch operation""" + base_kwargs = { + "x": hidden_states, + "expert_ids": topk_ids, + "expert_shard_type": 0, + "shared_expert_rank_num": 0, + "moe_expert_num": moe_expert_num, + "global_bs": ctx['global_bs'], + "scales": None, + "quant_mode": quant_mode, + "group_ep": ctx['moe_all_to_all_group_name'], + "ep_world_size": ctx['ep_world_size'], + "ep_rank_id": ctx['ep_rank_id'], + } + + if ctx['need_extra_params']: + base_kwargs.update({ + "group_tp": ctx['moe_all_to_all_group_name'], + "tp_world_size": 1, + "tp_rank_id": 0, + }) + + if ctx['a3_enabled'] and ctx['enable_dispatch_v2']: + base_kwargs["x_active_mask"] = ctx['mc2_mask'] + + return base_kwargs + def _build_combine_kwargs( topk_ids: torch.Tensor, topk_weights: torch.Tensor, + moe_expert_num: int, down_out: torch.Tensor, ep_recv_counts: torch.Tensor, tp_recv_counts: torch.Tensor, @@ -290,7 +286,7 @@ def _build_combine_kwargs( "expert_scales": topk_weights.to(torch.float32), "expert_shard_type": 0, "shared_expert_rank_num": 0, - "moe_expert_num": ctx['moe_expert_num'], + "moe_expert_num": moe_expert_num, "global_bs": ctx['global_bs'], "ep_send_counts": ep_recv_counts, "group_ep": ctx['moe_all_to_all_group_name'], @@ -349,10 +345,11 @@ def _single_stream_execution( topk_weights: torch.Tensor, w1: torch.Tensor, w2: torch.Tensor, + moe_expert_num: int, shared_experts: Optional[Any] = None ) -> Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]: # MoE dispatch phase - dispatch_kwargs = _build_dispatch_kwargs(hidden_states,topk_ids, ctx) + dispatch_kwargs = _build_dispatch_kwargs(hidden_states,topk_ids, moe_expert_num, ctx) dispatch_output = _get_dispatch_func(ctx)(**dispatch_kwargs) expand_x, _, assist_info, expert_token_nums, ep_recv_counts = dispatch_output[:5] # Shared experts computation (if any) @@ -365,16 +362,19 @@ def _single_stream_execution( expand_x=expand_x ) # Expert forward computation + w1_t = w1.transpose(1, 2) + w2_t = w2.transpose(1, 2) down_out = _expert_forward( expand_x=expand_x, - w1=w1, - w2=w2, + w1=w1_t, + w2=w2_t, expert_token_nums=expert_token_nums ) # MoE combine phase combine_kwargs = _build_combine_kwargs( topk_ids=topk_ids, topk_weights=topk_weights, + moe_expert_num=moe_expert_num, down_out=down_out, ep_recv_counts=ep_recv_counts, tp_recv_counts=dispatch_output[5], @@ -390,15 +390,112 @@ def _single_stream_execution( down_out_list=down_out ) + def _dual_stream_execution( + ctx: dict, + hidden_states: torch.Tensor, + topk_ids: torch.Tensor, + topk_weights: torch.Tensor, + moe_expert_num: int, + w1: torch.Tensor, + w2: torch.Tensor + ) -> Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]: + """双流并行执行路径(无共享专家)""" + # 专家分组(关键:专家索引切分) + split_idx = moe_expert_num // 2 + expert_groupA_size = split_idx + expert_groupB_size = moe_expert_num - split_idx + + # 专家权重切分 + w1_groupA = w1[:split_idx] + w1_groupB = w1[split_idx:] + w2_groupA = w2[:split_idx] + w2_groupB = w2[split_idx:] + + # 流A维持原始topk_ids + topk_ids_A = topk_ids + + # 流B重映射专家ID:将原始ID [expert_groupA_size, moe_expert_num-1] 映射到 [0, expert_groupB_size-1] + # 将 [0, expert_groupA_size-1] 映射到无效值 -1 + topk_ids_B = topk_ids.clone() + # 创建掩码:标识哪些token分配给专家组B + maskB = (topk_ids_B >= expert_groupA_size) & (topk_ids_B < moe_expert_num) + # 重映射专家ID + topk_ids_B[maskB] = topk_ids_B[maskB] - expert_groupA_size + # 将不属于专家组B的分配标记为无效 + topk_ids_B[~maskB] = -1 + + # 阶段1: 权重转置(主线程准备) + w1_t_A = w1_groupA.transpose(1, 2) + w2_t_A = w2_groupA.transpose(1, 2) + w1_t_B = w1_groupB.transpose(1, 2) + w2_t_B = w2_groupB.transpose(1, 2) + + # 阶段2: 流A执行Dispatch A + with torch_npu.npu.npu_stream_switch("streamA", 0): + kwargs = _build_dispatch_kwargs(hidden_states,topk_ids_A, expert_groupA_size, ctx) + dispatchA_result = _get_dispatch_func(ctx)(**kwargs) + expand_xA, _, assist_infoA, expert_tokensA, ep_recvA = dispatchA_result[:5] + + # 阶段3: 流A执行Compute A + 流B执行Dispatch B(并行) + # 流A: Compute A + down_outA = None + with torch_npu.npu.npu_stream_switch("streamA", 0): + # 确保Dispatch A完成 + torch_npu.npu.npu_wait_tensor(down_outA, expand_xA) + down_outA = _expert_forward(expand_xA, w1_t_A, w2_t_A, expert_tokensA) + + # 流B: Dispatch B(依赖Dispatch A完成) + dispatchB_result = None + with torch_npu.npu.npu_stream_switch("streamB", 0): + # 等待Dispatch A完成 + torch_npu.npu.npu_wait_tensor(dispatchB_result, expand_xA) + kwargs = _build_dispatch_kwargs(hidden_states,topk_ids_B, expert_groupB_size, ctx) + dispatchB_result = _get_dispatch_func(ctx)(**kwargs) + expand_xB, _, assist_infoB, expert_tokensB, ep_recvB = dispatchB_result[:5] + + # 阶段4: 流A执行Combine A + 流B执行Compute B(并行) + # 流A: Combine A(依赖Compute A完成) + resultA = None + with torch_npu.npu.npu_stream_switch("streamA", 0): + # 确保Compute A完成 + torch_npu.npu.npu_wait_tensor(resultA, down_outA) + kwargs = _build_combine_kwargs(topk_ids_A, topk_weights, expert_groupA_size, down_outA, ep_recvA, dispatchA_result[5], assist_infoA, ctx) + resultA = _get_combine_func(ctx)(**kwargs) + + # 流B: Compute B(依赖Dispatch B完成) + down_outB = None + with torch_npu.npu.npu_stream_switch("streamB", 0): + # 确保Dispatch B完成 + torch_npu.npu.npu_wait_tensor(down_outB, expand_xB) + down_outB = _expert_forward(expand_xB, w1_t_B, w2_t_B, expert_tokensB) + + # 阶段5: 流B执行Combine B(依赖Compute B完成) + resultB = None + with torch_npu.npu.npu_stream_switch("streamB", 0): + # 确保Compute B完成 + torch_npu.npu.npu_wait_tensor(resultB, down_outB) + kwargs = _build_combine_kwargs(topk_ids_B, topk_weights, expert_groupB_size, down_outB, ep_recvB, dispatchB_result[5], assist_infoB) + resultB = _get_combine_func(ctx)(**kwargs) + + # 合并结果 + torch_npu.npu.npu_wait_tensor(resultA, resultB) + return resultA + resultB + ctx = _build_context() # 双流模式启用条件:配置开启且无共享专家且元数据可用 use_dual_stream = (get_ascend_config().fc_dual_batch and - ctx['ms_metadata'] is not None and shared_experts is None) if use_dual_stream: - return _dual_stream_execution() + return _dual_stream_execution( + ctx=ctx, + hidden_states=hidden_states, + topk_ids=topk_ids, + topk_weights=topk_weights, + moe_expert_num=len(expert_map), + w1=w1, + w2=w2) else: return _single_stream_execution( ctx=ctx, @@ -407,6 +504,7 @@ def _single_stream_execution( topk_weights=topk_weights, w1=w1, w2=w2, + moe_expert_num=len(expert_map), shared_experts=shared_experts) def apply_mlp( From 0757d65003f045de4fd5f41c268d9041cf4148ab Mon Sep 17 00:00:00 2001 From: wyu0-0 Date: Fri, 25 Jul 2025 17:29:55 +0800 Subject: [PATCH 4/6] split expert --- vllm_ascend/ops/fused_moe.py | 35 ++++++++++------------------------- 1 file changed, 10 insertions(+), 25 deletions(-) diff --git a/vllm_ascend/ops/fused_moe.py b/vllm_ascend/ops/fused_moe.py index 4abcd00d66f..6ce49ef3fed 100644 --- a/vllm_ascend/ops/fused_moe.py +++ b/vllm_ascend/ops/fused_moe.py @@ -406,29 +406,14 @@ def _dual_stream_execution( expert_groupB_size = moe_expert_num - split_idx # 专家权重切分 - w1_groupA = w1[:split_idx] - w1_groupB = w1[split_idx:] - w2_groupA = w2[:split_idx] - w2_groupB = w2[split_idx:] - - # 流A维持原始topk_ids - topk_ids_A = topk_ids - - # 流B重映射专家ID:将原始ID [expert_groupA_size, moe_expert_num-1] 映射到 [0, expert_groupB_size-1] - # 将 [0, expert_groupA_size-1] 映射到无效值 -1 - topk_ids_B = topk_ids.clone() - # 创建掩码:标识哪些token分配给专家组B - maskB = (topk_ids_B >= expert_groupA_size) & (topk_ids_B < moe_expert_num) - # 重映射专家ID - topk_ids_B[maskB] = topk_ids_B[maskB] - expert_groupA_size - # 将不属于专家组B的分配标记为无效 - topk_ids_B[~maskB] = -1 + topk_ids_A = topk_ids[:split_idx] + topk_ids_B = topk_ids[split_idx:] + topk_weights_A = topk_weights[:split_idx] + topk_weights_B = topk_weights[split_idx:] # 阶段1: 权重转置(主线程准备) - w1_t_A = w1_groupA.transpose(1, 2) - w2_t_A = w2_groupA.transpose(1, 2) - w1_t_B = w1_groupB.transpose(1, 2) - w2_t_B = w2_groupB.transpose(1, 2) + w1_t = w1.transpose(1, 2) + w2_t = w2.transpose(1, 2) # 阶段2: 流A执行Dispatch A with torch_npu.npu.npu_stream_switch("streamA", 0): @@ -442,7 +427,7 @@ def _dual_stream_execution( with torch_npu.npu.npu_stream_switch("streamA", 0): # 确保Dispatch A完成 torch_npu.npu.npu_wait_tensor(down_outA, expand_xA) - down_outA = _expert_forward(expand_xA, w1_t_A, w2_t_A, expert_tokensA) + down_outA = _expert_forward(expand_xA, w1_t, w2_t, expert_tokensA) # 流B: Dispatch B(依赖Dispatch A完成) dispatchB_result = None @@ -459,7 +444,7 @@ def _dual_stream_execution( with torch_npu.npu.npu_stream_switch("streamA", 0): # 确保Compute A完成 torch_npu.npu.npu_wait_tensor(resultA, down_outA) - kwargs = _build_combine_kwargs(topk_ids_A, topk_weights, expert_groupA_size, down_outA, ep_recvA, dispatchA_result[5], assist_infoA, ctx) + kwargs = _build_combine_kwargs(topk_ids_A, topk_weights_A, expert_groupA_size, down_outA, ep_recvA, dispatchA_result[5], assist_infoA, ctx) resultA = _get_combine_func(ctx)(**kwargs) # 流B: Compute B(依赖Dispatch B完成) @@ -467,14 +452,14 @@ def _dual_stream_execution( with torch_npu.npu.npu_stream_switch("streamB", 0): # 确保Dispatch B完成 torch_npu.npu.npu_wait_tensor(down_outB, expand_xB) - down_outB = _expert_forward(expand_xB, w1_t_B, w2_t_B, expert_tokensB) + down_outB = _expert_forward(expand_xB, w1_t, w2_t, expert_tokensB) # 阶段5: 流B执行Combine B(依赖Compute B完成) resultB = None with torch_npu.npu.npu_stream_switch("streamB", 0): # 确保Compute B完成 torch_npu.npu.npu_wait_tensor(resultB, down_outB) - kwargs = _build_combine_kwargs(topk_ids_B, topk_weights, expert_groupB_size, down_outB, ep_recvB, dispatchB_result[5], assist_infoB) + kwargs = _build_combine_kwargs(topk_ids_B, topk_weights_B, expert_groupB_size, down_outB, ep_recvB, dispatchB_result[5], assist_infoB) resultB = _get_combine_func(ctx)(**kwargs) # 合并结果 From 2be0e871b76aea0ad2fea323025cfddfc2590bbe Mon Sep 17 00:00:00 2001 From: wyu0-0 Date: Wed, 30 Jul 2025 10:44:22 +0800 Subject: [PATCH 5/6] debug --- vllm_ascend/ops/fused_moe.py | 47 +++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/vllm_ascend/ops/fused_moe.py b/vllm_ascend/ops/fused_moe.py index 6ce49ef3fed..5b3a4160a0c 100644 --- a/vllm_ascend/ops/fused_moe.py +++ b/vllm_ascend/ops/fused_moe.py @@ -24,6 +24,9 @@ import torch_npu from torch import nn from transformers import PretrainedConfig + +from vllm.logger import logger + from vllm.attention import AttentionMetadata from vllm.config import get_current_vllm_config from vllm.distributed import (GroupCoordinator, get_tensor_model_parallel_rank, @@ -416,24 +419,23 @@ def _dual_stream_execution( w2_t = w2.transpose(1, 2) # 阶段2: 流A执行Dispatch A - with torch_npu.npu.npu_stream_switch("streamA", 0): - kwargs = _build_dispatch_kwargs(hidden_states,topk_ids_A, expert_groupA_size, ctx) - dispatchA_result = _get_dispatch_func(ctx)(**kwargs) - expand_xA, _, assist_infoA, expert_tokensA, ep_recvA = dispatchA_result[:5] + # with npu_stream_switch("streamA", 0): + kwargs = _build_dispatch_kwargs(hidden_states,topk_ids_A, expert_groupA_size, ctx) + dispatchA_result = _get_dispatch_func(ctx)(**kwargs) + expand_xA, _, assist_infoA, expert_tokensA, ep_recvA = dispatchA_result[:5] # 阶段3: 流A执行Compute A + 流B执行Dispatch B(并行) # 流A: Compute A down_outA = None - with torch_npu.npu.npu_stream_switch("streamA", 0): - # 确保Dispatch A完成 - torch_npu.npu.npu_wait_tensor(down_outA, expand_xA) - down_outA = _expert_forward(expand_xA, w1_t, w2_t, expert_tokensA) - + # with npu_stream_switch("streamA", 0): + # 确保Dispatch A完成 + # npu_wait_tensor(down_outA, expand_xA) + down_outA = _expert_forward(expand_xA, w1_t, w2_t, expert_tokensA) # 流B: Dispatch B(依赖Dispatch A完成) dispatchB_result = None - with torch_npu.npu.npu_stream_switch("streamB", 0): + with npu_stream_switch("moe_secondary", 0): # 等待Dispatch A完成 - torch_npu.npu.npu_wait_tensor(dispatchB_result, expand_xA) + npu_wait_tensor(dispatchB_result, expand_xA) kwargs = _build_dispatch_kwargs(hidden_states,topk_ids_B, expert_groupB_size, ctx) dispatchB_result = _get_dispatch_func(ctx)(**kwargs) expand_xB, _, assist_infoB, expert_tokensB, ep_recvB = dispatchB_result[:5] @@ -441,29 +443,28 @@ def _dual_stream_execution( # 阶段4: 流A执行Combine A + 流B执行Compute B(并行) # 流A: Combine A(依赖Compute A完成) resultA = None - with torch_npu.npu.npu_stream_switch("streamA", 0): - # 确保Compute A完成 - torch_npu.npu.npu_wait_tensor(resultA, down_outA) - kwargs = _build_combine_kwargs(topk_ids_A, topk_weights_A, expert_groupA_size, down_outA, ep_recvA, dispatchA_result[5], assist_infoA, ctx) - resultA = _get_combine_func(ctx)(**kwargs) - + # with npu_stream_switch("streamA", 0): + # 确保Compute A完成 + # npu_wait_tensor(resultA, down_outA) + kwargs = _build_combine_kwargs(topk_ids_A, topk_weights_A, expert_groupA_size, down_outA, ep_recvA, dispatchA_result[5], assist_infoA, ctx) + resultA = _get_combine_func(ctx)(**kwargs) # 流B: Compute B(依赖Dispatch B完成) down_outB = None - with torch_npu.npu.npu_stream_switch("streamB", 0): + with npu_stream_switch("moe_secondary", 0): # 确保Dispatch B完成 - torch_npu.npu.npu_wait_tensor(down_outB, expand_xB) + npu_wait_tensor(down_outB, expand_xB) down_outB = _expert_forward(expand_xB, w1_t, w2_t, expert_tokensB) # 阶段5: 流B执行Combine B(依赖Compute B完成) resultB = None - with torch_npu.npu.npu_stream_switch("streamB", 0): + with npu_stream_switch("moe_secondary", 0): # 确保Compute B完成 - torch_npu.npu.npu_wait_tensor(resultB, down_outB) + npu_wait_tensor(resultB, down_outB) kwargs = _build_combine_kwargs(topk_ids_B, topk_weights_B, expert_groupB_size, down_outB, ep_recvB, dispatchB_result[5], assist_infoB) resultB = _get_combine_func(ctx)(**kwargs) # 合并结果 - torch_npu.npu.npu_wait_tensor(resultA, resultB) + npu_wait_tensor(resultA, resultB) return resultA + resultB ctx = _build_context() @@ -473,6 +474,7 @@ def _dual_stream_execution( shared_experts is None) if use_dual_stream: + logger.warning_once("=============vLLM is using dual stream") return _dual_stream_execution( ctx=ctx, hidden_states=hidden_states, @@ -482,6 +484,7 @@ def _dual_stream_execution( w1=w1, w2=w2) else: + logger.warning_once("=============vLLM is using single stream") return _single_stream_execution( ctx=ctx, hidden_states=hidden_states, From 9f8a730106b722c091701c4404f0e2b774268803 Mon Sep 17 00:00:00 2001 From: wyu0-0 Date: Mon, 11 Aug 2025 21:42:27 +0800 Subject: [PATCH 6/6] dual batch --- vllm_ascend/ops/fused_moe.py | 300 ++++++++++++++++++----------------- 1 file changed, 152 insertions(+), 148 deletions(-) diff --git a/vllm_ascend/ops/fused_moe.py b/vllm_ascend/ops/fused_moe.py index 5b3a4160a0c..d68026c552c 100644 --- a/vllm_ascend/ops/fused_moe.py +++ b/vllm_ascend/ops/fused_moe.py @@ -168,35 +168,32 @@ def _build_context() -> dict: soc_version = get_ascend_soc_version() return { - # 基础配置 - 'expert_map': expert_map, - 'moe_all_to_all_group_name': moe_all_to_all_group_name, - 'is_torchair': is_torchair, - 'mc2_mask': mc2_mask, - - 'ep_group': ep_group, - 'ep_rank_id': ep_group.rank_in_group, - 'ep_world_size': ep_group.world_size, - 'tp_world_size': tp_group.world_size, - + # basic config + "moe_all_to_all_group_name": moe_all_to_all_group_name, + "is_torchair": is_torchair, + "mc2_mask": mc2_mask, + "ep_rank_id": ep_group.rank_in_group, + "ep_world_size": ep_group.world_size, + "tp_world_size": tp_group.world_size, # NOTE: `global_bs` should be equal to `max_num_tokens_across_dp` * `ep_world_size`, # and `max_num_tokens_across_dp` has been split into `tp_world_size` parts before. - 'global_bs': math.ceil(forward_ctx.max_tokens_across_dp / tp_group.world_size) * ep_group.world_size, - - # 功能开关 + "global_bs": math.ceil( + forward_ctx.max_tokens_across_dp / tp_group.world_size + ) + * ep_group.world_size, + # feature switch # NOTE: Currently, when in A3, we need to pass in some extra param into dispatch & combine # NOTE: Currently, when in A3 or in torchair graph, we need to pass in some extra param into dispatch & combine - 'dual_move_enabled': get_ascend_config().fc_dual_batch, - 'enable_dispatch_v2': hasattr(torch_npu, "npu_moe_distribute_dispatch_v2"), - 'a3_enabled': soc_version == AscendSocVersion.A3, - 'need_extra_params': (soc_version == AscendSocVersion.A3) or is_torchair, + "enable_dispatch_v2": hasattr(torch_npu, "npu_moe_distribute_dispatch_v2"), + "a3_enabled": soc_version == AscendSocVersion.A3, + "need_extra_params": (soc_version == AscendSocVersion.A3) or is_torchair, } def _process_shared_experts( - experts: Any, - h_states: torch.Tensor, - weights: torch.Tensor, - expand_x: torch.Tensor + experts: Any, + h_states: torch.Tensor, + weights: torch.Tensor, + expand_x: torch.Tensor, ) -> torch.Tensor: """Compute shared expert activations in secondary stream""" with npu_stream_switch("moe_secondary", 0): @@ -206,10 +203,10 @@ def _process_shared_experts( return experts.act_fn(shared_gate_up) def _expert_forward( - expand_x: torch.Tensor, - w1: torch.Tensor, - w2: torch.Tensor, - expert_token_nums: torch.Tensor + expand_x: torch.Tensor, + w1: torch.Tensor, + w2: torch.Tensor, + expert_token_nums: torch.Tensor, ) -> torch.Tensor: """Execute expert forward computation (gate_up -> SwiGLU -> down)""" # Gate-up projection @@ -239,10 +236,10 @@ def _expert_forward( return torch.cat(down_list, dim=0) def _build_dispatch_kwargs( - hidden_states: torch.Tensor, - topk_ids: torch.Tensor, - moe_expert_num: int, - ctx: dict + hidden_states: torch.Tensor, + topk_ids: torch.Tensor, + moe_expert_num: int, + ctx: dict, ) -> dict: quant_mode = 0 """Construct kwargs for MoE dispatch operation""" @@ -252,35 +249,37 @@ def _build_dispatch_kwargs( "expert_shard_type": 0, "shared_expert_rank_num": 0, "moe_expert_num": moe_expert_num, - "global_bs": ctx['global_bs'], + "global_bs": ctx["global_bs"], "scales": None, "quant_mode": quant_mode, - "group_ep": ctx['moe_all_to_all_group_name'], - "ep_world_size": ctx['ep_world_size'], - "ep_rank_id": ctx['ep_rank_id'], + "group_ep": ctx["moe_all_to_all_group_name"], + "ep_world_size": ctx["ep_world_size"], + "ep_rank_id": ctx["ep_rank_id"], } - if ctx['need_extra_params']: - base_kwargs.update({ - "group_tp": ctx['moe_all_to_all_group_name'], - "tp_world_size": 1, - "tp_rank_id": 0, - }) + if ctx["need_extra_params"]: + base_kwargs.update( + { + "group_tp": ctx["moe_all_to_all_group_name"], + "tp_world_size": 1, + "tp_rank_id": 0, + } + ) - if ctx['a3_enabled'] and ctx['enable_dispatch_v2']: - base_kwargs["x_active_mask"] = ctx['mc2_mask'] + if ctx["a3_enabled"] and ctx["enable_dispatch_v2"]: + base_kwargs["x_active_mask"] = ctx["mc2_mask"] return base_kwargs def _build_combine_kwargs( - topk_ids: torch.Tensor, - topk_weights: torch.Tensor, - moe_expert_num: int, - down_out: torch.Tensor, - ep_recv_counts: torch.Tensor, - tp_recv_counts: torch.Tensor, - assist_info: Any, - ctx: dict + topk_ids: torch.Tensor, + topk_weights: torch.Tensor, + moe_expert_num: int, + down_out: torch.Tensor, + ep_recv_counts: torch.Tensor, + tp_recv_counts: torch.Tensor, + assist_info: Any, + ctx: dict, ) -> dict: """Construct kwargs for MoE combine operation""" base_kwargs = { @@ -290,71 +289,77 @@ def _build_combine_kwargs( "expert_shard_type": 0, "shared_expert_rank_num": 0, "moe_expert_num": moe_expert_num, - "global_bs": ctx['global_bs'], + "global_bs": ctx["global_bs"], "ep_send_counts": ep_recv_counts, - "group_ep": ctx['moe_all_to_all_group_name'], - "ep_world_size": ctx['ep_world_size'], - "ep_rank_id": ctx['ep_rank_id'], + "group_ep": ctx["moe_all_to_all_group_name"], + "ep_world_size": ctx["ep_world_size"], + "ep_rank_id": ctx["ep_rank_id"], } - if ctx['enable_dispatch_v2']: + if ctx["enable_dispatch_v2"]: base_kwargs["assist_info_for_combine"] = assist_info else: base_kwargs["expand_idx"] = assist_info - if ctx['need_extra_params']: - base_kwargs.update({ - "tp_send_counts": tp_recv_counts, - "group_tp": ctx['moe_all_to_all_group_name'], - "tp_world_size": 1, - "tp_rank_id": 0, - }) - if ctx['a3_enabled'] and ctx['enable_dispatch_v2']: - base_kwargs["x_active_mask"] = ctx['mc2_mask'] + if ctx["need_extra_params"]: + base_kwargs.update( + { + "tp_send_counts": tp_recv_counts, + "group_tp": ctx["moe_all_to_all_group_name"], + "tp_world_size": 1, + "tp_rank_id": 0, + } + ) + if ctx["a3_enabled"] and ctx["enable_dispatch_v2"]: + base_kwargs["x_active_mask"] = ctx["mc2_mask"] return base_kwargs def _prepare_return( - hidden_states: torch.Tensor, - shared_experts: Optional[Any], - shared_act: Optional[torch.Tensor], - down_out_list: torch.Tensor + hidden_states: torch.Tensor, + shared_experts: Optional[Any], + shared_act: Optional[torch.Tensor], + down_out_list: torch.Tensor, ) -> Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]: """Construct final output with optional shared experts""" if shared_experts is None: return hidden_states with npu_stream_switch("moe_secondary", 0): - npu_wait_tensor(shared_act, down_out_list) # Correct sync point + npu_wait_tensor(shared_act, down_out_list) shared_hidden_states, _ = shared_experts.down_proj(shared_act) return hidden_states, shared_hidden_states def _get_dispatch_func(ctx: dict): - if ctx['enable_dispatch_v2']: + if ctx["enable_dispatch_v2"]: return torch_npu.npu_moe_distribute_dispatch_v2 else: return torch_npu.npu_moe_distribute_dispatch def _get_combine_func(ctx: dict): - if ctx['enable_dispatch_v2']: + if ctx["enable_dispatch_v2"]: return torch_npu.npu_moe_distribute_combine_v2 else: return torch_npu.npu_moe_distribute_combine def _single_stream_execution( - ctx: dict, - hidden_states: torch.Tensor, - topk_ids: torch.Tensor, - topk_weights: torch.Tensor, - w1: torch.Tensor, - w2: torch.Tensor, - moe_expert_num: int, - shared_experts: Optional[Any] = None + ctx: dict, + hidden_states: torch.Tensor, + topk_ids: torch.Tensor, + topk_weights: torch.Tensor, + w1: torch.Tensor, + w2: torch.Tensor, + moe_expert_num: int, + shared_experts: Optional[Any] = None, ) -> Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]: # MoE dispatch phase - dispatch_kwargs = _build_dispatch_kwargs(hidden_states,topk_ids, moe_expert_num, ctx) + dispatch_kwargs = _build_dispatch_kwargs( + hidden_states, topk_ids, moe_expert_num, ctx + ) dispatch_output = _get_dispatch_func(ctx)(**dispatch_kwargs) - expand_x, _, assist_info, expert_token_nums, ep_recv_counts = dispatch_output[:5] + expand_x, _, assist_info, expert_token_nums, ep_recv_counts = dispatch_output[ + :5 + ] # Shared experts computation (if any) shared_act = None if shared_experts is not None: @@ -362,16 +367,13 @@ def _single_stream_execution( experts=shared_experts, h_states=hidden_states, weights=topk_weights, - expand_x=expand_x + expand_x=expand_x, ) # Expert forward computation w1_t = w1.transpose(1, 2) w2_t = w2.transpose(1, 2) down_out = _expert_forward( - expand_x=expand_x, - w1=w1_t, - w2=w2_t, - expert_token_nums=expert_token_nums + expand_x=expand_x, w1=w1_t, w2=w2_t, expert_token_nums=expert_token_nums ) # MoE combine phase combine_kwargs = _build_combine_kwargs( @@ -382,7 +384,7 @@ def _single_stream_execution( ep_recv_counts=ep_recv_counts, tp_recv_counts=dispatch_output[5], assist_info=assist_info, - ctx=ctx + ctx=ctx, ) hidden_states = _get_combine_func(ctx)(**combine_kwargs) # Final output preparation @@ -390,7 +392,7 @@ def _single_stream_execution( hidden_states=hidden_states, shared_experts=shared_experts, shared_act=shared_act, - down_out_list=down_out + down_out_list=down_out, ) def _dual_stream_execution( @@ -399,101 +401,103 @@ def _dual_stream_execution( topk_ids: torch.Tensor, topk_weights: torch.Tensor, moe_expert_num: int, + top_k: int, w1: torch.Tensor, - w2: torch.Tensor + w2: torch.Tensor, ) -> Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]: - """双流并行执行路径(无共享专家)""" - # 专家分组(关键:专家索引切分) - split_idx = moe_expert_num // 2 + # split experts ids and weights + split_idx = top_k // 2 expert_groupA_size = split_idx - expert_groupB_size = moe_expert_num - split_idx + expert_groupB_size = top_k - split_idx - # 专家权重切分 - topk_ids_A = topk_ids[:split_idx] - topk_ids_B = topk_ids[split_idx:] - topk_weights_A = topk_weights[:split_idx] - topk_weights_B = topk_weights[split_idx:] + topk_ids_A, topk_ids_B = torch.split( + topk_ids, [expert_groupA_size, expert_groupB_size], dim=1 + ) + topk_weights_A, topk_weights_B = torch.split( + topk_weights, [expert_groupA_size, expert_groupB_size], dim=1 + ) - # 阶段1: 权重转置(主线程准备) w1_t = w1.transpose(1, 2) w2_t = w2.transpose(1, 2) - # 阶段2: 流A执行Dispatch A - # with npu_stream_switch("streamA", 0): - kwargs = _build_dispatch_kwargs(hidden_states,topk_ids_A, expert_groupA_size, ctx) + # default stream:Dispatch A + kwargs = _build_dispatch_kwargs(hidden_states, topk_ids_A, moe_expert_num, ctx) dispatchA_result = _get_dispatch_func(ctx)(**kwargs) expand_xA, _, assist_infoA, expert_tokensA, ep_recvA = dispatchA_result[:5] - # 阶段3: 流A执行Compute A + 流B执行Dispatch B(并行) - # 流A: Compute A - down_outA = None - # with npu_stream_switch("streamA", 0): - # 确保Dispatch A完成 - # npu_wait_tensor(down_outA, expand_xA) + # parallel execution: default stream: Compute A + secondary stream: Dispatch B + # default stream: Compute A down_outA = _expert_forward(expand_xA, w1_t, w2_t, expert_tokensA) - # 流B: Dispatch B(依赖Dispatch A完成) + # secondary stream: Dispatch B dispatchB_result = None - with npu_stream_switch("moe_secondary", 0): - # 等待Dispatch A完成 + with npu_stream_switch("moe_secondary1", 0): npu_wait_tensor(dispatchB_result, expand_xA) - kwargs = _build_dispatch_kwargs(hidden_states,topk_ids_B, expert_groupB_size, ctx) + kwargs = _build_dispatch_kwargs( + hidden_states, topk_ids_B, moe_expert_num, ctx + ) dispatchB_result = _get_dispatch_func(ctx)(**kwargs) expand_xB, _, assist_infoB, expert_tokensB, ep_recvB = dispatchB_result[:5] - # 阶段4: 流A执行Combine A + 流B执行Compute B(并行) - # 流A: Combine A(依赖Compute A完成) - resultA = None - # with npu_stream_switch("streamA", 0): - # 确保Compute A完成 - # npu_wait_tensor(resultA, down_outA) - kwargs = _build_combine_kwargs(topk_ids_A, topk_weights_A, expert_groupA_size, down_outA, ep_recvA, dispatchA_result[5], assist_infoA, ctx) + # parallel execution: default stream: Combine A + secondary stream: Compute B + # default stream: Combine A + kwargs = _build_combine_kwargs( + topk_ids=topk_ids_A, + topk_weights=topk_weights_A, + moe_expert_num=moe_expert_num, + down_out=down_outA, + ep_recv_counts=ep_recvA, + tp_recv_counts=dispatchA_result[5], + assist_info=assist_infoA, + ctx=ctx, + ) + resultA = _get_combine_func(ctx)(**kwargs) - # 流B: Compute B(依赖Dispatch B完成) + # secondary stream: Compute B down_outB = None - with npu_stream_switch("moe_secondary", 0): - # 确保Dispatch B完成 + with npu_stream_switch("moe_secondary2", 0): npu_wait_tensor(down_outB, expand_xB) down_outB = _expert_forward(expand_xB, w1_t, w2_t, expert_tokensB) - # 阶段5: 流B执行Combine B(依赖Compute B完成) - resultB = None - with npu_stream_switch("moe_secondary", 0): - # 确保Compute B完成 - npu_wait_tensor(resultB, down_outB) - kwargs = _build_combine_kwargs(topk_ids_B, topk_weights_B, expert_groupB_size, down_outB, ep_recvB, dispatchB_result[5], assist_infoB) - resultB = _get_combine_func(ctx)(**kwargs) + kwargs = _build_combine_kwargs( + topk_ids=topk_ids_B, + topk_weights=topk_weights_B, + moe_expert_num=moe_expert_num, + down_out=down_outB, + ep_recv_counts=ep_recvB, + tp_recv_counts=dispatchB_result[5], + assist_info=assist_infoB, + ctx=ctx, + ) + resultB = _get_combine_func(ctx)(**kwargs) - # 合并结果 - npu_wait_tensor(resultA, resultB) - return resultA + resultB + return torch.add(resultA, resultB) ctx = _build_context() - # 双流模式启用条件:配置开启且无共享专家且元数据可用 - use_dual_stream = (get_ascend_config().fc_dual_batch and - shared_experts is None) - + # Enabling conditions for dual-stream mode: Configuration is enabled and there are no shared experts. + use_dual_stream = get_ascend_config().fc_dual_batch and shared_experts is None if use_dual_stream: - logger.warning_once("=============vLLM is using dual stream") return _dual_stream_execution( ctx=ctx, hidden_states=hidden_states, topk_ids=topk_ids, topk_weights=topk_weights, moe_expert_num=len(expert_map), - w1=w1, - w2=w2) - else: - logger.warning_once("=============vLLM is using single stream") - return _single_stream_execution( - ctx=ctx, - hidden_states=hidden_states, - topk_ids=topk_ids, - topk_weights=topk_weights, + top_k=top_k, w1=w1, w2=w2, - moe_expert_num=len(expert_map), - shared_experts=shared_experts) + ) + + return _single_stream_execution( + ctx=ctx, + hidden_states=hidden_states, + topk_ids=topk_ids, + topk_weights=topk_weights, + w1=w1, + w2=w2, + moe_expert_num=len(expert_map), + shared_experts=shared_experts, + ) def apply_mlp( hidden_states: torch.Tensor,