diff --git a/examples/demo/little_demo/config.py b/examples/demo/little_demo/config.py
index a0912ac511abf447f632aea67f443a9fa189d17e..ef8d91801533a4b4e25cbd79d7f730c50fa2184c 100644
--- a/examples/demo/little_demo/config.py
+++ b/examples/demo/little_demo/config.py
@@ -13,14 +13,34 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 # ==============================================================================
-
+import json
 import math
+import os
 
 import tensorflow as tf
 from tensorflow.core.protobuf.rewriter_config_pb2 import RewriterConfig
 
 from mx_rec.util.communication.hccl_ops import get_rank_size
+from utils import PrecisionDumpInfo, PRECISION_CHECK_PATH, PRECISION_DUMP_STEP
+
+
+GLOBAL_RANDOM_SEED = 128
+try:
+    USE_DYNAMIC = bool(int(os.getenv("USE_DYNAMIC", 0)))
+    USE_DYNAMIC_EXPANSION = bool(int(os.getenv("USE_DYNAMIC_EXPANSION", 0)))
+    USE_MULTI_LOOKUP = bool(int(os.getenv("USE_MULTI_LOOKUP", 1)))
+    MODIFY_GRAPH_FLAG = bool(int(os.getenv("USE_MODIFY_GRAPH", 0)))
+    USE_TIMESTAMP = bool(int(os.getenv("USE_TIMESTAMP", 0)))
+    USE_ONE_SHOT = bool(int(os.getenv("USE_ONE_SHOT", 0)))
+    USE_DETERMINISTIC = bool(int(os.getenv("USE_DETERMINISTIC", 0)))
+    MULTI_LOOKUP_TIMES = int(os.getenv("MULTI_LOOKUP_TIMES", 2))
+    PRECISION_CHECK = bool(int(os.getenv("PRECISION_CHECK", 0)))
+except ValueError as err:
+    raise ValueError("please correctly config USE_DYNAMIC, USE_DYNAMIC_EXPANSION, USE_MULTI_LOOKUP, "
+                     "USE_MODIFY_GRAPH, USE_TIMESTAMP, USE_ONE_SHOT, USE_DETERMINISTIC or PRECISION_CHECK "
+                     "(only 0 or 1 is supported); MULTI_LOOKUP_TIMES must be an integer.") from err
 
 
 class Config:
     def __init__(self, mode="simple", task_name="default"):
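Each flag above is parsed as `bool(int(os.getenv(...)))`, so an unset variable falls back to its default, `"0"`/`"1"` map to False/True, and a non-integer value raises the `ValueError` that is re-raised with the friendlier message. A minimal stand-alone sketch of the same pattern (the `env_flag` helper is hypothetical, not part of this patch):

```python
import os


def env_flag(name: str, default: int = 0) -> bool:
    """Parse a 0/1 environment variable into a bool (hypothetical helper)."""
    raw = os.getenv(name, str(default))
    value = int(raw)  # non-integer values raise ValueError here
    if value not in (0, 1):
        raise ValueError(f"{name} must be 0 or 1, got {raw!r}")
    return bool(value)


# mirrors e.g. USE_DYNAMIC = bool(int(os.getenv("USE_DYNAMIC", 0)))
USE_DYNAMIC = env_flag("USE_DYNAMIC")
```

Note that the raw `bool(int(...))` pattern accepts any integer (e.g. `"2"` becomes True); the explicit range check above is one way to reject that.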
@@ -94,36 +114,88 @@ class Config:
         self.hashtable_dim = 8
         self.learning_rate = 0.01
 
+
+def construct_basic_config(npu_custom_op):
+    npu_custom_op.parameter_map["mix_compile_mode"].b = False
+    npu_custom_op.parameter_map["use_off_line"].b = True
+    npu_custom_op.parameter_map["min_group_size"].b = 1
+    npu_custom_op.parameter_map["HCCL_algorithm"].s = tf.compat.as_bytes("level0:pairwise;level1:pairwise")
+    npu_custom_op.parameter_map["enable_data_pre_proc"].b = True
+    npu_custom_op.parameter_map["iterations_per_loop"].i = 1
+    npu_custom_op.parameter_map["hcom_parallel"].b = False
+    npu_custom_op.parameter_map["op_precision_mode"].s = tf.compat.as_bytes("op_impl_mode.ini")
+    npu_custom_op.parameter_map["op_execute_timeout"].i = 2000
+    npu_custom_op.parameter_map["precision_mode"].s = tf.compat.as_bytes("allow_mix_precision")
+
+
+def construct_deterministic_config(npu_custom_op):
+    npu_custom_op.parameter_map["precision_mode"].s = tf.compat.as_bytes("must_keep_origin_dtype")
+    npu_custom_op.parameter_map["deterministic"].i = 1
+
+
+def construct_op_dump_config(npu_custom_op):
+    npu_custom_op.parameter_map["enable_dump"].b = True
+
+    dump_path = os.path.join(PRECISION_CHECK_PATH, "03dump_op")
+    if os.path.exists(dump_path):
+        raise ValueError(f"03dump_op already exists, your files may have been tampered with: {dump_path}")
+
+    os.makedirs(dump_path, exist_ok=False)
+    dump_step = "|".join([str(step_num) for step_num in PRECISION_DUMP_STEP])
+    dump_mode = "all"
+
+    npu_custom_op.parameter_map["dump_path"].s = tf.compat.as_bytes(dump_path)
+    npu_custom_op.parameter_map["dump_step"].s = tf.compat.as_bytes(dump_step)
+    npu_custom_op.parameter_map["dump_mode"].s = tf.compat.as_bytes(dump_mode)
+
+    dump_op_info = {"dump_step": dump_step,
+                    "dump_mode": dump_mode}
+    PrecisionDumpInfo.add_item("dump_op_info", dump_op_info)
+
+    table_list = ["user_table", "item_table"]
+    dump_op_list = []
+
+    dump_emb_op_info = {}
+    for table_name in table_list:
+        look_up_by_id_offset = [f"{table_name}//{table_name}_lookup/gather_for_id_offsets",
+                                f"LazyAdam_0/update_{table_name}/GatherV2",
+                                f"LazyAdam_0/update_{table_name}/GatherV2_1"]
+        update_by_id_offset = [f"LazyAdam_0/update_{table_name}/ScatterNdAdd_2",
+                               f"LazyAdam_0/update_{table_name}/ScatterNdAdd",
+                               f"LazyAdam_0/update_{table_name}/ScatterNdAdd_1"]
+        # f"{table_name}//{table_name}_lookup/EmbeddingLookupByAddress",
+        lookup_byaddress = [f"LazyAdamByAddress_0/update_{table_name}//{table_name}_lookup/id_offsets/{table_name}/GetNext/EmbeddingLookupByAddress"]
+        update_byaddress = [f"LazyAdamByAddress_0/update_{table_name}//{table_name}_lookup/id_offsets/{table_name}/GetNext/EmbeddingUpdateByAddress"]
+
+        if USE_DYNAMIC_EXPANSION:
+            table_dump_op_name_list = lookup_byaddress + update_byaddress
+            dump_emb_op_info[table_name] = {"emb_look_ops": lookup_byaddress,
+                                            "emb_update_ops": update_byaddress}
+        else:
+            table_dump_op_name_list = look_up_by_id_offset + update_by_id_offset
+            dump_emb_op_info[table_name] = {"emb_look_ops": look_up_by_id_offset,
+                                            "emb_update_ops": update_by_id_offset}
-def sess_config(dump_data=False, dump_path="./dump_output", dump_steps="0|1|2", use_deterministic=0):
+        dump_op_list.extend(table_dump_op_name_list)
+
+    dump_ops_string = " ".join(dump_op_list)
+    PrecisionDumpInfo.add_item("dump_emb_op_info", dump_emb_op_info)
+    npu_custom_op.parameter_map["dump_layer"].s = tf.compat.as_bytes(dump_ops_string)
+
+
+def construct_npu_sess_config(dump_data=False, use_deterministic=0):
     session_config = tf.compat.v1.ConfigProto(allow_soft_placement=False,
                                               log_device_placement=False)
-
+    session_config.gpu_options.allow_growth = True
     custom_op = session_config.graph_options.rewrite_options.custom_optimizers.add()
     custom_op.name = "NpuOptimizer"
-    custom_op.parameter_map["mix_compile_mode"].b = False
-    custom_op.parameter_map["use_off_line"].b = True
-    custom_op.parameter_map["min_group_size"].b = 1
-    custom_op.parameter_map["HCCL_algorithm"].s = tf.compat.as_bytes("level0:pairwise;level1:pairwise")
-    custom_op.parameter_map["enable_data_pre_proc"].b = True
-    custom_op.parameter_map["iterations_per_loop"].i = 1
+    construct_basic_config(custom_op)
 
     if use_deterministic:
-        custom_op.parameter_map["precision_mode"].s = tf.compat.as_bytes("must_keep_origin_dtype")
-        custom_op.parameter_map["deterministic"].i = 1
-    else:
-        custom_op.parameter_map["precision_mode"].s = tf.compat.as_bytes("allow_mix_precision")
-        custom_op.parameter_map["hcom_parallel"].b = False
-        custom_op.parameter_map["op_precision_mode"].s = tf.compat.as_bytes("op_impl_mode.ini")
-        custom_op.parameter_map["op_execute_timeout"].i = 2000
+        construct_deterministic_config(custom_op)
 
     if dump_data:
-        """
-        To see the details, please refer to the descriptions at official web site
-        """
-        custom_op.parameter_map["enable_dump"].b = True
-        custom_op.parameter_map["dump_path"].s = tf.compat.as_bytes(dump_path)
-        custom_op.parameter_map["dump_step"].s = tf.compat.as_bytes(dump_steps)
-        custom_op.parameter_map["dump_mode"].s = tf.compat.as_bytes("all")
+        construct_op_dump_config(custom_op)
 
     session_config.graph_options.rewrite_options.remapping = RewriterConfig.OFF
     session_config.graph_options.rewrite_options.memory_optimization = RewriterConfig.OFF
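config.py (and the files below) imports `PrecisionDumpInfo`, `PRECISION_CHECK_PATH` and `PRECISION_DUMP_STEP` from a `utils` module that is not included in this diff. A minimal sketch of what such a module could look like, assuming a JSON-backed metadata store (all names below are inferred from the call sites; the path and step defaults are assumptions):

```python
# utils.py -- hypothetical sketch; the real module is not part of this diff.
import json
import os

# assumed defaults: where precision artifacts are written, and which steps to dump
PRECISION_CHECK_PATH = os.getenv("PRECISION_CHECK_PATH", "./precision_check")
PRECISION_DUMP_STEP = [0, 1, 2]


class PrecisionDumpInfo:
    """Collects key/value metadata and writes it out once as JSON."""
    _items = {}

    @classmethod
    def add_item(cls, key, value):
        cls._items[key] = value

    @classmethod
    def write_dump_info(cls):
        os.makedirs(PRECISION_CHECK_PATH, exist_ok=True)
        with open(os.path.join(PRECISION_CHECK_PATH, "dump_info.json"), "w") as fout:
            json.dump(cls._items, fout, indent=2)
```

This matches how the call sites use it: `add_item` receives plain dicts from several modules, and `write_dump_info` is called once at the start of training in run_mode.py.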
diff --git a/examples/demo/little_demo/main.py b/examples/demo/little_demo/main.py
index cfaecbde4493b66795b3d0fcf9cc293c309ae34b..d4ba7dbf9d165ad7d37b22b02bc51430426b95f3 100644
--- a/examples/demo/little_demo/main.py
+++ b/examples/demo/little_demo/main.py
@@ -41,6 +41,10 @@ from model import MyModel
 from optimizer import create_dense_and_sparse_optimizer
 from run_mode import RunMode, UseMode
 
+from config import USE_DYNAMIC, USE_DYNAMIC_EXPANSION, USE_MULTI_LOOKUP, PRECISION_CHECK, GLOBAL_RANDOM_SEED, \
+    MODIFY_GRAPH_FLAG, USE_TIMESTAMP, USE_ONE_SHOT, USE_DETERMINISTIC, MULTI_LOOKUP_TIMES
+from utils import PrecisionDumpInfo
+
 tf.compat.v1.disable_eager_execution()
 
 _SSD_SAVE_PATH = ["ssd_data"]  # user should make sure directory exist and clean before training
@@ -92,7 +96,7 @@ def build_graph(hash_table_list, is_train, feature_spec_list=None, config_dict=N
                   [cfg.user_send_cnt, cfg.item_send_cnt],
                   [True, True],
                   [cfg.user_hashtable_dim, cfg.item_hashtable_dim]]
-    if use_multi_lookup:
+    if USE_MULTI_LOOKUP:
         # add `MULTI_LOOKUP_TIMES` times
         for i, _ in enumerate(input_list):
             input_list[i].extend([input_list[i][0]] * MULTI_LOOKUP_TIMES)
@@ -106,7 +110,7 @@ def build_graph(hash_table_list, is_train, feature_spec_list=None, config_dict=N
                   [cfg.user_send_cnt, cfg.item_send_cnt],
                   [True, True],
                   [cfg.user_hashtable_dim, cfg.item_hashtable_dim]]
-    if use_multi_lookup:
+    if USE_MULTI_LOOKUP:
         # add `MULTI_LOOKUP_TIMES` times
         for i, _ in enumerate(input_list):
             if i == 0:
@@ -130,7 +134,7 @@ def create_feature_spec_list(use_timestamp=False):
                                           access_threshold=access_threshold,
                                           eviction_threshold=eviction_threshold,
                                           faae_coefficient=4)]
-    if use_multi_lookup:
+    if USE_MULTI_LOOKUP:
         # add `MULTI_LOOKUP_TIMES` times
         for _ in range(MULTI_LOOKUP_TIMES):
             feature_spec_list.append(FeatureSpec("user_ids", table_name="user_table",
@@ -171,6 +175,14 @@ def _clear_saved_model() -> None:
             os.makedirs(sub_path, mode=0o550, exist_ok=True)
             logger.info(f"Create dir:{sub_path}")
 
+
+def index_initializer(shape, dtype=None, partition_info=None):
+    # `shape` is a tuple giving the tensor shape, e.g. (rows, cols)
+    rows, cols = shape
+    # build a rows-by-cols nested list of initial values
+    values = [[i * 1e06 + j * 1e-20 for j in range(cols)] for i in range(rows)]
+    # wrap the nested list in a TF constant (not a NumPy array)
+    return tf.constant(values, dtype=dtype)
+
 
 if __name__ == "__main__":
     tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
@@ -188,30 +200,15 @@
     # how many evaluation steps to run before switching back to training
     EVAL_STEPS = 10
     # how many training steps between checkpoints
-    SAVING_INTERVAL = 100
-
-    # get init configuration
-    try:
-        use_dynamic = bool(int(os.getenv("USE_DYNAMIC", 0)))
-        use_dynamic_expansion = bool(int(os.getenv("USE_DYNAMIC_EXPANSION", 0)))
-        use_multi_lookup = bool(int(os.getenv("USE_MULTI_LOOKUP", 1)))
-        MODIFY_GRAPH_FLAG = bool(int(os.getenv("USE_MODIFY_GRAPH", 0)))
-        USE_TIMESTAMP = bool(int(os.getenv("USE_TIMESTAMP", 0)))
-        USE_ONE_SHOT = bool(int(os.getenv("USE_ONE_SHOT", 0)))
-        USE_DETERMINISTIC = bool(int(os.getenv("USE_DETERMINISTIC", 0)))
-    except ValueError as err:
-        raise ValueError("please correctly config USE_MPI or USE_DYNAMIC or USE_DYNAMIC_EXPANSION or "
-                         "USE_MULTI_LOOKUP or USE_MODIFY_GRAPH or USE_TIMESTAMP or USE_ONE_SHOT or USE_DETERMINISTIC"
-                         "only 0 or 1 is supported.") from err
-
-    try:
-        MULTI_LOOKUP_TIMES = int(os.getenv("MULTI_LOOKUP_TIMES", 2))
-    except ValueError as err:
-        raise ValueError("please correctly config MULTI_LOOKUP_TIMES only int is supported.") from err
+    SAVING_INTERVAL = 1
+
+    task_config = {"use_dynamic": USE_DYNAMIC, "use_dynamic_expansion": USE_DYNAMIC_EXPANSION,
+                   "use_multi_lookup": USE_MULTI_LOOKUP, "modify_graph_flag": MODIFY_GRAPH_FLAG,
+                   "use_timestamp": USE_TIMESTAMP, "use_one_shot": USE_ONE_SHOT,
+                   "use_deterministic": USE_DETERMINISTIC, "multi_lookup_times": MULTI_LOOKUP_TIMES}
+    PrecisionDumpInfo.add_item(key="task_config", value=task_config)
 
     if USE_DETERMINISTIC:
-        np.random.seed(128)
-        tf.random.set_random_seed(128)
+        np.random.seed(GLOBAL_RANDOM_SEED)
+        tf.random.set_random_seed(GLOBAL_RANDOM_SEED)
 
     if_load = False
     save_path = "./saved-model"
@@ -228,13 +225,13 @@
                                    eval_steps=EVAL_STEPS,
                                    save_steps=SAVING_INTERVAL,
                                    max_steps=MAX_TRAIN_STEPS,
-                                   use_dynamic=use_dynamic,
-                                   use_dynamic_expansion=use_dynamic_expansion,
+                                   use_dynamic=USE_DYNAMIC,
+                                   use_dynamic_expansion=USE_DYNAMIC_EXPANSION,
                                    if_load=if_load)
 
     cfg = Config()
     # multi lookup config, batch size: 32 * 128 = 4096
-    if use_multi_lookup and MULTI_LOOKUP_TIMES > 2:
+    if USE_MULTI_LOOKUP and MULTI_LOOKUP_TIMES > 2:
         cfg.batch_size = 32
 
     # access_threshold unit counts; eviction_threshold unit seconds
@@ -245,8 +242,10 @@
     config_for_item_table = dict(access_threshold=cfg.access_threshold, eviction_threshold=cfg.eviction_threshold,
                                  faae_coefficient=4)
     ACCESS_AND_EVICT = dict(user_table=config_for_user_table, item_table=config_for_item_table)
+
     train_feature_spec_list = None
     eval_feature_spec_list = None
+
     if not MODIFY_GRAPH_FLAG:
         train_feature_spec_list = create_feature_spec_list(use_timestamp=USE_TIMESTAMP)
         eval_feature_spec_list = create_feature_spec_list(use_timestamp=USE_TIMESTAMP)
@@ -270,9 +269,10 @@
     cache_mode = os.getenv("CACHE_MODE")
     if cache_mode not in cache_mode_dict.keys():
         raise ValueError(f"cache mode must in {list(cache_mode_dict.keys())}, get:{cache_mode}")
-    if cache_mode in ["DDR", "SSD"] and not use_dynamic:
+    if cache_mode in ["DDR", "SSD"] and not USE_DYNAMIC:
         logger.warning("when cache_mode in [DDR, SSD], suggest use_dynamic=true to avoid tuning size parameter")
-    emb_initializer = tf.compat.v1.constant_initializer(0) if USE_DETERMINISTIC \
+
+    emb_initializer = tf.compat.v1.constant_initializer(0.1) if USE_DETERMINISTIC or PRECISION_CHECK \
         else tf.compat.v1.truncated_normal_initializer()
 
     user_hashtable = create_table(key_dtype=tf.int64,
                                   dim=tf.TensorShape([cfg.user_hashtable_dim]),
@@ -292,6 +292,7 @@
     train_model = None
     train_batch = None
     table_list = [user_hashtable, item_hashtable]
+
     if use_mode in [UseMode.TRAIN, UseMode.LOAD_AND_TRAIN]:
         train_iterator, train_model, train_batch = build_graph(
             table_list, is_train=True,
@@ -299,14 +300,15 @@
             config_dict=ACCESS_AND_EVICT,
             batch_number=MAX_DATASET_GENERATE_TRAIN * get_rank_size()
         )
+
     eval_iterator, eval_model, eval_batch = build_graph(table_list,
                                                         is_train=False,
                                                         feature_spec_list=eval_feature_spec_list,
                                                         config_dict=ACCESS_AND_EVICT,
                                                         batch_number=MAX_DATASET_GENERATE_EVAL * get_rank_size())
 
     dense_variables, sparse_variables = get_dense_and_sparse_variable()
-    params = {"train_batch": train_batch, "eval_batch": eval_batch, "use_one_shot": USE_ONE_SHOT,
-              "use_deterministic": USE_DETERMINISTIC}
+    params = {"train_batch": train_batch, "eval_batch": eval_batch, "use_one_shot": USE_ONE_SHOT}
+
     run_mode = RunMode(
         MODIFY_GRAPH_FLAG, USE_TIMESTAMP, table_list, optimizer_list, train_model, eval_model, train_iterator,
         eval_iterator, MAX_TRAIN_STEPS, EVAL_STEPS, params
@@ -315,6 +317,7 @@
     # start host pipeline
     if not MODIFY_GRAPH_FLAG:
         start_asc_pipeline()
+
     # start modify graph
     if MODIFY_GRAPH_FLAG and use_mode not in [UseMode.TRAIN, UseMode.LOAD_AND_TRAIN]:
         logger.info("start to modifying graph")
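A note on the new `index_initializer` above: the row term `i * 1e06` is roughly 26 orders of magnitude larger than the column term `j * 1e-20`, which is far below the ulp of 1e06 even in float64, so the column offset is rounded away for every row except `i == 0` and the resulting tensor only distinguishes rows. A quick demonstration in plain Python:

```python
# the same expression index_initializer uses, evaluated for a 2 x 3 shape
values = [[i * 1e06 + j * 1e-20 for j in range(3)] for i in range(2)]

print(values[0])  # [0.0, 1e-20, 2e-20] -- column term survives only when i == 0
print(values[1])  # [1000000.0, 1000000.0, 1000000.0] -- column term rounded away

assert values[1][0] == values[1][1] == values[1][2]
```

If per-element indices are the goal, scaling the column term up (e.g. `i * 1e06 + j`) would keep both components representable.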
{"train_batch": train_batch, "eval_batch": eval_batch, "use_one_shot": USE_ONE_SHOT} + run_mode = RunMode( MODIFY_GRAPH_FLAG, USE_TIMESTAMP, table_list, optimizer_list, train_model, eval_model, train_iterator, eval_iterator, MAX_TRAIN_STEPS, EVAL_STEPS, params @@ -315,6 +317,7 @@ if __name__ == "__main__": # start host pipeline if not MODIFY_GRAPH_FLAG: start_asc_pipeline() + # start modify graph if MODIFY_GRAPH_FLAG and use_mode not in [UseMode.TRAIN, UseMode.LOAD_AND_TRAIN]: logger.info("start to modifying graph") diff --git a/examples/demo/little_demo/model.py b/examples/demo/little_demo/model.py index 526f421c8f8b8596121e95144e0a2c57301aadb0..7d16bba218cbf1b562d2e230c19869f992068e3f 100644 --- a/examples/demo/little_demo/model.py +++ b/examples/demo/little_demo/model.py @@ -18,6 +18,8 @@ from __future__ import print_function import tensorflow as tf +from config import GLOBAL_RANDOM_SEED + class MyModel: def __init__(self): @@ -41,7 +43,7 @@ class MyModel: with tf.compat.v1.variable_scope("mlp", reuse=tf.compat.v1.AUTO_REUSE): for i in range(len(self.all_layer_dims) - 2): self.h_w.append(tf.compat.v1.get_variable('h%d_w' % (i + 1), shape=self.all_layer_dims[i: i + 2], - initializer=tf.random_uniform_initializer(-0.01, 0.01), + initializer=tf.random_uniform_initializer(-0.01, 0.01, GLOBAL_RANDOM_SEED), dtype=tf.float32, collections=[tf.compat.v1.GraphKeys.GLOBAL_VARIABLES, "deep", "mlp_wts"])) self.h_b.append( @@ -51,7 +53,7 @@ class MyModel: collections=[tf.compat.v1.GraphKeys.GLOBAL_VARIABLES, "deep", "mlp_bias"])) i += 1 self.h_w_head_0 = tf.compat.v1.get_variable('h_w_head_0', shape=self.all_layer_dims[i: i + 2], - initializer=tf.random_uniform_initializer(-0.01, 0.01), + initializer=tf.random_uniform_initializer(-0.01, 0.01, GLOBAL_RANDOM_SEED), dtype=tf.float32, collections=[tf.compat.v1.GraphKeys.GLOBAL_VARIABLES, "deep", "mlp_wts"]) self.h_b_head_0 = tf.compat.v1.get_variable('h_b_head_0', shape=[self.all_layer_dims[i + 1]], @@ -59,7 +61,7 @@ class MyModel: dtype=tf.float32, collections=[tf.compat.v1.GraphKeys.GLOBAL_VARIABLES, "deep", "mlp_bias"]) self.h_w_head_1 = tf.compat.v1.get_variable('h_w_head_1', shape=self.all_layer_dims[i: i + 2], - initializer=tf.random_uniform_initializer(-0.01, 0.01), + initializer=tf.random_uniform_initializer(-0.01, 0.01, GLOBAL_RANDOM_SEED), dtype=tf.float32, collections=[tf.compat.v1.GraphKeys.GLOBAL_VARIABLES, "deep", "mlp_wts"]) self.h_b_head_1 = tf.compat.v1.get_variable('h_b_head_1', shape=[self.all_layer_dims[i + 1]], diff --git a/examples/demo/little_demo/run.sh b/examples/demo/little_demo/run.sh index 5c5d9d1d730bafdcb68367e86ad3879511db0c5e..6571789ce88474bd9e9a59c45dfe883a5aa6546c 100644 --- a/examples/demo/little_demo/run.sh +++ b/examples/demo/little_demo/run.sh @@ -23,13 +23,16 @@ export USE_MODE="train" # if train mode, will remove dir ./saved-model export CACHE_MODE="HBM" # 获取输入参数:py、ip -if [ $# -ge 1 ]; then - py=$1 - ip=$2 -else - echo "for example: bash run.sh main.py 10.10.10.10 or bash run.sh main.py" - exit 1 -fi +# if [ $# -ge 1 ]; then +# py="$1" +# ip=$2 +# else +# echo "for example: bash run.sh main.py 10.10.10.10 or bash run.sh main.py" +# exit 1 +# fi + +py=main.py +ip=127.0.0.1 # 检查输入的python文件是否合法 if [[ $py =~ ^[a-z0-9_]+\.py$ ]]; then @@ -65,12 +68,13 @@ if [ -n "$ip" ]; then fi cur_path=`pwd` -mx_rec_package_path="/usr/local/python3.7.5/lib/python3.7/site-packages/mx_rec" # please config +mx_rec_package_path=$(dirname "$(dirname "$(which python3.7)")")/lib/python3.7/site-packages/mx_rec +# 
mx_rec_package_path="/usr/local/python3.7.5/lib/python3.7/site-packages/mx_rec" # please config so_path=${mx_rec_package_path}/libasc # GLOG_stderrthreshold -2:TRACE -1:DEBUG 0:INFO 1:WARN 2.ERROR, 默认为INFO mpi_args='-x BIND_INFO="0:12 12:48 60:48" -x GLOG_stderrthreshold=0 -x GLOG_logtostderr=true -bind-to none -x NCCL_SOCKET_IFNAME=docker0 -mca btl_tcp_if_exclude docker0' interface="lo" -local_rank_size=8 # 每个节点使用的NPU卡数 +local_rank_size=1 # 每个节点使用的NPU卡数 num_server=1 # 训练节点数 num_process=$((${num_server} * ${local_rank_size})) # 训练总的进程数,等于使用的NPU卡的总数 @@ -89,7 +93,7 @@ export MXREC_MODE="ASC" ################# 参数配置 ###################### export USE_DYNAMIC=1 # 0:静态shape;1:动态shape -export USE_DYNAMIC_EXPANSION=0 # 0:关闭动态扩容;1: 开启动态扩容 +export USE_DYNAMIC_EXPANSION=1 # 0:关闭动态扩容;1: 开启动态扩容 export USE_MULTI_LOOKUP=1 # 0:一表一查;1:一表多查 export MULTI_LOOKUP_TIMES=2 # 一表多查次数:默认2,上限127(因为一表已经有一查);仅当export USE_MULTI_LOOKUP=1时生效 export USE_MODIFY_GRAPH=1 # 0:feature spec模式;1:自动改图模式 @@ -101,7 +105,22 @@ export USE_COMBINE_FAAE=0 # 0: separate history when faae; 1: combine hist export KEY_PROCESS_THREAD_NUM=6 #default 6, max 10 export FAST_UNIQUE=0 #if use fast unique export MGMT_HBM_TASK_MODE=0 #if async h2d (get and send tensors) -################################################ +############## DUMP CANN计算图 ############## +# export DUMP_GE_GRAPH=3 +# export DUMP_GRAPH_LEVEL=3 +# if [ "$USE_DYNAMIC_EXPANSION" == 1 ]; then +# dyn_scope="dyn" +# else +# dyn_scope="nodyn" +# fi +# export DUMP_GRAPH_PATH=${cur_path}/${current_date_time}_${dyn_scope}_dumpgraph_${DUMP_GE_GRAPH}_${DUMP_GRAPH_LEVEL} +############## 精度对齐相关 ############## +export PRECISION_CHECK=1 +if [ "$PRECISION_CHECK" == 1 ]; then + export USE_DETERMINISTIC=1 + export KEY_PROCESS_THREAD_NUM=1 +fi + # 帮助信息,不需要修改 if [[ $1 == --help || $1 == -h ]];then diff --git a/examples/demo/little_demo/run_mode.py b/examples/demo/little_demo/run_mode.py index 1a15fcc61b7d82b17847d36c1076234d2b7c8074..83f7eefded7a3667cd9c3bc70533cd26afdde216 100644 --- a/examples/demo/little_demo/run_mode.py +++ b/examples/demo/little_demo/run_mode.py @@ -18,9 +18,10 @@ import os import sys from typing import List +import numpy as np import tensorflow as tf -from config import sess_config + from mx_rec.util.variable import get_dense_and_sparse_variable from mx_rec.util.tf_version_adapter import hccl_ops @@ -31,6 +32,8 @@ from mx_rec.util.ops import import_host_pipeline_ops from mx_rec.util.initialize import ConfigInitializer from mx_rec.util.communication.hccl_ops import get_rank_id, get_rank_size +from config import construct_npu_sess_config, PRECISION_CHECK, USE_DETERMINISTIC +from utils import PrecisionDumpInfo, PRECISION_CHECK_PATH, PRECISION_DUMP_STEP class UseMode(BaseEnum): TRAIN = "train" @@ -39,7 +42,6 @@ class UseMode(BaseEnum): class RunMode: - def __init__( self, is_modify_graph: bool, is_faae: bool, table_list: list, optimizer_list: list, train_model, eval_model, train_iterator, eval_iterator, max_train_steps: int, infer_steps: int, params: dict): @@ -47,7 +49,7 @@ class RunMode: self.is_faae = is_faae self.use_deterministic = params.get("use_deterministic") self.session = tf.compat.v1.Session( - config=sess_config(dump_data=False, use_deterministic=self.use_deterministic)) + config=construct_npu_sess_config(dump_data=PRECISION_CHECK, use_deterministic=USE_DETERMINISTIC)) self.train_model = train_model self.train_iterator = train_iterator self.eval_model = eval_model @@ -118,6 +120,7 @@ class RunMode: def train(self, train_interval: int, saving_interval: int, if_load: 
@@ -118,6 +120,7 @@ def train(self, train_interval: int, saving_interval: int, if_load: bool, model_file: List[str]):
         self.set_train_ops()
+
         # In train mode, graph modify needs to be performed after compute gradients
         if self.is_modify_graph:
             logger.info("start to modifying graph")
@@ -129,6 +132,7 @@
                 self.session.run(initializer)
         else:
             logger.debug(f"use one shot iterator and modify graph is `{self.is_modify_graph}`.")
+
         self.saver = tf.compat.v1.train.Saver()
 
         latest_ckpt_step = 0
@@ -142,12 +146,18 @@
         if self.max_train_steps == -1:
             self.max_train_steps = sys.maxsize  # consume the whole dataset
 
+        dump_precision_ckpt(self.session, self.saver, 0)
+        PrecisionDumpInfo.write_dump_info()
+
         for i in range(start_step, start_step + self.max_train_steps):
             logger.info("################ training at step %d ################", i)
             try:
                 _, loss = self.session.run([self.train_ops, self.train_model.loss_list])
                 if self.use_deterministic:
                     logger.info(f"train_loss: {loss[0]}")
+                if i == 2:
+                    # debugging aid: stop after the first few steps of a precision run
+                    break
             except tf.errors.OutOfRangeError:
                 logger.info("Encounter the end of Sequence for training.")
                 break
@@ -158,6 +168,8 @@
 
             if train_interval != -1 and (i - latest_ckpt_step) % train_interval == 0:
                 self.evaluate()
+
+            dump_precision_ckpt(self.session, self.saver, i)
 
             if saving_interval != -1 and (i - latest_ckpt_step) % saving_interval == 0:
                 self.saver.save(self.session, f"./saved-model/model", global_step=i)
@@ -166,6 +178,8 @@
                 logger.info("############### set_threshold at step:%d ################", i)
                 self.change_threshold()
 
+        dump_precision_dataset(self.session, self.train_iterator)
+
         # save last step without duplication
         if i % saving_interval != 0:
             self.saver.save(self.session, f"./saved-model/model", global_step=i)
@@ -210,4 +224,32 @@ def get_load_step(model_file: List[str]):
             latest_step = step
     if latest_step == -1:
         raise RuntimeError("latest model not found")
-    return latest_step
\ No newline at end of file
+    return latest_step
+
+
+def dump_precision_dataset(sess, train_iterator):
+    initializer = train_iterator.initializer
+    data_batch = train_iterator.get_next()
+    if PRECISION_CHECK:
+        try:
+            sess.run(initializer)
+            batch_index = 0
+            while batch_index in PRECISION_DUMP_STEP:
+                batch_data = sess.run(data_batch)  # fetch one batch
+                # save every feature of the batch as a separate NumPy file
+                folder_name = os.path.join(PRECISION_CHECK_PATH, "01dump_dataset")
+                os.makedirs(folder_name, exist_ok=True)
+                for key, value in batch_data.items():
+                    filename = os.path.join(folder_name, f"data_batch_{batch_index}_{key}.npy")
+                    np.save(filename, value)
+                    print(f"Saved {filename}")
+                batch_index += 1
+        except tf.errors.OutOfRangeError:
+            raise IndexError("dataset ended before all precision dump steps were saved.")
+
+
+def dump_precision_ckpt(sess, saver, current_step):
+    if PRECISION_CHECK and current_step in PRECISION_DUMP_STEP:
+        dump_ckpt_path = os.path.join(PRECISION_CHECK_PATH, "02dump_model", "model")
+        # tf.compat.v1.train.Saver.save does not create missing parent directories
+        os.makedirs(os.path.dirname(dump_ckpt_path), exist_ok=True)
+        saver.save(sess, dump_ckpt_path, global_step=current_step)
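Taken together, a precision-check run now dumps three artifact sets under `PRECISION_CHECK_PATH`: input batches (`01dump_dataset/*.npy`, from `dump_precision_dataset`), checkpoints (`02dump_model`, from `dump_precision_ckpt`), and NPU op dumps (`03dump_op`, configured in config.py). A minimal offline comparison of the dataset dumps from two runs could look like this (the helper below is illustrative, not part of the patch; paths are placeholders):

```python
# compare_dumps.py -- hypothetical helper for diffing two runs' dataset dumps
import glob
import os

import numpy as np


def compare_dataset_dumps(run_a, run_b, atol=0.0):
    """Compare same-named .npy files from two 01dump_dataset directories."""
    for file_a in sorted(glob.glob(os.path.join(run_a, "data_batch_*.npy"))):
        file_b = os.path.join(run_b, os.path.basename(file_a))
        if not os.path.exists(file_b):
            print(f"missing in second run: {file_b}")
            continue
        a = np.load(file_a).astype(np.float64)
        b = np.load(file_b).astype(np.float64)
        if a.shape != b.shape:
            print(f"{os.path.basename(file_a)}: shape mismatch {a.shape} vs {b.shape}")
            continue
        diff = float(np.max(np.abs(a - b))) if a.size else 0.0
        print(f"{os.path.basename(file_a)}: max|a-b| = {diff:.3e}"
              f" [{'OK' if diff <= atol else 'MISMATCH'}]")


if __name__ == "__main__":
    compare_dataset_dumps("run_a/precision_check/01dump_dataset",
                          "run_b/precision_check/01dump_dataset")
```

With `USE_DETERMINISTIC=1`, fixed seeds, and single-threaded key processing (as run.sh now forces when `PRECISION_CHECK=1`), two runs over the same data should produce identical batch dumps.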