diff --git a/tools/atomic/gen_mt_data_0to1e.py b/tools/atomic/gen_mt_data_0to1e.py new file mode 100644 index 0000000000000000000000000000000000000000..b9c89c6507257fdc4a2dc8b8b9204310bb1a5b67 --- /dev/null +++ b/tools/atomic/gen_mt_data_0to1e.py @@ -0,0 +1,78 @@ +import numpy as np +import tensorflow as tf +import random + +np.random.seed(0) + +line_per_sample = 10000 +samples_num = 10000 * 800 # +sparse_feat_list = ['feat_ids'] +# todo +sparse_feat_len = [100] + +# uniq_ratio = pd.read_csv("./uniq_ratio.csv") +# uniq_ratio["uniq_num"] = round(uniq_ratio["uniq_ratio"] * 301) + +num = 0 +import sys + +hot_zhanbi = sys.argv[1:][0] +hot_zhanbi = float(hot_zhanbi)/10 +print(hot_zhanbi) + +tfpath = "/home/insert/data"+str(hot_zhanbi) +import os +if not os.path.exists(tfpath): + os.mkdir(tfpath) + +tfpath = "/home/insert/data"+str(hot_zhanbi)+"/tf" + +part1=np.array(random.sample(range(0 , 2), 1) ) + +def write_records(writer,line_cnt,file_cnt): + features = { + 'label': tf.train.Feature( + float_list=tf.train.FloatList(value=np.random.randint(2, size=line_per_sample).tolist())) + } + + count = 0 + for i, sparse_feat in enumerate(sparse_feat_list): + np.random.seed(count) + # global num + # print("process num: ",num) + print("===sparse=", sparse_feat) + part2=np.array(random.sample(range(0 + 100*line_per_sample*(10*file_cnt + line_cnt),100*line_per_sample*(10* file_cnt + line_cnt+1)),int(100 * line_per_sample* (1- hot_zhanbi)) )) + features[sparse_feat] = tf.train.Feature( + int64_list=tf.train.Int64List( + value=part1.astype(np.int64).tolist()* int(100 * line_per_sample * hot_zhanbi) + part2.astype(np.int64).tolist()) + ) + + count += 1 + features = tf.train.Features(feature=features) + example = tf.train.Example(features=features) + writer.write(example.SerializeToString()) + + +def gen_tfrecords(tfpath): + file_cnt = 0 + line_per_file = 10 + line_cnt = 0 + writer = tf.python_io.TFRecordWriter(f"{tfpath}_{file_cnt}.tfrecord") + sample_cnt = 0 + while True: + write_records(writer,line_cnt,file_cnt) + line_cnt += 1 + sample_cnt += line_per_sample + print(f">>>>>>>>>>>>count {sample_cnt} end.") + if sample_cnt == samples_num: + break + if line_cnt == line_per_file: + file_cnt += 1 + line_cnt = 0 + writer.close() + writer = tf.python_io.TFRecordWriter(f"{tfpath}_{file_cnt}.tfrecord") + writer.close() + + +if __name__ == '__main__': + gen_tfrecords(tfpath=tfpath) diff --git a/tools/atomic/model_info.md b/tools/atomic/model_info.md new file mode 100644 index 0000000000000000000000000000000000000000..a14533cc9371cd29c3c339ccddd1c60979f994f2 --- /dev/null +++ b/tools/atomic/model_info.md @@ -0,0 +1,19 @@ + +### 业务领域/场景 +原子操作测试 + +### 模型框架 +TF1.15.0/TF2.6.5 + +### 使用方法 +#### 生成数据集 +Python3 gen_mt_data_0to1e.py 5 (这里5的含义为重复度50%) +默认生成在 /home/insert/ 路径下 + +#### 运行测试 +Sparse.sh 需要根据实际环境进行配置 +测试 sparse lookup +./sparse.sh 8(卡数) sparse_lookup.py 8(emb size) 5(重复度) 1 0 0 + + + diff --git a/tools/atomic/sparse.sh b/tools/atomic/sparse.sh new file mode 100644 index 0000000000000000000000000000000000000000..56968da107f1e16e80e6d2ef778d121a0cba75eb --- /dev/null +++ b/tools/atomic/sparse.sh @@ -0,0 +1,60 @@ +#!/bin/bash +local_rank_size=$1 +host=localhost +py=$2 +my_dim=$3 +chongfudu=$4 +all2all=$5 +pre=$6 +slp=$7 +rm -rf /root/atc_data/* +rm -rf /root/ascend/* +rm -rf kernel_meta_* + + +export ALL2ALL=$5 +export HOST_PIPELINE_OPS_LIB_PATH=/usr/local/python3.7.5/lib/python3.7/site-packages/mx_rec/libasc/libasc_ops.so +export EMPTY_TENSOR=1 +export ENABLE_RUNTIME_V2=0 +mpi_path=/usr/local/openmpi/bin/ 
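+# Quick reference for the positional arguments consumed above (see model_info.md for a
+# full example invocation: "./sparse.sh 8 sparse_lookup.py 8 5 1 0 0"):
+#   $1 local_rank_size (number of cards)     $2 test script to launch
+#   $3 embedding dimension (--my_dim)        $4 repetition ratio, 5 = 50% (--chongfudu)
+#   $5 all2all switch (exported as ALL2ALL)  $6 pre flag (--pre)   $7 seconds to sleep (--slp)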
+so_path=/usr/local/python3.7.5/lib/python3.7/site-packages/mx_rec/libasc/ +interface="enp61s0f0" +ulimit -c 0 +export ASCEND_GLOBAL_LOG_LEVEL=0 +export TF_CPP_MIN_LOG_LEVEL=3 +export ASCEND_INSTALL_PATH=/usr/local/Ascend/latest/ +export ASCEND_HOME_PATH=${ASCEND_INSTALL_PATH} +export ASCEND_LATEST_INSTALL_PATH=/usr/local/Ascend +#export ASCEND_HOME_PATH=${ASCEND_INSTALL_PATH}/ +CANN_BIN_PATH=${ASCEND_HOME_PATH}/bin:${ASCEND_HOME_PATH}/compiler/ccec_compiler/bin +CANN_PYTHONPATH=${ASCEND_HOME_PATH}/python/site-packages:${ASCEND_HOME_PATH}/opp/op_impl/built-in/ai_core/tbe #:${ASCEND_INSTALL_PATH}/tfplugin/latest/python/site-packages +PYTHON_BIN_PATH=/usr/local/python3.7.5/bin/ +export PATH=${mpi_path}/bin:${PYTHON_BIN_PATH}:${CANN_BIN_PATH}:$PATH +export PYTHONPATH=${PYTHONPATH}:/usr/local/Ascend/latest/python/site-packages:${so_path}:${CANN_PYTHONPATH} +export LD_PRELOAD=/lib64/libgomp.so.1 +CANN_LD_PATH=${ASCEND_HOME_PATH}/runtime/lib64:${ASCEND_HOME_PATH}/fwkacllib/lib64:${ASCEND_HOME_PATH}/lib64:${ASCEND_HOME_PATH}/lib64/plugin/opskernel:${ASCEND_HOME_PATH}/lib64/plugin/nnengine +export LD_LIBRARY_PATH=${so_path}:/usr/local/python3.7.5/lib/python3.7/site-packages/mx_rec/libasc/:/home/insert/src/platform/securec/lib/:${CANN_LD_PATH}:/home/opensource/opensource/hdf5/lib:/usr/local/lib:/usr/local/python3.7.5/lib:$LD_LIBRARY_PATH +export ASCEND_AICPU_PATH=${ASCEND_HOME_PATH} +export ASCEND_OPP_PATH=${ASCEND_HOME_PATH}/opp +export TOOLCHAIN_HOME=${ASCEND_HOME_PATH}/toolkit + +export BETTER_EXCEPTIONS=1 +mpi_args='-x BIND_INFO="0:48 48:48 96:48" -x SPDLOG_LEVEL=debug -bind-to none' +# rm logs +rm *txt >/dev/null +rm -rf /root/ascend/log/* + +# rm shm +for i in $(ipcs -m | tail -n +4 | awk {'print $2'}); do + ipcrm -m $i +done + +num_process=${local_rank_size} +host_string=${host//_/:${local_rank_size},node}:${local_rank_size} +echo run in $host_string + +interface="lo" + +#python3.7 -c "import tensorflow;print(tensorflow.__path__)" +horovodrun --network-interface ${interface} -np ${num_process} --mpi-args "${mpi_args}" --mpi -H localhost:${local_rank_size} \ + python3.7 ${py} --local_rank_size ${local_rank_size} --hccl_json hccl_json_${local_rank_size}p.json --my_dim ${my_dim} --chongfudu $chongfudu --pre $pre --slp $slp |tee temp_{$my_dim}_{$chongfudu}_{$ALL2ALL}_{$pre}_{$slp}.log diff --git a/tools/atomic/sparse_lookup.py b/tools/atomic/sparse_lookup.py new file mode 100644 index 0000000000000000000000000000000000000000..570c683e5934daa1d688708abad27639f347f73f --- /dev/null +++ b/tools/atomic/sparse_lookup.py @@ -0,0 +1,266 @@ +import os +import sys +import time +import argparse +import numpy as np +import tensorflow as tf +from mpi4py import MPI # must before emb_cache after SparseOps +import psutil +import sys +from sklearn.metrics import roc_auc_score + +from tensorflow.python.ops import math_ops +from tensorflow.python.framework import ops +from tensorflow.core.protobuf.rewriter_config_pb2 import RewriterConfig +from npu_bridge.hccl import hccl_ops +from npu_bridge.estimator import npu_ops + +from mx_rec.graph.modifier import modify_graph_and_start_emb_cache +from mx_rec.core.asc.manager import start_asc_pipeline +from mx_rec.core.asc.helper import FeatureSpec, get_asc_insert_func +from mx_rec.util.initialize import get_rank_size, init, clear_channel, get_rank_id, set_if_load, \ + terminate_config_initializer +from mx_rec.constants.constants import MxRecMode +from mx_rec.core.embedding import create_table, sparse_lookup +from mx_rec.util.initialize import 
get_ascend_global_hashtable_collection + +from sparse_ops.config import set_ascend_env + +USE_PIPELINE_TEST = False +USE_STATIC = False +USE_HOT = False +USE_EXPANSION = False + +from mx_rec.constants.constants import ASCEND_SPARSE_LOOKUP_LOCAL_EMB, ASCEND_SPARSE_LOOKUP_ID_OFFSET + + +class WideDeep: + def __init__(self, input_data, feature_spec_list, hashtable): + self.lbl_hldr = input_data["global_labels"][0] + self.input_data = input_data + self.feature_spec_list = feature_spec_list + self.hash_table_list = hashtable + self.forward() + + def forward(self): + for feature, hash_table in zip(self.feature_spec_list, self.hash_table_list): + self.embedding = sparse_lookup(hash_table, feature, 1024 * 1024 // rank_size, dim=None, is_train=True, + name="merged_embedding_lookup", modify_graph=False, batch=self.input_data) + + # with tf.control_dependencies([self.embedding]): + self.op = self.embedding[0][0] + return self.op + + +def input_fn_tfrecord(feature_spec_list, rank_id, local_rank_id, rank_size, data_path, file_pattern, total_batch_size, + num_epochs=1, perform_shuffle=False, training=True): + line_per_sample = 1024 * 8 + total_batch_size = int(total_batch_size / line_per_sample) + num_parallel = 8 + + def extract_fn(data_record): + features = { + 'label': tf.FixedLenFeature(shape=(line_per_sample,), dtype=tf.float32), + 'feat_ids': tf.FixedLenFeature(shape=(128 * line_per_sample,), dtype=tf.int64) + } + sample = tf.parse_single_example(data_record, features) + return sample + + def reshape_fn(batch): + batch['label'] = tf.reshape(batch['label'], [-1, ]) + batch['feat_ids'] = tf.reshape(batch['feat_ids'], [-1, 128]) + return batch + + all_files = os.listdir(data_path) + files = [os.path.join(data_path, f) for f in all_files if f.startswith(file_pattern)] + dataset = tf.data.TFRecordDataset(files, num_parallel_reads=num_parallel) + batch_size = total_batch_size // rank_size + dataset = dataset.shard(rank_size, rank_id) + dataset = dataset.repeat(num_epochs) + dataset = dataset.map(extract_fn, num_parallel_calls=num_parallel).batch(batch_size, + drop_remainder=True) + dataset = dataset.map(reshape_fn, num_parallel_calls=num_parallel) + insert_fn = get_asc_insert_func(tgt_key_specs=feature_spec_list, is_training=True, dump_graph=False) + dataset = dataset.map(insert_fn) + + dataset = dataset.prefetch(int(100)) + return dataset + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='base') + parser.add_argument('--local_rank_size') + parser.add_argument('--hosts') + parser.add_argument('--hccl_json') + parser.add_argument('--my_dim') + parser.add_argument('--chongfudu') + parser.add_argument('--new_key') + parser.add_argument('--slp') + args = parser.parse_args() + local_rank_size = int(args.local_rank_size) + comm = MPI.COMM_WORLD + rank_id = comm.Get_rank() + rank_size = comm.Get_size() + print(f"rank {rank_id}/{rank_size}") + local_rank_id = rank_id % local_rank_size + set_ascend_env(rank_id, rank_size, local_rank_size, host=args.hosts, file=args.hccl_json) + + # create session + sess_config = tf.ConfigProto() + custom_op = sess_config.graph_options.rewrite_options.custom_optimizers.add() + custom_op.parameter_map["use_off_line"].b = True + custom_op.parameter_map["mix_compile_mode"].b = True + custom_op.name = "NpuOptimizer" + custom_op.parameter_map["precision_mode"].s = tf.compat.as_bytes('must_keep_origin_dtype') + sess_config.graph_options.rewrite_options.remapping = RewriterConfig.OFF + custom_op.parameter_map["enable_data_pre_proc"].b = True + 
sess_config.gpu_options.allow_growth = True + custom_op.parameter_map["hcom_parallel"].b = False + custom_op.parameter_map["HCCL_algorithm"].s = tf.compat.as_bytes("level0:fullmesh;level1:pairwise") + + custom_op.parameter_map["iterations_per_loop"].i = 10 + # custom_op.parameter_map["enable_dump"].b = True + # custom_op.parameter_map["dump_path"].s = tf.compat.as_bytes("./dump") + # custom_op.parameter_map["dump_step"].s = tf.compat.as_bytes("11|12") + # custom_op.parameter_map["dump_mode"].s = tf.compat.as_bytes("all") + # custom_op.parameter_map["op_debug_level"].i = 0 + custom_op.parameter_map["op_wait_timeout"].i = 500 + custom_op.parameter_map["op_execute_timeout"].i = 500 + custom_op.parameter_map["op_precision_mode"].s = tf.compat.as_bytes("op_impl_mode.ini") + custom_op.parameter_map["graph_memory_max_size"].s = tf.compat.as_bytes(str(30000000000)) + custom_op.parameter_map["variable_memory_max_size"].s = tf.compat.as_bytes(str(30000000000)) + # custom_op.parameter_map["profiling_mode"].b = True + # custom_op.parameter_map["profiling_options"].s = tf.compat.as_bytes( + # '{"output":"/home","training_trace":"on","task_trace":"on","fp_point":"","bp_point":"","aicpu":"on","aic_metrics":"PipeUtilization"}') + + global_start_time = time.time() + tf.set_random_seed(10086) + np.random.seed(10086) + + my_dim = int(args.my_dim) + print("my_dim=", my_dim) + + hot_zhanbi = args.chongfudu + hot_zhanbi = float(hot_zhanbi) / 10 + + # if hot_zhanbi == 0: + # hot_zhanbi = int(hot_zhanbi) + + config = { + "data_path": "./data1/data" + str(hot_zhanbi) + "_" + str(float(args.new_key)) + "/", + "train_file_pattern": "tf", + "test_file_pattern": "test", + "batch_size": 1024 * 8, + "field_num": 128, + "send_count": 1024 * 1024 // rank_size, # 65536 * 10 > 39(field num) * 16000(bz) + "id_emb_dim": my_dim, + "ext_emb_vec_size": my_dim, + "train_epoch": 1, + "dev_vocab_size": 100000001 + } + + # model run parameter + print_steps = 300 + evaluate_stride = 80000 # eval every 200 steps + eval_steps = -1 # 8 ranks 34 + stop_steps = 95 + # Hybrid step1.1: init cache + emb_name = "wide_deep_emb" + + dev_vocab_size = config["dev_vocab_size"] # 23120 + host_vocab_size = 0 + + init(True, rank_id=rank_id, rank_size=local_rank_size, train_interval=100, eval_steps=-1, + prefetch_batch_number=1, use_dynamic=0, use_hot=1, use_dynamic_expansion=0) + + tf.disable_eager_execution() + ###################################### + feature_spec_list = [ + FeatureSpec("feat_ids", feat_count=128, table_name="merged_sparse_embeddings", batch_size=config["batch_size"])] + with tf.device('/cpu:0'): + train_dataset = input_fn_tfrecord(feature_spec_list=feature_spec_list, + rank_id=rank_id, + local_rank_id=local_rank_id, + rank_size=rank_size, + data_path=config["data_path"], + file_pattern=config["train_file_pattern"], + total_batch_size=int(rank_size * config["batch_size"]), + perform_shuffle=(not USE_PIPELINE_TEST), + num_epochs=config["train_epoch"]) + train_iterator = train_dataset.make_initializable_iterator() + train_next_iter = train_iterator.get_next() + + train_input_data = {"global_labels": train_next_iter["label"], + "feat_ids": train_next_iter["feat_ids"], + } + + sparse_hashtable = create_table(key_dtype=tf.int64, + dim=tf.TensorShape([my_dim]), + name="merged_sparse_embeddings", + emb_initializer=tf.variance_scaling_initializer(mode="fan_avg", + distribution='normal', seed=0), + device_vocabulary_size=dev_vocab_size * local_rank_size, + mode=MxRecMode.mapping("ASC")) + + model = WideDeep(train_input_data, 
feature_spec_list, [sparse_hashtable]) + MODIFY_GRAPH_FLAG = False + if MODIFY_GRAPH_FLAG: + modify_graph_and_start_emb_cache(dump_graph=False) + else: + start_asc_pipeline() + + with tf.Session(config=sess_config) as sess: + sess.run(tf.global_variables_initializer()) + sess.run([train_iterator.initializer]) + # build model + print("start build wdl(single domain) model") + print("=========start============") + # start run loop + total_start_time = time.time() + current_steps = 0 + train_finished = False + time.sleep(int(args.slp)) + while not train_finished: + try: + current_steps += 1 + print("current step =", current_steps) + # + run_dict = { + "adam": model.op, + "lbl_hldr": model.lbl_hldr, + } + if current_steps == 1: + total_start_time = time.time() + start_time = time.time() + print("start sess run") + results = sess.run(fetches=run_dict) + print("start sess run 1") + end_time = time.time() + print(f"current_steps: {current_steps} ,step time:{(end_time - start_time) * 1000}") + if current_steps <= 5: + total_start_time = time.time() + if current_steps % print_steps == 0: + print("----------" * 10) + try: + print( + f"current_steps: {current_steps} ,deep_loss:{results['deep_loss']}," + f"e2etime per step:{(end_time - start_time) * 1000}") + except KeyError: + print(f"current_steps: {current_steps}") + print("----------" * 10) + + if current_steps >= stop_steps: + train_finished = True + # + except tf.errors.OutOfRangeError: + train_finished = True + + # train_finished + # emb_cache.destroy() + # MPI.Finalize() + print( + f"training {current_steps} steps, consume time: {(time.time() - total_start_time) / (current_steps - 5) * 1000} ") + + terminate_config_initializer() + # emb_cache.destroy() + # MPI.Finalize() diff --git a/tools/atomic/sparse_lookup_with_grad.py b/tools/atomic/sparse_lookup_with_grad.py new file mode 100644 index 0000000000000000000000000000000000000000..3d7d37e5bd89764df52b00bbbfcd8b254b094938 --- /dev/null +++ b/tools/atomic/sparse_lookup_with_grad.py @@ -0,0 +1,277 @@ +import os +import sys +import time +import argparse +import numpy as np +import tensorflow as tf +from mpi4py import MPI # must before emb_cache after SparseOps +import psutil +import sys +from sklearn.metrics import roc_auc_score + +from tensorflow.python.ops import math_ops +from tensorflow.python.framework import ops +from tensorflow.core.protobuf.rewriter_config_pb2 import RewriterConfig +from npu_bridge.hccl import hccl_ops +from npu_bridge.estimator import npu_ops + +from mx_rec.graph.modifier import modify_graph_and_start_emb_cache +from mx_rec.core.asc.manager import start_asc_pipeline +from mx_rec.core.asc.helper import FeatureSpec, get_asc_insert_func +from mx_rec.util.initialize import get_rank_size, init, clear_channel, get_rank_id, set_if_load, \ + terminate_config_initializer +from mx_rec.constants.constants import MxRecMode +from mx_rec.core.embedding import create_table, sparse_lookup +from mx_rec.util.initialize import get_ascend_global_hashtable_collection +from mx_rec.optimizers.lazy_adam import CustomizedLazyAdam +from sparse_ops.config import set_ascend_env + +USE_PIPELINE_TEST = False +USE_STATIC = False +USE_HOT = False +USE_EXPANSION = False + + +def create_hash_optimizer(): + return CustomizedLazyAdam() + + +def get_sparse_optimizer(): + sparse_optimizer = create_hash_optimizer() + return sparse_optimizer + + +class WideDeep: + def __init__(self, input_data, feature_spec_list, hashtable): + self.lbl_hldr = input_data["global_labels"][0] + self.input_data = input_data + 
self.feature_spec_list = feature_spec_list + self.hash_table_list = hashtable + self.forward() + + def forward(self): + for feature, hash_table in zip(self.feature_spec_list, self.hash_table_list): + self.embedding = sparse_lookup(hash_table, feature, 1024 * 1024 // rank_size, dim=None, is_train=True, + name="merged_embedding_lookup", modify_graph=False, batch=self.input_data) + self.loss = tf.reduce_mean(self.embedding, axis=0) + with tf.control_dependencies([self.loss]): + self.op = tf.no_op() + return self.op + + +def input_fn_tfrecord(feature_spec_list, rank_id, local_rank_id, rank_size, data_path, file_pattern, total_batch_size, + num_epochs=1, perform_shuffle=False, training=True): + line_per_sample = 1024 * 8 + total_batch_size = int(total_batch_size / line_per_sample) + num_parallel = 8 + + def extract_fn(data_record): + features = { + 'label': tf.FixedLenFeature(shape=(line_per_sample,), dtype=tf.float32), + 'feat_ids': tf.FixedLenFeature(shape=(128 * line_per_sample,), dtype=tf.int64) + } + sample = tf.parse_single_example(data_record, features) + return sample + + def reshape_fn(batch): + batch['label'] = tf.reshape(batch['label'], [-1, ]) + batch['feat_ids'] = tf.reshape(batch['feat_ids'], [-1, 128]) + return batch + + all_files = os.listdir(data_path) + files = [os.path.join(data_path, f) for f in all_files if f.startswith(file_pattern)] + dataset = tf.data.TFRecordDataset(files, num_parallel_reads=num_parallel) + batch_size = total_batch_size // rank_size + dataset = dataset.shard(rank_size, rank_id) + dataset = dataset.repeat(num_epochs) + dataset = dataset.map(extract_fn, num_parallel_calls=num_parallel).batch(batch_size, + drop_remainder=True) + dataset = dataset.map(reshape_fn, num_parallel_calls=num_parallel) + insert_fn = get_asc_insert_func(tgt_key_specs=feature_spec_list, is_training=True, dump_graph=False) + dataset = dataset.map(insert_fn) + dataset = dataset.prefetch(int(100)) + return dataset + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='base') + parser.add_argument('--local_rank_size') + parser.add_argument('--hosts') + parser.add_argument('--hccl_json') + parser.add_argument('--my_dim') + parser.add_argument('--chongfudu') + parser.add_argument('--new_key') + parser.add_argument('--slp') + args = parser.parse_args() + local_rank_size = int(args.local_rank_size) + comm = MPI.COMM_WORLD + rank_id = comm.Get_rank() + rank_size = comm.Get_size() + print(f"rank {rank_id}/{rank_size}") + local_rank_id = rank_id % local_rank_size + set_ascend_env(rank_id, rank_size, local_rank_size, host=args.hosts, file=args.hccl_json) + + # create session + sess_config = tf.ConfigProto() + custom_op = sess_config.graph_options.rewrite_options.custom_optimizers.add() + custom_op.parameter_map["use_off_line"].b = True + custom_op.parameter_map["mix_compile_mode"].b = True + custom_op.name = "NpuOptimizer" + custom_op.parameter_map["precision_mode"].s = tf.compat.as_bytes('must_keep_origin_dtype') + sess_config.graph_options.rewrite_options.remapping = RewriterConfig.OFF + custom_op.parameter_map["enable_data_pre_proc"].b = True + sess_config.gpu_options.allow_growth = True + custom_op.parameter_map["hcom_parallel"].b = False + custom_op.parameter_map["HCCL_algorithm"].s = tf.compat.as_bytes("level0:fullmesh;level1:pairwise") + + custom_op.parameter_map["iterations_per_loop"].i = 5 + custom_op.parameter_map["enable_dump"].b = True + custom_op.parameter_map["dump_path"].s = tf.compat.as_bytes("./dump") + custom_op.parameter_map["dump_step"].s = 
tf.compat.as_bytes("1|2") + custom_op.parameter_map["dump_mode"].s = tf.compat.as_bytes("all") + custom_op.parameter_map["op_wait_timeout"].i = 500 + custom_op.parameter_map["op_execute_timeout"].i = 500 + custom_op.parameter_map["op_precision_mode"].s = tf.compat.as_bytes("op_impl_mode.ini") + custom_op.parameter_map["graph_memory_max_size"].s = tf.compat.as_bytes(str(30000000000)) + custom_op.parameter_map["variable_memory_max_size"].s = tf.compat.as_bytes(str(30000000000)) + + global_start_time = time.time() + tf.set_random_seed(10086) + np.random.seed(10086) + + my_dim = int(args.my_dim) + print("my_dim=", my_dim) + + hot_zhanbi = args.chongfudu + hot_zhanbi = float(hot_zhanbi) / 10 + + # if hot_zhanbi == 0: + # hot_zhanbi = int(hot_zhanbi) + + config = { + "data_path": "./data1/data" + str(hot_zhanbi) + "_" + str(float(args.new_key)) + "/", + "train_file_pattern": "tf", + "test_file_pattern": "test", + "batch_size": 1024 * 8, + "field_num": 128, + "send_count": 1024 * 1024 // rank_size, # 65536 * 10 > 39(field num) * 16000(bz) + "id_emb_dim": my_dim, + "ext_emb_vec_size": my_dim, + "train_epoch": 1, + "dev_vocab_size": 5000001 + } + + # model run parameter + print_steps = 300 + evaluate_stride = 80000 # eval every 200 steps + eval_steps = -1 # 8 ranks 34 + stop_steps = 5 + # Hybrid step1.1: init cache + emb_name = "wide_deep_emb" + + dev_vocab_size = config["dev_vocab_size"] # 23120 + host_vocab_size = 0 + + init(True, rank_id=rank_id, rank_size=local_rank_size, train_interval=100, eval_steps=-1, + prefetch_batch_number=1, use_dynamic=0, use_hot=1, use_dynamic_expansion=0) + + tf.disable_eager_execution() + ###################################### + feature_spec_list = [ + FeatureSpec("feat_ids", feat_count=128, table_name="merged_sparse_embeddings", batch_size=config["batch_size"])] + with tf.device('/cpu:0'): + train_dataset = input_fn_tfrecord(feature_spec_list=feature_spec_list, + rank_id=rank_id, + local_rank_id=local_rank_id, + rank_size=rank_size, + data_path=config["data_path"], + file_pattern=config["train_file_pattern"], + total_batch_size=int(rank_size * config["batch_size"]), + perform_shuffle=(not USE_PIPELINE_TEST), + num_epochs=config["train_epoch"]) + train_iterator = train_dataset.make_initializable_iterator() + train_next_iter = train_iterator.get_next() + + train_input_data = {"global_labels": train_next_iter["label"], + "feat_ids": train_next_iter["feat_ids"], + } + + sparse_optimizer_list = get_sparse_optimizer() + + sparse_hashtable = create_table(key_dtype=tf.int64, + dim=tf.TensorShape([my_dim]), + name="merged_sparse_embeddings", + emb_initializer=tf.variance_scaling_initializer(mode="fan_avg", + distribution='normal', seed=0), + device_vocabulary_size=dev_vocab_size * local_rank_size, + optimizer_list=sparse_optimizer_list, + mode=MxRecMode.mapping("ASC")) + + sparse_variables = tf.compat.v1.get_collection(get_ascend_global_hashtable_collection()) + model = WideDeep(train_input_data, feature_spec_list, [sparse_hashtable]) + + train_ops = [] + for loss, sparse_optimizer in zip([model.loss], [sparse_optimizer_list]): + sparse_grads = tf.gradients(loss, sparse_variables) + grads_and_vars = [(grad, variable) for grad, variable in zip(sparse_grads, sparse_variables)] + train_ops.append(sparse_optimizer.apply_gradients(grads_and_vars)) + + MODIFY_GRAPH_FLAG = False + if MODIFY_GRAPH_FLAG: + modify_graph_and_start_emb_cache(dump_graph=False) + else: + start_asc_pipeline() + + with tf.Session(config=sess_config) as sess: + sess.run(tf.global_variables_initializer()) + 
sess.run([train_iterator.initializer]) + # build model + print("start build wdl(single domain) model") + print("=========start============") + # start run loop + total_start_time = time.time() + current_steps = 0 + train_finished = False + time.sleep(int(args.slp)) + while not train_finished: + try: + current_steps += 1 + print("current step =", current_steps) + # + run_dict = { + "loss": model.op, + "adam": train_ops, + "lbl_hldr": model.lbl_hldr, + } + if current_steps == 1: + total_start_time = time.time() + start_time = time.time() + print("start sess run") + results = sess.run(fetches=run_dict) + print("start sess run 1") + end_time = time.time() + print(f"current_steps: {current_steps} ,step time:{(end_time - start_time) * 1000}") + if current_steps <= 5: + total_start_time = time.time() + if current_steps % print_steps == 0: + print("----------" * 10) + try: + print( + f"current_steps: {current_steps} ,deep_loss:{results['deep_loss']}," + f"e2etime per step:{(end_time - start_time) * 1000}") + except KeyError: + print(f"current_steps: {current_steps}") + print("----------" * 10) + + if current_steps >= stop_steps: + train_finished = True + + except tf.errors.OutOfRangeError: + train_finished = True + + # train_finished + print( + f"training {current_steps} steps, consume time: {(time.time() - total_start_time) / (current_steps - 5) * 1000} ") + + terminate_config_initializer() + MPI.Finalize() \ No newline at end of file diff --git a/tools/atomic/sparse_ops/__init__.py b/tools/atomic/sparse_ops/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..53640a7ea0715663c756097b99cfe5e75f08972d --- /dev/null +++ b/tools/atomic/sparse_ops/__init__.py @@ -0,0 +1,7 @@ +""" +init +""" +from __future__ import absolute_import +from sparse_ops.config import get_path + +__all__ = ["get_path", ] diff --git a/tools/atomic/sparse_ops/config.py b/tools/atomic/sparse_ops/config.py new file mode 100644 index 0000000000000000000000000000000000000000..f10d12fd110519fbafff51379be40910780f2ea1 --- /dev/null +++ b/tools/atomic/sparse_ops/config.py @@ -0,0 +1,111 @@ +""" +配置文件 +""" +from __future__ import absolute_import +import os +import json +import psutil + + +def get_path(): + """ + 打印当前行号 + """ + return os.path.dirname(__file__) + + +def gen_config(server_str, local_rank_size, path=None): + """ + 生成hccl配置 + """ + + def _device(local_rank_id, rank_id, server_id): + return { + "device_id": f"{local_rank_id}", + "device_ip": f'192.{local_rank_id % 4}.{server_id}.{1 + local_rank_id // 4}', + "rank_id": f"{rank_id}" + } + + def _server(server_id): + return { + "device": [], + "server_id": f"90.91.141.{server_id}" + } + + conf = { + "server_count": "-1", + "server_list": [], + "status": "completed", + "version": "1.0" + } + rank_id = 0 + servers = str(server_str).split('_') + conf['server_count'] = str(len(servers)) + for server in servers: + srv = _server(server) + for local_rank_id in range(local_rank_size): + dev = _device(local_rank_id, rank_id, server) + rank_id = rank_id + 1 + srv["device"].append(dev) + conf['server_list'].append(srv) + + conf_str = json.dumps(conf) + if path is None: + path = '/tmp/hccl.json' + with open(path, 'w') as file_handle: + file_handle.write(conf_str) + + +def set_ascend_env(rank, rank_size, local_rank_size, host, file=None, dev_id=-1, dev_index=-1): + """ + 配置昇腾相关的参数和环境变量,生成hccl配置 + """ + rank = str(rank) + rank_size = str(rank_size) + local_rank_size = int(local_rank_size) + host = str(host) + + os.environ["MOX_USE_NPU"] = "1" + 
os.environ["FUSION_TENSOR_SIZE"] = "2000000000" + os.environ["MOX_USE_TF_ESTIMATOR"] = "0" + os.environ["MOX_USE_TDT"] = "1" + os.environ["HEARTBEAT"] = "1" + os.environ["CONITNUE_TRAIN"] = "true" + + os.environ["RANK_ID"] = rank + local_rank_id = int(rank) % int(local_rank_size) + if dev_id != -1: + os.environ["DEVICE_ID"] = str(dev_id) + os.environ["ASCEND_DEVICE_ID"] = str(dev_id) + else: + os.environ["DEVICE_ID"] = str(local_rank_id) + os.environ["ASCEND_DEVICE_ID"] = str(local_rank_id) + if dev_index != -1: + os.environ["DEVICE_INDEX"] = str(dev_index) + else: + os.environ["DEVICE_INDEX"] = str(local_rank_id) + + os.environ["RANK_SIZE"] = rank_size + if file: + os.environ["RANK_TABLE_FILE"] = file + else: + gen_config(host, local_rank_size) + os.environ["RANK_TABLE_FILE"] = "/tmp/hccl.json" + os.environ["HCCL_CONNECT_TIMEOUT"] = "600" + + os.environ["JOB_ID"] = "10086" + os.environ["SOC_VERSION"] = "Ascend910" + os.environ["GE_AICPU_FLAG"] = "1" + os.environ["NEW_GE_FE_ID"] = "1" + os.environ["EXPERIMENTAL_DYNAMIC_PARTITION"] = "1" + os.environ["ENABLE_FORCE_V2_CONTROL"] = "1" + + +def bind_cpu(): + p = psutil.Process() + try: + bind_start = 48 + bind_count = 96 + p.cpu_affinity([bind_start + x for x in range(bind_count)]) + except IndexError: + print("error cpu bind info, skipped.") diff --git a/tools/atomic/sparse_ops/ops.py b/tools/atomic/sparse_ops/ops.py new file mode 100644 index 0000000000000000000000000000000000000000..35fe246269a3313b83eb5e6d3348ec878ea07bf8 --- /dev/null +++ b/tools/atomic/sparse_ops/ops.py @@ -0,0 +1,133 @@ +""" +sparse ops +""" +from __future__ import absolute_import +import tensorflow as tf +from npu_bridge.hccl import hccl_ops +from sparse_ops import utils +from mpi4py import MPI + +MPI.Init_thread(MPI.THREAD_MULTIPLE) # must before emb_cache +utils.init = True + + +class SparseOps: + """ + embedding相关的接口 + """ + + def __init__(self, fallback=False): + # context + self.fallback = fallback + self.all2all = hccl_ops.all_to_all_v + + def get_a2a_args(self, lookup_vec_size, mini_bs_w_field, rank_size, send_count, emb_vec_size): + """ + 获取a2a args信息 + """ + if self.fallback: + send_count = tf.cond(lookup_vec_size > send_count * rank_size, + lambda: mini_bs_w_field // rank_size, + lambda: send_count) + all2all_args = { + "sc": tf.cast([send_count * emb_vec_size] * rank_size, tf.int64), + "ss": tf.cast([send_count * emb_vec_size * i for i in range(rank_size)], tf.int64)} + all2all_args['rc'] = all2all_args['sc'] + all2all_args['rs'] = all2all_args['ss'] + return all2all_args, send_count * rank_size + + def forward_alltoall(self, all2all_args, restore_vec, hot_pos, emb_vec, emb_vec_size): + """ + emb的前向通信 + all2all_args:用all2all用到的参数 + restore_vec:恢复向量 + emb_vec:输入的emb + """ + emb_vec = tf.reshape(emb_vec, [-1]) + + result = self.all2all(send_data=emb_vec, + send_counts=all2all_args['sc'], + send_displacements=all2all_args['ss'], + recv_counts=all2all_args['rc'], + recv_displacements=all2all_args['rs'] + ) + + result = tf.reshape(result, + [-1, emb_vec_size], + name="after_all2all_reshape") + if hot_pos is not None: + result = tf.concat([tf.gather(result, hot_pos, name="hot_pos"), result], axis=0) + + output = tf.gather(result, restore_vec) + return output + + def forward_alltoallc(self, all2all_args, restore_vec, emb_vec, emb_vec_size, rank): + """ + emb的前向通信 + all2all_args:用all2all用到的参数 + restore_vec:恢复向量 + emb_vec:输入的emb + """ + emb_vec = tf.reshape(emb_vec, [-1]) + + result = hccl_ops.all_to_all_v_c(send_data=emb_vec, + send_count_matrix=all2all_args, + 
rank=rank + ) + + result = tf.reshape(result, + [-1, emb_vec_size], + name="after_all2all_reshape") + output = tf.gather(result, restore_vec) + return output + + def backward_alltoall(self, emb_grad, hot_pos, segment_ids, num_segments, all2all_args): + """ + emb梯度的反向通信 + id_emb_grad:原始梯度 + segment_ids:恢复向量 + num_segments:压缩后的长度 + """ + # unique_local_grad 2node shape 37755 same with rc total and num_segment + # unique_local_grad shape is [40052, 80] + if hot_pos is not None: + unique_local_grad = tf.math.unsorted_segment_sum(emb_grad, + segment_ids=segment_ids, + num_segments=num_segments + tf.shape(hot_pos)[0], + name="backward_combine") + hot, cold = tf.split(unique_local_grad, + [tf.shape(hot_pos)[0], tf.shape(unique_local_grad)[0] - tf.shape(hot_pos)[0]], axis=0) + unique_local_grad = tf.tensor_scatter_nd_update(cold, tf.expand_dims(hot_pos, 1), hot) + else: + unique_local_grad = tf.math.unsorted_segment_sum(emb_grad, + segment_ids=segment_ids, + num_segments=num_segments, name="backward_combine") + + unique_grad = self.all2all(send_data=unique_local_grad, + send_counts=all2all_args['rc'], + send_displacements=all2all_args['rs'], + recv_counts=all2all_args['sc'], + recv_displacements=all2all_args['ss'] + ) + return unique_grad + + def backward_alltoallc(self, emb_grad, segment_ids, num_segments, all2all_args, rank): + """ + emb梯度的反向通信 + id_emb_grad:原始梯度 + segment_ids:恢复向量 + num_segments:压缩后的长度 + """ + unique_local_grad = tf.math.unsorted_segment_sum(emb_grad, + segment_ids=segment_ids, + num_segments=num_segments, name="backward_combine") + # unique_local_grad 2node shape 37755 same with rc total and num_segment + # unique_local_grad shape is [40052, 80] + unique_local_grad = tf.reshape(unique_local_grad, [-1]) + + all2all_args = tf.transpose(all2all_args) + unique_grad = hccl_ops.all_to_all_v_c(send_data=unique_local_grad, + send_count_matrix=all2all_args, + rank=rank + ) + return unique_grad diff --git a/tools/atomic/sparse_ops/utils.py b/tools/atomic/sparse_ops/utils.py new file mode 100644 index 0000000000000000000000000000000000000000..07cf796db757ef817b86d0a74d0e111e7d2abd65 --- /dev/null +++ b/tools/atomic/sparse_ops/utils.py @@ -0,0 +1,23 @@ +""" +utils +""" +from __future__ import absolute_import +import tensorflow as tf +from mpi4py import rc + +tf.get_logger().setLevel("ERROR") +rc.initialize = False # if = True, The Init is done when "from mpi4py import MPI" is called + + +def ops(): + """ + 返回emb相关的算子 + """ + return tf.load_op_library("libcust_ops.so") + + +def dataset_ops(): + """ + 返回emb相关的算子 + """ + return tf.load_op_library("libasc_dataset_ops.so") diff --git "a/tools/atomic/\345\216\237\345\255\220\346\265\213\350\257\225\347\273\223\346\236\234-tf1.15-rec0630-cann530.xlsx" "b/tools/atomic/\345\216\237\345\255\220\346\265\213\350\257\225\347\273\223\346\236\234-tf1.15-rec0630-cann530.xlsx" new file mode 100644 index 0000000000000000000000000000000000000000..195f0ed29faf77934d67422b1775a106b03db272 Binary files /dev/null and "b/tools/atomic/\345\216\237\345\255\220\346\265\213\350\257\225\347\273\223\346\236\234-tf1.15-rec0630-cann530.xlsx" differ
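Note on the rank-table helper: gen_config() in tools/atomic/sparse_ops/config.py writes the HCCL
rank-table JSON, and set_ascend_env() falls back to it when no rank-table file is supplied; server
IDs are passed as an underscore-separated string (the `host` value in sparse.sh). A minimal usage
sketch follows — the server IDs, device count and output path are illustrative only, not values
used by the test scripts:

    from sparse_ops.config import gen_config

    # Hypothetical rank table: two servers ("140" and "141"), 8 devices each.
    gen_config("140_141", local_rank_size=8, path="/tmp/hccl_16p.json")
    # The file then holds "server_count": "2" and one server_list entry per server;
    # e.g. the first device of server "140" is written as
    #   {"device_id": "0", "device_ip": "192.0.140.1", "rank_id": "0"}
    # under "server_id": "90.91.141.140".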