From e17cf823a78fafc802e7487429fb23e9b24956f2 Mon Sep 17 00:00:00 2001 From: JavaZero <2487163254@qq.com> Date: Sat, 20 Sep 2025 15:04:24 +0800 Subject: [PATCH] Add parallel configuration support to various tests and runners --- .../run_column_parallel_linear.py | 20 ++++----- .../test_column_parallel_linear.py | 5 ++- .../run_column_parallel_linear_with_lora.py | 20 ++++----- .../test_column_parallel_linear_with_lora.py | 5 ++- .../run_row_parallel_linear.py | 39 ++++++++--------- .../test_row_parallel_linear.py | 6 ++- .../run_row_parallel_linear_with_lora.py | 40 ++++++++--------- .../test_row_parallel_linear_with_lora.py | 5 ++- .../run_vocab_parallel_embedding.py | 20 ++++----- .../test_vocab_parallel_embedding.py | 5 ++- .../run_multi_token_prediction.py | 40 +++++++++-------- .../test_multi_token_prediction.py | 5 ++- .../run_transformer_block.py | 43 +++++++++---------- .../test_transformer_block.py | 3 ++ .../run_transformer_layer.py | 43 ++++++++++--------- .../test_transformer_layer.py | 3 ++ .../run_vocab_parallel_cross_entropy.py | 16 +++---- .../test_vocab_parallel_cross_entropy.py | 3 ++ 18 files changed, 171 insertions(+), 150 deletions(-) diff --git a/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_column_parallel_linear_train/run_column_parallel_linear.py b/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_column_parallel_linear_train/run_column_parallel_linear.py index 53a409376..689d1026a 100644 --- a/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_column_parallel_linear_train/run_column_parallel_linear.py +++ b/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_column_parallel_linear_train/run_column_parallel_linear.py @@ -14,11 +14,10 @@ # ============================================================================ """Run ColumnParallelLinear accuracy test with configurable parameters via args""" import argparse -import os from pathlib import Path import numpy as np import mindspore as ms -from mindspore.communication import init +from mindspore.communication import init, get_rank, get_group_size from mindformers.parallel_core.training_graph.tensor_parallel.layers import ColumnParallelLinear from mindformers.parallel_core.training_graph.device_matrix import layout from mindformers.parallel_core.transformer_config import TransformerConfig @@ -48,19 +47,17 @@ class ColumnParallelLinearRunner: self.net_weight = init_params.get("weight") self.net_bias = init_params.get("bias") - # RANK_ID and worker_num are set by msrun environment - rank_id_str = os.environ.get("RANK_ID") - self.rank_id = int(rank_id_str) if rank_id_str is not None else None - - self.worker_num = int(os.environ.get("MS_WORKER_NUM", "1")) - - - # Set parallel context - if self.rank_id is not None: + # Setup parallel configuration + if self.args.use_parallel: + self.rank_id = get_rank() + self.worker_num = get_group_size() ms.set_auto_parallel_context( parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, full_batch=True ) init() + else: + self.rank_id = None + self.worker_num = 1 # Transformer config self.config = TransformerConfig( @@ -120,6 +117,7 @@ def main(): parser.add_argument("--use_weight_tensor", type=lambda x: x.lower() == "true", default=False) parser.add_argument("--output_path", type=str, default="output_ms.npz") parser.add_argument("--tensor_parallel", type=int, default=1) + parser.add_argument("--use_parallel", type=lambda x: x.lower() == "true", default=True) args 
= parser.parse_args() diff --git a/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_column_parallel_linear_train/test_column_parallel_linear.py b/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_column_parallel_linear_train/test_column_parallel_linear.py index bb7ba0480..35b43b442 100644 --- a/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_column_parallel_linear_train/test_column_parallel_linear.py +++ b/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_column_parallel_linear_train/test_column_parallel_linear.py @@ -73,6 +73,7 @@ def build_msrun_command_list( """ Build the msrun command with the specified parameters. """ if worker_num == 1 and local_worker_num == 1: cmd_list = ["python"] + use_parallel = False else: cmd_list = [ "msrun", @@ -82,6 +83,7 @@ def build_msrun_command_list( f"--log_dir={log_dir}", "--join=True", ] + use_parallel = True cmd_list += [ str(run_script_path), f"--input_size={input_size}", @@ -91,7 +93,8 @@ def build_msrun_command_list( f"--skip_weight_param_allocation={str(skip_weight_param_allocation).lower()}", f"--use_weight_tensor={str(use_weight_tensor).lower()}", f"--output_path={output_path_param}", - f"--tensor_parallel={tensor_parallel}" + f"--tensor_parallel={tensor_parallel}", + f"--use_parallel={use_parallel}" ] logger.info(f"Equivalent shell command for debugging (approximate): {' '.join(cmd_list)}") return cmd_list diff --git a/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_column_parallel_linear_with_lora/run_column_parallel_linear_with_lora.py b/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_column_parallel_linear_with_lora/run_column_parallel_linear_with_lora.py index 36db95333..f6454e8dc 100644 --- a/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_column_parallel_linear_with_lora/run_column_parallel_linear_with_lora.py +++ b/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_column_parallel_linear_with_lora/run_column_parallel_linear_with_lora.py @@ -14,11 +14,10 @@ # ============================================================================ """Run ColumnParallelLinearWithLoRA accuracy test with configurable parameters via args""" import argparse -import os from pathlib import Path import numpy as np import mindspore as ms -from mindspore.communication import init +from mindspore.communication import init, get_rank, get_group_size from mindformers.parallel_core.training_graph.tensor_parallel.lora_layers import ColumnParallelLinearWithLoRA from mindformers.parallel_core.training_graph.device_matrix import layout from mindformers.parallel_core.transformer_config import TransformerConfig @@ -48,19 +47,17 @@ class ColumnParallelLinearWithLoRARunner: self.net_weight = init_params.get("weight") self.net_bias = init_params.get("bias") - # RANK_ID and worker_num are set by msrun environment - rank_id_str = os.environ.get("RANK_ID") - self.rank_id = int(rank_id_str) if rank_id_str is not None else None - - self.worker_num = int(os.environ.get("MS_WORKER_NUM", "1")) - - - # Set parallel context - if self.rank_id is not None: + # Setup parallel configuration + if self.args.use_parallel: + self.rank_id = get_rank() + self.worker_num = get_group_size() ms.set_auto_parallel_context( parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, full_batch=True ) init() + else: + self.rank_id = None + 
self.worker_num = 1 # Transformer config self.config = TransformerConfig( @@ -119,6 +116,7 @@ def main(): parser.add_argument("--use_weight_tensor", type=lambda x: x.lower() == "true", default=False) parser.add_argument("--output_path", type=str, default="output_ms.npz") parser.add_argument("--tensor_parallel", type=int, default=1) + parser.add_argument("--use_parallel", type=lambda x: x.lower() == "true", default=True) args = parser.parse_args() diff --git a/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_column_parallel_linear_with_lora/test_column_parallel_linear_with_lora.py b/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_column_parallel_linear_with_lora/test_column_parallel_linear_with_lora.py index 82b5825c4..163379e3a 100644 --- a/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_column_parallel_linear_with_lora/test_column_parallel_linear_with_lora.py +++ b/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_column_parallel_linear_with_lora/test_column_parallel_linear_with_lora.py @@ -73,6 +73,7 @@ def build_msrun_command_list( """ Build the msrun command with the specified parameters. """ if worker_num == 1 and local_worker_num == 1: cmd_list = ["python"] + use_parallel = False else: cmd_list = [ "msrun", @@ -82,6 +83,7 @@ def build_msrun_command_list( f"--log_dir={log_dir}", "--join=True", ] + use_parallel = True cmd_list += [ str(run_script_path), f"--input_size={input_size}", @@ -91,7 +93,8 @@ def build_msrun_command_list( f"--skip_weight_param_allocation={str(skip_weight_param_allocation).lower()}", f"--use_weight_tensor={str(use_weight_tensor).lower()}", f"--output_path={output_path_param}", - f"--tensor_parallel={tensor_parallel}" + f"--tensor_parallel={tensor_parallel}", + f"--use_parallel={use_parallel}" ] logger.info(f"Equivalent shell command for debugging (approximate): {' '.join(cmd_list)}") return cmd_list diff --git a/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_row_parallel_linear_train/run_row_parallel_linear.py b/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_row_parallel_linear_train/run_row_parallel_linear.py index f199358ed..550525d47 100644 --- a/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_row_parallel_linear_train/run_row_parallel_linear.py +++ b/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_row_parallel_linear_train/run_row_parallel_linear.py @@ -14,11 +14,10 @@ # ============================================================================ """Run RowParallelLinear accuracy test with configurable parameters via args""" import argparse -import os from pathlib import Path import numpy as np import mindspore as ms -from mindspore.communication import init +from mindspore.communication import init, get_rank, get_group_size from mindformers.parallel_core.transformer_config import TransformerConfig from mindformers.parallel_core.training_graph.tensor_parallel.layers import RowParallelLinear from mindformers.parallel_core.training_graph.device_matrix import layout @@ -47,11 +46,24 @@ class RowParallelLinearRunner: self.net_weight = init_params.get("weight") self.net_bias = init_params.get("bias") - # RANK_ID and worker_num are set by msrun environment - rank_id_str = os.environ.get("RANK_ID") - self.rank_id = int(rank_id_str) if rank_id_str is not None else None + # Setup parallel 
configuration + if self.args.use_parallel: + self.rank_id = get_rank() + self.worker_num = get_group_size() + ms.set_auto_parallel_context( + parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, full_batch=True + ) + init() - self.worker_num = int(os.environ.get("MS_WORKER_NUM", "1")) + # Initialize model parallel + tp = self.args.tensor_parallel + dp = self.worker_num // tp + pp = 1 # pipeline parallel size + initialize_model_parallel(tensor_model_parallel_size=tp, data_parallel_size=dp, + pipeline_model_parallel_size=pp) + else: + self.rank_id = None + self.worker_num = 1 # Transformer config self.config = TransformerConfig( @@ -62,20 +74,6 @@ class RowParallelLinearRunner: num_layers=1 ) - # Set parallel context - if self.rank_id is not None: - ms.set_auto_parallel_context( - parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, full_batch=True - ) - init() - self.tp = self.config.tensor_model_parallel_size \ - if self.config.tensor_model_parallel_size is not None else 1 - self.dp = self.config.data_parallel_size if self.config.data_parallel_size is not None else 1 - self.pp = self.config.pipeline_model_parallel_size \ - if self.config.pipeline_model_parallel_size is not None else 1 - initialize_model_parallel(tensor_model_parallel_size=self.tp, data_parallel_size=self.dp, - pipeline_model_parallel_size=self.pp) - layout.init_layout(self.config) def build_model(self): @@ -118,6 +116,7 @@ def main(): parser.add_argument("--skip_bias_add", type=lambda x: x.lower() == "true", default=True) parser.add_argument("--output_path", type=str, default="output_ms.npz") parser.add_argument("--tensor_parallel", type=int, default=1) + parser.add_argument("--use_parallel", type=lambda x: x.lower() == "true", default=True) args = parser.parse_args() diff --git a/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_row_parallel_linear_train/test_row_parallel_linear.py b/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_row_parallel_linear_train/test_row_parallel_linear.py index 2823e8fc3..f54810503 100644 --- a/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_row_parallel_linear_train/test_row_parallel_linear.py +++ b/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_row_parallel_linear_train/test_row_parallel_linear.py @@ -54,6 +54,7 @@ def build_msrun_command_list( """ Build the msrun command with the specified parameters. 
""" if worker_num == 1 and local_worker_num == 1: cmd_list = ["python"] + use_parallel = False else: cmd_list = [ "msrun", @@ -63,6 +64,8 @@ def build_msrun_command_list( f"--log_dir={log_dir}", "--join=True", ] + use_parallel = True + cmd_list += [ str(run_script_path), f"--input_size={input_size}", @@ -70,7 +73,8 @@ def build_msrun_command_list( f"--bias={str(bias).lower()}", f"--skip_bias_add={str(skip_bias_add).lower()}", f"--output_path={output_path_param}", - f"--tensor_parallel={tensor_parallel}" + f"--tensor_parallel={tensor_parallel}", + f"--use_parallel={use_parallel}" ] logger.info(f"Equivalent shell command for RowParallelLinear (approximate): {' '.join(cmd_list)}") return cmd_list diff --git a/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_row_parallel_linear_with_lora/run_row_parallel_linear_with_lora.py b/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_row_parallel_linear_with_lora/run_row_parallel_linear_with_lora.py index d6c91c5c4..7b43acee6 100644 --- a/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_row_parallel_linear_with_lora/run_row_parallel_linear_with_lora.py +++ b/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_row_parallel_linear_with_lora/run_row_parallel_linear_with_lora.py @@ -14,11 +14,10 @@ # ============================================================================ """Run RowParallelLinearWithLoRA accuracy test with configurable parameters via args""" import argparse -import os from pathlib import Path import numpy as np import mindspore as ms -from mindspore.communication import init +from mindspore.communication import init, get_rank, get_group_size from mindformers.parallel_core.transformer_config import TransformerConfig from mindformers.parallel_core.training_graph.tensor_parallel.lora_layers import RowParallelLinearWithLoRA from mindformers.parallel_core.training_graph.device_matrix import layout @@ -47,12 +46,24 @@ class RowParallelLinearWithLoRARunner: self.net_weight = init_params.get("weight") self.net_bias = init_params.get("bias") - # RANK_ID and worker_num are set by msrun environment - rank_id_str = os.environ.get("RANK_ID") - self.rank_id = int(rank_id_str) if rank_id_str is not None else None - - self.worker_num = int(os.environ.get("MS_WORKER_NUM", "1")) + # Setup parallel configuration + if self.args.use_parallel: + self.rank_id = get_rank() + self.worker_num = get_group_size() + ms.set_auto_parallel_context( + parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, full_batch=True + ) + init() + # Initialize model parallel + tp = self.args.tensor_parallel + dp = self.worker_num // tp + pp = 1 # pipeline parallel size + initialize_model_parallel(tensor_model_parallel_size=tp, data_parallel_size=dp, + pipeline_model_parallel_size=pp) + else: + self.rank_id = None + self.worker_num = 1 # Transformer config self.config = TransformerConfig( @@ -63,20 +74,6 @@ class RowParallelLinearWithLoRARunner: num_layers=1 ) - # Set parallel context - if self.rank_id is not None: - ms.set_auto_parallel_context( - parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, full_batch=True - ) - init() - self.tp = self.config.tensor_model_parallel_size \ - if self.config.tensor_model_parallel_size is not None else 1 - self.dp = self.config.data_parallel_size if self.config.data_parallel_size is not None else 1 - self.pp = self.config.pipeline_model_parallel_size \ - if self.config.pipeline_model_parallel_size is not None else 1 - 
initialize_model_parallel(tensor_model_parallel_size=self.tp, data_parallel_size=self.dp, - pipeline_model_parallel_size=self.pp) - layout.init_layout(self.config) def build_model(self): @@ -119,6 +116,7 @@ def main(): parser.add_argument("--skip_bias_add", type=lambda x: x.lower() == "true", default=True) parser.add_argument("--output_path", type=str, default="output_ms.npz") parser.add_argument("--tensor_parallel", type=int, default=1) + parser.add_argument("--use_parallel", type=lambda x: x.lower() == "true", default=True) args = parser.parse_args() diff --git a/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_row_parallel_linear_with_lora/test_row_parallel_linear_with_lora.py b/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_row_parallel_linear_with_lora/test_row_parallel_linear_with_lora.py index 7fc90b54f..8f71a4dc5 100644 --- a/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_row_parallel_linear_with_lora/test_row_parallel_linear_with_lora.py +++ b/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_row_parallel_linear_with_lora/test_row_parallel_linear_with_lora.py @@ -54,6 +54,7 @@ def build_msrun_command_list( """ Build the msrun command with the specified parameters. """ if worker_num == 1 and local_worker_num == 1: cmd_list = ["python"] + use_parallel = False else: cmd_list = [ "msrun", @@ -63,6 +64,7 @@ def build_msrun_command_list( f"--log_dir={log_dir}", "--join=True", ] + use_parallel = True cmd_list += [ str(run_script_path), f"--input_size={input_size}", @@ -70,7 +72,8 @@ def build_msrun_command_list( f"--bias={str(bias).lower()}", f"--skip_bias_add={str(skip_bias_add).lower()}", f"--output_path={output_path_param}", - f"--tensor_parallel={tensor_parallel}" + f"--tensor_parallel={tensor_parallel}", + f"--use_parallel={use_parallel}" ] logger.info(f"Equivalent shell command for RowParallelLinearWithLoRA (approximate): {' '.join(cmd_list)}") return cmd_list diff --git a/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_vocab_parallel_embedding/run_vocab_parallel_embedding.py b/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_vocab_parallel_embedding/run_vocab_parallel_embedding.py index 555a04938..e58202c94 100644 --- a/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_vocab_parallel_embedding/run_vocab_parallel_embedding.py +++ b/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_vocab_parallel_embedding/run_vocab_parallel_embedding.py @@ -14,11 +14,10 @@ # ============================================================================ """Run VocabParallelEmbedding accuracy test with configurable parameters via args""" import argparse -import os from pathlib import Path import numpy as np import mindspore as ms -from mindspore.communication import init +from mindspore.communication import init, get_rank, get_group_size from mindformers.parallel_core.training_graph.tensor_parallel.layers import VocabParallelEmbedding from mindformers.parallel_core.transformer_config import TransformerConfig from mindformers.parallel_core.utils.init_method import init_method_normal @@ -42,19 +41,17 @@ class VocabParallelEmbeddingRunner: self.inputs = ms.Tensor(init_params.get("inputs"), dtype=ms.int32) self.net_weight = init_params.get("weight") - # RANK_ID and worker_num are set by msrun environment - rank_id_str = os.environ.get("RANK_ID") 
- self.rank_id = int(rank_id_str) if rank_id_str is not None else None - - self.worker_num = int(os.environ.get("MS_WORKER_NUM", "1")) - - - # Set parallel context - if self.rank_id is not None: + # Setup parallel configuration + if self.args.use_parallel: + self.rank_id = get_rank() + self.worker_num = get_group_size() ms.set_auto_parallel_context( parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, full_batch=True ) init() + else: + self.rank_id = None + self.worker_num = 1 # Transformer config self.config = TransformerConfig( @@ -104,6 +101,7 @@ def main(): parser.add_argument("--config_vocab_emb_dp", type=lambda x: x.lower() == "true", default=True) parser.add_argument("--output_path", type=str, default="output_ms.npz") parser.add_argument("--tensor_parallel", type=int, default=1) + parser.add_argument("--use_parallel", type=lambda x: x.lower() == "true", default=True) args = parser.parse_args() diff --git a/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_vocab_parallel_embedding/test_vocab_parallel_embedding.py b/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_vocab_parallel_embedding/test_vocab_parallel_embedding.py index fa74056d5..0e52edcc5 100644 --- a/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_vocab_parallel_embedding/test_vocab_parallel_embedding.py +++ b/tests/st/test_ut/test_parallel_core/test_training_graph/test_tensor_parallel/test_vocab_parallel_embedding/test_vocab_parallel_embedding.py @@ -42,6 +42,7 @@ def build_msrun_command_list( """ Build the msrun command with the specified parameters. """ if worker_num == 1 and local_worker_num == 1: cmd_list = ["python"] + use_parallel = False else: cmd_list = [ "msrun", @@ -51,13 +52,15 @@ def build_msrun_command_list( f"--log_dir={log_dir}", "--join=True", ] + use_parallel = True cmd_list += [ str(run_script_path), f"--num_embeddings={num_embeddings}", f"--embedding_dim={embedding_dim}", f"--config_vocab_emb_dp={str(config_vocab_emb_dp).lower()}", f"--output_path={output_path_param}", - f"--tensor_parallel={tensor_parallel}" + f"--tensor_parallel={tensor_parallel}", + f"--use_parallel={use_parallel}" ] logger.info(f"Equivalent shell command for VocabParallelEmbedding (approximate): {' '.join(cmd_list)}") return cmd_list diff --git a/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_multi_token_prediction/run_multi_token_prediction.py b/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_multi_token_prediction/run_multi_token_prediction.py index ab10ec0ea..7a52e7050 100644 --- a/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_multi_token_prediction/run_multi_token_prediction.py +++ b/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_multi_token_prediction/run_multi_token_prediction.py @@ -13,13 +13,12 @@ # limitations under the License. 
# ============================================================================ """Run Multi-Token Prediction (MTP) accuracy test with configurable parameters via args.""" -import os import argparse from pathlib import Path import numpy as np import mindspore as ms from mindspore import Tensor, Parameter, set_auto_parallel_context, load_param_into_net, set_context, set_seed -from mindspore.communication import init +from mindspore.communication import init, get_rank, get_group_size from mindformers.parallel_core.training_graph.transformer.multi_token_prediction import MultiTokenPredictionLayer, \ MultiTokenPredictionLayerSubmodules, MultiTokenPredictionBlock, MultiTokenPredictionBlockSubmodules @@ -50,9 +49,25 @@ class MTPRunner: self.args = args_from_parser self.compute_dtype = ms.bfloat16 - rank_id_str: str | None = os.environ.get("RANK_ID") - self.rank_id = int(rank_id_str) if rank_id_str is not None else None - self.worker_num = int(os.environ.get("MS_WORKER_NUM", "1")) + # Setup parallel configuration + if self.args.use_parallel: + self.rank_id = get_rank() + self.worker_num = get_group_size() + set_auto_parallel_context( + parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, + full_batch=True + ) + init() + + # Initialize model parallel + tp = self.args.tp + dp = self.worker_num // tp + pp = 1 # pipeline parallel size + initialize_model_parallel(tensor_model_parallel_size=tp, data_parallel_size=dp, + pipeline_model_parallel_size=pp) + else: + self.rank_id = None + self.worker_num = 1 self.config = TransformerConfig( hidden_size=self.args.hidden_size, @@ -73,20 +88,6 @@ class MTPRunner: vocab_size=self.args.vocab_size ) - if self.rank_id is not None: - set_auto_parallel_context( - parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, - full_batch=True - ) - init() - self.tp = self.config.tensor_model_parallel_size \ - if self.config.tensor_model_parallel_size is not None else 1 - self.dp = self.config.data_parallel_size if self.config.data_parallel_size is not None else 1 - self.pp = self.config.pipeline_model_parallel_size \ - if self.config.pipeline_model_parallel_size is not None else 1 - initialize_model_parallel(tensor_model_parallel_size=self.tp, data_parallel_size=self.dp, - pipeline_model_parallel_size=self.pp) - layout.init_layout(self.config) def build_model(self): @@ -191,6 +192,7 @@ def main(): parser.add_argument("--dp", type=int, default=1, help='data_parallel') parser.add_argument("--cp", type=int, default=1, help='context_parallel') parser.add_argument("--tp", type=int, default=1, help='tensor_parallel') + parser.add_argument("--use_parallel", type=lambda x: x.lower() == "true", default=True) args = parser.parse_args() build_context({"use_legacy": False}) diff --git a/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_multi_token_prediction/test_multi_token_prediction.py b/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_multi_token_prediction/test_multi_token_prediction.py index 488f35b34..b30e90559 100644 --- a/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_multi_token_prediction/test_multi_token_prediction.py +++ b/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_multi_token_prediction/test_multi_token_prediction.py @@ -55,6 +55,7 @@ def build_msrun_command_list( cmd_list = [ "python", ] + use_parallel = False else: cmd_list = [ "msrun", @@ -64,6 +65,7 @@ def build_msrun_command_list( f"--log_dir={log_dir}", "--join=True" ] + use_parallel = True 
cmd_list.extend([ str(run_script_path), @@ -73,7 +75,8 @@ def build_msrun_command_list( f"--output_path={output_path_param}", f"--tp={tensor_parallel}", f"--position_embedding_type={model_args['position_embedding_type']}", - f"--test_name={test_name}" + f"--test_name={test_name}", + f"--use_parallel={use_parallel}" ]) logger.info(f"Test case shell command: {' '.join(cmd_list)}") diff --git a/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_transformer_block/run_transformer_block.py b/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_transformer_block/run_transformer_block.py index 1dba5fb28..e17f95986 100644 --- a/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_transformer_block/run_transformer_block.py +++ b/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_transformer_block/run_transformer_block.py @@ -15,11 +15,10 @@ """Run TransformerLayer accuracy test with configurable parameters via args""" import argparse -import os from pathlib import Path import numpy as np import mindspore as ms -from mindspore.communication import init +from mindspore.communication import init, get_rank, get_group_size from mindformers.parallel_core.training_graph.transformer.identity_op import IdentityOp from mindformers.parallel_core.transformer_config import TransformerConfig @@ -69,13 +68,27 @@ class TransformerLayerRunner: self.post_layer_norm = self.args.post_layer_norm + # Setup parallel configuration + if self.args.use_parallel: + self.rank_id = get_rank() + self.worker_num = get_group_size() + ms.set_auto_parallel_context( + parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, full_batch=True + ) + init() # Initialize communication + + # Initialize model parallel + tp = self.args.tensor_parallel + dp = self.worker_num // tp + pp = 1 # pipeline parallel size + initialize_model_parallel(tensor_model_parallel_size=tp, data_parallel_size=dp, + pipeline_model_parallel_size=pp) + else: + self.rank_id = None + self.worker_num = 1 + # Parallelism self.tensor_parallel = self.args.tensor_parallel - self.rank_id = None - self.worker_num = int(os.environ.get("MS_WORKER_NUM", "1")) - - rank_id_str = os.environ.get("RANK_ID") - self.rank_id = int(rank_id_str) if rank_id_str is not None else None self.data_parallel = self.worker_num // self.tensor_parallel if self.worker_num % self.tensor_parallel != 0: @@ -98,21 +111,6 @@ class TransformerLayerRunner: params_dtype='fp32' ) - if self.rank_id is not None: - ms.set_auto_parallel_context( - parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, full_batch=True - ) - init() # Initialize communication - if self.config.tensor_model_parallel_size is not None: - self.tp = self.config.tensor_model_parallel_size - else: - self.tp = 1 - self.dp = self.config.data_parallel_size if self.config.data_parallel_size is not None else 1 - self.pp = self.config.pipeline_model_parallel_size \ - if self.config.pipeline_model_parallel_size is not None else 1 - initialize_model_parallel(tensor_model_parallel_size=self.tp, data_parallel_size=self.dp, - pipeline_model_parallel_size=self.pp) - layout.init_layout(self.config) # Submodules @@ -213,6 +211,7 @@ def main(): # Execution config parser.add_argument("--output_path", type=str, default="output_transformer_layer.npz") parser.add_argument("--tensor_parallel", type=int, default=1) + parser.add_argument("--use_parallel", type=lambda x: x.lower() == "true", default=True) args = parser.parse_args() diff --git 
a/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_transformer_block/test_transformer_block.py b/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_transformer_block/test_transformer_block.py index 5e4673e30..b6f9441c3 100644 --- a/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_transformer_block/test_transformer_block.py +++ b/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_transformer_block/test_transformer_block.py @@ -64,6 +64,7 @@ def build_msrun_command_list( """ Build the msrun command with the specified parameters. """ if worker_num == 1: cmd_list = ["python"] + use_parallel = False else: cmd_list = [ "msrun", @@ -72,9 +73,11 @@ def build_msrun_command_list( f"--master_port={port}", # Ensure port is unique per test run if parallelized at pytest level f"--log_dir={log_dir}", "--join=True"] + use_parallel = True cmd_list += [str(run_script_path), f"--output_path={output_path_param}", f"--tensor_parallel={tensor_parallel}", + f"--use_parallel={use_parallel}", # Add model dimensions f"--seq_length={model_args.get('seq_length', seq_length)}", f"--batch_size={model_args.get('batch_size', batch_size)}", diff --git a/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_transformer_layer/run_transformer_layer.py b/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_transformer_layer/run_transformer_layer.py index 9041cfd02..8914d93c6 100644 --- a/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_transformer_layer/run_transformer_layer.py +++ b/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_transformer_layer/run_transformer_layer.py @@ -15,11 +15,10 @@ """Run TransformerLayer accuracy test with configurable parameters via args""" import argparse -import os from pathlib import Path import numpy as np import mindspore as ms -from mindspore.communication import init +from mindspore.communication import init, get_rank, get_group_size from mindformers.parallel_core.training_graph.transformer.identity_op import IdentityOp from mindformers.parallel_core.transformer_config import TransformerConfig @@ -64,13 +63,28 @@ class TransformerLayerRunner: self.compute_dtype = ms.bfloat16 self.param_init_dtype = ms.float32 + # Setup parallel configuration + if self.args.use_parallel: + self.rank_id = get_rank() + self.worker_num = get_group_size() + ms.set_auto_parallel_context( + parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, full_batch=True, + # device_num=self.worker_num # device_num is often set by environment + ) + init() # Initialize communication + + # Initialize model parallel + tp = self.args.tensor_parallel + dp = self.worker_num // tp + pp = 1 # pipeline parallel size + initialize_model_parallel(tensor_model_parallel_size=tp, data_parallel_size=dp, + pipeline_model_parallel_size=pp) + else: + self.rank_id = None + self.worker_num = 1 + # Parallelism self.tensor_parallel = self.args.tensor_parallel - self.rank_id = None - self.worker_num = int(os.environ.get("MS_WORKER_NUM", "1")) - - rank_id_str = os.environ.get("RANK_ID") - self.rank_id = int(rank_id_str) if rank_id_str is not None else None self.data_parallel = self.worker_num // self.tensor_parallel if self.worker_num % self.tensor_parallel != 0: @@ -93,20 +107,6 @@ class TransformerLayerRunner: params_dtype='fp32' ) - if self.rank_id is not None: - ms.set_auto_parallel_context( - 
parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, full_batch=True, - # device_num=self.worker_num # device_num is often set by environment - ) - init() # Initialize communication - self.tp = self.config.tensor_model_parallel_size \ - if self.config.tensor_model_parallel_size is not None else 1 - self.dp = self.config.data_parallel_size if self.config.data_parallel_size is not None else 1 - self.pp = self.config.pipeline_model_parallel_size \ - if self.config.pipeline_model_parallel_size is not None else 1 - initialize_model_parallel(tensor_model_parallel_size=self.tp, data_parallel_size=self.dp, - pipeline_model_parallel_size=self.pp) - layout.init_layout(self.config) # Submodules @@ -205,6 +205,7 @@ def main(): # Execution config parser.add_argument("--output_path", type=str, default="output_transformer_layer.npz") parser.add_argument("--tensor_parallel", type=int, default=1) + parser.add_argument("--use_parallel", type=lambda x: x.lower() == "true", default=True) args = parser.parse_args() diff --git a/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_transformer_layer/test_transformer_layer.py b/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_transformer_layer/test_transformer_layer.py index 9bcf117a6..fef9165eb 100644 --- a/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_transformer_layer/test_transformer_layer.py +++ b/tests/st/test_ut/test_parallel_core/test_training_graph/test_transformer/test_transformer_layer/test_transformer_layer.py @@ -56,6 +56,7 @@ def build_msrun_command_list( """ Build the msrun command with the specified parameters. """ if worker_num == 1: cmd_list = ["python"] + use_parallel = False else: cmd_list = [ "msrun", @@ -64,9 +65,11 @@ def build_msrun_command_list( f"--master_port={port}", # Ensure port is unique per test run if parallelized at pytest level f"--log_dir={log_dir}", "--join=True"] + use_parallel = True cmd_list += [str(run_script_path), f"--output_path={output_path_param}", f"--tensor_parallel={tensor_parallel}", + f"--use_parallel={use_parallel}", # Add model dimensions f"--seq_length={model_args.get('seq_length', seq_length)}", f"--batch_size={model_args.get('batch_size', batch_size)}", diff --git a/tests/st/test_ut/test_parallel_core/test_training_graph/test_vocab_parallel_crossentropy/run_vocab_parallel_cross_entropy.py b/tests/st/test_ut/test_parallel_core/test_training_graph/test_vocab_parallel_crossentropy/run_vocab_parallel_cross_entropy.py index c41160e07..0948b6ddc 100644 --- a/tests/st/test_ut/test_parallel_core/test_training_graph/test_vocab_parallel_crossentropy/run_vocab_parallel_cross_entropy.py +++ b/tests/st/test_ut/test_parallel_core/test_training_graph/test_vocab_parallel_crossentropy/run_vocab_parallel_cross_entropy.py @@ -15,11 +15,10 @@ """Run VocabParallelCrossEntropy accuracy test with configurable parameters via args""" import argparse -import os from pathlib import Path import numpy as np import mindspore as ms -from mindspore.communication import init +from mindspore.communication import init, get_rank, get_group_size from mindformers.core.context.build_context import init_context, set_context from mindformers.parallel_core.training_graph.loss_func import VocabParallelCrossEntropy from mindformers.parallel_core.transformer_config import TransformerConfig @@ -40,16 +39,16 @@ class VocabParallelCrossEntropyRunner: self.batch_size = self.args.batch_size self.seq_length = self.args.seq_length - rank_id_str = os.environ.get("RANK_ID") - 
self.rank_id = int(rank_id_str) if rank_id_str is not None else None - - self.worker_num = int(os.environ.get("MS_WORKER_NUM", "1")) - - if self.rank_id is not None: + # Setup parallel configuration + if self.args.use_parallel: + self.rank_id = get_rank() + self.worker_num = get_group_size() init_context(use_parallel=True) ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, full_batch=True) init() else: + self.rank_id = None + self.worker_num = 1 init_context(use_parallel=False) self.config = TransformerConfig( @@ -113,6 +112,7 @@ def main(): parser.add_argument("--calculate_per_token_loss", type=lambda x: x.lower() == "true", default="false") parser.add_argument("--output_path", type=str, default="output_ms_loss.npz") parser.add_argument("--tensor_parallel", type=int, default=1) + parser.add_argument("--use_parallel", type=lambda x: x.lower() == "true", default=True) args = parser.parse_args() diff --git a/tests/st/test_ut/test_parallel_core/test_training_graph/test_vocab_parallel_crossentropy/test_vocab_parallel_cross_entropy.py b/tests/st/test_ut/test_parallel_core/test_training_graph/test_vocab_parallel_crossentropy/test_vocab_parallel_cross_entropy.py index a7665fdca..9b4109bbc 100644 --- a/tests/st/test_ut/test_parallel_core/test_training_graph/test_vocab_parallel_crossentropy/test_vocab_parallel_cross_entropy.py +++ b/tests/st/test_ut/test_parallel_core/test_training_graph/test_vocab_parallel_crossentropy/test_vocab_parallel_cross_entropy.py @@ -59,6 +59,7 @@ def build_msrun_command_list( """Build the msrun command with the specified parameters for VocabParallelCrossEntropy.""" if tensor_parallel == 1: cmd_list = ["python"] + use_parallel = False else: cmd_list = [ "msrun", @@ -67,6 +68,7 @@ def build_msrun_command_list( f"--master_port={port}", f"--log_dir={log_dir}", "--join=True",] + use_parallel = True cmd_list += [ str(run_script_path), f"--vocab_size={vocab_size}", @@ -76,6 +78,7 @@ def build_msrun_command_list( f"--calculate_per_token_loss={str(calculate_per_token_loss).lower()}", f"--output_path={output_path_param}", f"--tensor_parallel={tensor_parallel}", + f"--use_parallel={use_parallel}", ] print(f"Equivalent shell command for debugging (approximate): {' '.join(cmd_list)}") return cmd_list -- Gitee
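
Note on the shared pattern: every runner touched by this patch replaces the RANK_ID / MS_WORKER_NUM environment lookups with an explicit --use_parallel flag, and the test-side build_msrun_command_list helpers set that flag to False for the single-worker ("python") path and True for the msrun path. The sketch below is a minimal, self-contained illustration of that runner-side setup; the function and argument names are illustrative rather than copied from any single runner, and it calls init() before get_rank()/get_group_size() because MindSpore documents those helpers as usable only after init().

    # Minimal sketch of the runner-side --use_parallel setup (illustrative names).
    # When use_parallel is True this is expected to run under msrun; standalone
    # execution works with --use_parallel=false.
    import argparse

    import mindspore as ms
    from mindspore.communication import init, get_rank, get_group_size


    def setup_parallel(use_parallel: bool, tensor_parallel: int):
        """Return (rank_id, worker_num) after configuring the parallel context."""
        if use_parallel:
            # MindSpore's get_rank()/get_group_size() should be used after init().
            init()
            rank_id = get_rank()
            worker_num = get_group_size()
            ms.set_auto_parallel_context(
                parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, full_batch=True
            )
            # Runners that also call initialize_model_parallel derive the sizes as
            # tp = tensor_parallel, dp = worker_num // tp, pp = 1.
        else:
            rank_id = None
            worker_num = 1
        return rank_id, worker_num


    if __name__ == "__main__":
        parser = argparse.ArgumentParser()
        # Same string-to-bool convention as the patched runners.
        parser.add_argument("--use_parallel", type=lambda x: x.lower() == "true", default=True)
        parser.add_argument("--tensor_parallel", type=int, default=1)
        args = parser.parse_args()
        print(setup_parallel(args.use_parallel, args.tensor_parallel))

On the test side the new flag rides along with the existing arguments, so a two-worker invocation looks roughly like this (argument values illustrative):

    msrun --worker_num=2 --local_worker_num=2 --master_port=8118 --log_dir=msrun_log --join=True \
        run_row_parallel_linear.py --input_size=16 --output_size=16 --tensor_parallel=2 --use_parallel=True

while the single-worker case falls back to plain python with --use_parallel=False.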