From 5ec8b7accc0970e3f3f2fb02982dda17766cd662 Mon Sep 17 00:00:00 2001 From: 18608119613 Date: Wed, 28 May 2025 14:25:33 +0800 Subject: [PATCH 1/2] sgd fusion --- cust_op/fused_sgd/op_host/sgd.cpp | 222 ++++++++++++++++++ cust_op/fused_sgd/op_host/sgd_tiling.h | 37 +++ cust_op/fused_sgd/op_kernel/sgd.cpp | 37 +++ cust_op/fused_sgd/op_kernel/sgd_kernel.h | 198 ++++++++++++++++ cust_op/fused_sgd/op_kernel/sgd_kernel_base.h | 152 ++++++++++++ cust_op/fused_sgd/run.sh | 58 +++++ cust_op/fused_sgd/sgd.json | 68 ++++++ examples/dlrm/model/config.py | 5 +- examples/dlrm/model/gradient_descent_w.py | 72 ------ examples/dlrm/model/optimizer.py | 7 +- mx_rec/optimizers/gradient_descent.py | 42 +++- mx_rec/validator/validator.py | 14 ++ src/ops_tf/hybrid_dataset_ops.cpp | 11 + 13 files changed, 840 insertions(+), 83 deletions(-) create mode 100644 cust_op/fused_sgd/op_host/sgd.cpp create mode 100644 cust_op/fused_sgd/op_host/sgd_tiling.h create mode 100644 cust_op/fused_sgd/op_kernel/sgd.cpp create mode 100644 cust_op/fused_sgd/op_kernel/sgd_kernel.h create mode 100644 cust_op/fused_sgd/op_kernel/sgd_kernel_base.h create mode 100644 cust_op/fused_sgd/run.sh create mode 100644 cust_op/fused_sgd/sgd.json delete mode 100644 examples/dlrm/model/gradient_descent_w.py diff --git a/cust_op/fused_sgd/op_host/sgd.cpp b/cust_op/fused_sgd/op_host/sgd.cpp new file mode 100644 index 00000000..8e1b847b --- /dev/null +++ b/cust_op/fused_sgd/op_host/sgd.cpp @@ -0,0 +1,222 @@ +/* Copyright 2025. Huawei Technologies Co.,Ltd. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and + limitations under the License. +==============================================================================*/ +#include +#include +#include +#include "sgd_tiling.h" +#include "register/op_def_registry.h" +#include "tiling/platform/platform_ascendc.h" + +namespace optiling { + +constexpr uint32_t MAX_DIM_SIZE = 4096; +constexpr int RESERVE_UB_SIZE = 20 * 1024; + +enum class WeightDecay : uint64_t { + DISABLE = 0, + ENABLE = 1 +}; + +template +static ge::graphStatus CheckNullPointer(T* pointer, const char* errorMessage) +{ + if (pointer == nullptr) { + printf("%s nullptr\n", errorMessage); + return ge::GRAPH_FAILED; + } + + return ge::GRAPH_SUCCESS; +} + +static ge::graphStatus TilingShapeInfo(const gert::TilingContext* context, SgdTilingData &tilingData) +{ + const gert::StorageShape* gradShape = context->GetInputShape(0); + if (CheckNullPointer(gradShape, "gradShape") != ge::GRAPH_SUCCESS) { + return ge::GRAPH_FAILED; + } + const gert::StorageShape* inputVarShape = context->GetInputShape(2); + if (CheckNullPointer(inputVarShape, "inputVarShape") != ge::GRAPH_SUCCESS) { + return ge::GRAPH_FAILED; + } + + tilingData.set_batchSize(gradShape->GetStorageShape().GetDim(0)); + tilingData.set_dimSize(gradShape->GetStorageShape().GetDim(1)); + tilingData.set_tableSize(inputVarShape->GetStorageShape().GetDim(0)); + + if (tilingData.get_dimSize() > MAX_DIM_SIZE) { + printf("dimSize %d must meet range[1, 4096]\n", tilingData.get_dimSize()); + return ge::GRAPH_FAILED; + } + + auto tableDimSize = inputVarShape->GetStorageShape().GetDim(1); + if (tilingData.get_dimSize() != tableDimSize) { + printf("grad dim must equal table dim\n"); + return ge::GRAPH_FAILED; + } + + return ge::GRAPH_SUCCESS; +} + +static ge::graphStatus TilingAttr(gert::TilingContext* context, SgdTilingData &tilingData) +{ + auto attrs = context->GetAttrs(); + if (CheckNullPointer(attrs, "GetAttrs attrs") != ge::GRAPH_SUCCESS) { + return 
ge::GRAPH_FAILED; + } + + auto isClose = [](float value, float other) -> bool { + return std::fabs(value - other) <= std::numeric_limits::epsilon(); + }; + + float weightDecay = *attrs->GetAttrPointer(0); + if (isClose(weightDecay, float(0.0f))) { + context->SetTilingKey(static_cast(WeightDecay::DISABLE)); + } else { + context->SetTilingKey(static_cast(WeightDecay::ENABLE)); + } + tilingData.set_weightDecay(weightDecay); + + return ge::GRAPH_SUCCESS; +} + +static ge::graphStatus TilingCore(gert::TilingContext* context, SgdTilingData &tilingData) +{ + auto platformInfo = platform_ascendc::PlatformAscendC(context->GetPlatformInfo()); + uint32_t aivCoreNum = platformInfo.GetCoreNumAiv(); + if (aivCoreNum == 0) { + printf("coreNum is zero\n"); + return ge::GRAPH_FAILED; + } + + uint64_t ub; + platformInfo.GetCoreMemSize(platform_ascendc::CoreMemType::UB, ub); + if (ub <= RESERVE_UB_SIZE) { + printf("ubSize is not enough\n"); + return ge::GRAPH_FAILED; + } + tilingData.set_ubFreeSize(ub - RESERVE_UB_SIZE); + + auto bs = tilingData.get_batchSize(); + uint32_t actualCoreNum = (bs >= aivCoreNum) ? 
aivCoreNum : bs; + uint32_t splitNextCoreProcBs = bs / actualCoreNum; + uint32_t splitPrevCoreProcBs = splitNextCoreProcBs + 1; + uint32_t splitCoreIndex = bs % actualCoreNum; + + tilingData.set_actualCoreNum(actualCoreNum); + tilingData.set_splitNextCoreProcBs(splitNextCoreProcBs); + tilingData.set_splitPrevCoreProcBs(splitPrevCoreProcBs); + tilingData.set_splitCoreIndex(splitCoreIndex); + + context->SetBlockDim(actualCoreNum); + return ge::GRAPH_SUCCESS; +} + +static ge::graphStatus TilingFunc(gert::TilingContext* context) +{ + if (CheckNullPointer(context, "context") != ge::GRAPH_SUCCESS) { + return ge::GRAPH_FAILED; + } + + SgdTilingData tilingData; + if (TilingShapeInfo(context, tilingData) != ge::GRAPH_SUCCESS) { + return ge::GRAPH_FAILED; + } + + if (TilingAttr(context, tilingData) != ge::GRAPH_SUCCESS) { + return ge::GRAPH_FAILED; + } + + if (TilingCore(context, tilingData) != ge::GRAPH_SUCCESS) { + return ge::GRAPH_FAILED; + } + + tilingData.SaveToBuffer(context->GetRawTilingData()->GetData(), context->GetRawTilingData()->GetCapacity()); + context->GetRawTilingData()->SetDataSize(tilingData.GetDataSize()); + return ge::GRAPH_SUCCESS; +} +} + + +namespace ge { +static ge::graphStatus InferShape(gert::InferShapeContext* context) +{ + if (optiling::CheckNullPointer(context, "context") != ge::GRAPH_SUCCESS) { + return ge::GRAPH_FAILED; + } + + gert::Shape* outputVarShape = context->GetOutputShape(0); + if (optiling::CheckNullPointer(outputVarShape, "outputVarShape") != ge::GRAPH_SUCCESS) { + return ge::GRAPH_FAILED; + } + + const gert::Shape* inputVarShape = context->GetInputShape(2); + if (optiling::CheckNullPointer(inputVarShape, "inputVarShape") != ge::GRAPH_SUCCESS) { + return ge::GRAPH_FAILED; + } + *outputVarShape = *inputVarShape; + return GRAPH_SUCCESS; +} + +static ge::graphStatus InferDataType([[maybe_unused]] gert::InferDataTypeContext* context) +{ + return GRAPH_SUCCESS; +} + +} + + +namespace ops { +class Sgd : public OpDef { +public: + explicit 
Sgd(const char* name) : OpDef(name) + { + this->Input("gradient") + .ParamType(REQUIRED) + .DataType({ge::DT_FLOAT}) + .Format({ge::FORMAT_ND}) + .UnknownShapeFormat({ge::FORMAT_ND}); + this->Input("indices") + .ParamType(REQUIRED) + .DataType({ge::DT_INT32}) + .Format({ge::FORMAT_ND}) + .UnknownShapeFormat({ge::FORMAT_ND}); + this->Input("inputVar") + .ParamType(REQUIRED) + .DataType({ge::DT_FLOAT}) + .Format({ge::FORMAT_ND}) + .UnknownShapeFormat({ge::FORMAT_ND}); + this->Input("learningRate") + .ParamType(REQUIRED) + .DataType({ge::DT_FLOAT}) + .Format({ge::FORMAT_ND}) + .UnknownShapeFormat({ge::FORMAT_ND}); + this->Output("inputVar") + .ParamType(REQUIRED) + .DataType({ge::DT_FLOAT}) + .Format({ge::FORMAT_ND}) + .UnknownShapeFormat({ge::FORMAT_ND}); + this->Attr("weightDecay").AttrType(OPTIONAL).Float(0.0f); + + this->SetInferShape(ge::InferShape).SetInferDataType(ge::InferDataType); + + this->AICore() + .SetTiling(optiling::TilingFunc); + this->AICore().AddConfig("ascend910b"); + this->AICore().AddConfig("ascend910_93"); + } +}; + +OP_ADD(Sgd); +} \ No newline at end of file diff --git a/cust_op/fused_sgd/op_host/sgd_tiling.h b/cust_op/fused_sgd/op_host/sgd_tiling.h new file mode 100644 index 00000000..8d2f323b --- /dev/null +++ b/cust_op/fused_sgd/op_host/sgd_tiling.h @@ -0,0 +1,37 @@ +/* Copyright 2025. Huawei Technologies Co.,Ltd. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and + limitations under the License. 
+==============================================================================*/
+
+#ifndef SGD_TILING_H
+#define SGD_TILING_H
+
+#include "register/tilingdata_base.h"
+
+namespace optiling {
+BEGIN_TILING_DATA_DEF(SgdTilingData)  // tiling parameters filled by the host tiling code and read by the kernel
+TILING_DATA_FIELD_DEF(uint32_t, batchSize);            // dim 0 of the gradient input
+TILING_DATA_FIELD_DEF(uint32_t, tableSize);            // dim 0 of the inputVar input
+TILING_DATA_FIELD_DEF(uint32_t, dimSize);              // dim 1 of the gradient input (must equal inputVar dim 1)
+TILING_DATA_FIELD_DEF(uint32_t, actualCoreNum);        // number of cores launched: min(batchSize, AIV core count)
+TILING_DATA_FIELD_DEF(uint32_t, ubFreeSize);           // UB size minus the host-side reserve
+TILING_DATA_FIELD_DEF(uint32_t, splitNextCoreProcBs);  // batchSize / actualCoreNum
+TILING_DATA_FIELD_DEF(uint32_t, splitPrevCoreProcBs);  // splitNextCoreProcBs + 1
+TILING_DATA_FIELD_DEF(uint32_t, splitCoreIndex);       // batchSize % actualCoreNum
+TILING_DATA_FIELD_DEF(float, weightDecay);             // value of the weightDecay attribute
+END_TILING_DATA_DEF;
+
+REGISTER_TILING_DATA_CLASS(Sgd, SgdTilingData)
+}
+
+#endif
\ No newline at end of file
diff --git a/cust_op/fused_sgd/op_kernel/sgd.cpp b/cust_op/fused_sgd/op_kernel/sgd.cpp
new file mode 100644
index 00000000..2584cd51
--- /dev/null
+++ b/cust_op/fused_sgd/op_kernel/sgd.cpp
@@ -0,0 +1,37 @@
+/* Copyright 2025. Huawei Technologies Co.,Ltd. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+      limitations under the License.
+==============================================================================*/
+#include "sgd_kernel.h"
+// Kernel entry point: loads the tiling data and runs the SgdKernel variant selected by the tiling key.
+extern "C" __global__ __aicore__ void sgd(GM_ADDR gradient, GM_ADDR indices, GM_ADDR inputVar, GM_ADDR learningRate,
+    GM_ADDR inputVar_ref, GM_ADDR workspace, GM_ADDR tiling)
+{
+    GET_TILING_DATA(tiling_data, tiling);
+
+#ifdef KERNEL_TASK_TYPE_DEFAULT
+    // Set kernel type with new versions of CANN to avoid matmul error during compiling.
+    // In previous versions of CANN, avoid matmul error by using '#ifndef __GET_CODE_CHANNEL__'.
+    KERNEL_TASK_TYPE_DEFAULT(KERNEL_TYPE_AIV_ONLY);
+#endif
+
+    if (TILING_KEY_IS(0)) {  // key 0: the host sets this when weightDecay is (close to) zero
+        SgdKernel kernel;    // NOTE(review): template arguments are missing in this copy; restore from the original
+        kernel.Init(gradient, indices, inputVar, learningRate, inputVar_ref, tiling_data);
+        kernel.Process();
+    } else if (TILING_KEY_IS(1)) {  // key 1: the host sets this when weightDecay is non-zero
+        SgdKernel kernel;           // NOTE(review): presumably the weight-decay specialization — confirm
+        kernel.Init(gradient, indices, inputVar, learningRate, inputVar_ref, tiling_data);
+        kernel.Process();
+    }
+}
\ No newline at end of file
diff --git a/cust_op/fused_sgd/op_kernel/sgd_kernel.h b/cust_op/fused_sgd/op_kernel/sgd_kernel.h
new file mode 100644
index 00000000..643d8ec4
--- /dev/null
+++ b/cust_op/fused_sgd/op_kernel/sgd_kernel.h
@@ -0,0 +1,198 @@
+/* Copyright 2025. Huawei Technologies Co.,Ltd. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+      limitations under the License.
+==============================================================================*/
+#ifndef SGD_KERNEL_H
+#define SGD_KERNEL_H
+
+#include "sgd_kernel_base.h"
+
+// Primary template declaration (no definition required)
+template  // NOTE(review): template parameter/argument lists are missing throughout this copy; restore from the original
+class SgdKernel;
+
+// Specialization 1: SGD without weight decay. Computes -lr * grad and adds it to the table rows with atomic add.
+template
+class SgdKernel : public SgdKernelBase {
+public:
+    using Base = SgdKernelBase;
+    using Base::Base;
+    static constexpr uint32_t UB_BUFFER_SIZE = 2;  // per-row T1 buffers budgeted in InitUbBuffer (grad + output)
+
+    __aicore__ inline SgdKernel() = default;
+    // Initialises the shared base state, sizes the queues from the free UB size, and caches -lr.
+    __aicore__ inline void Init(GM_ADDR gradient, GM_ADDR indices, GM_ADDR inputVar, GM_ADDR learningRate,
+        GM_ADDR outputVar, const SgdTilingData &tilingData)
+    {
+        Base::InitBase(gradient, indices, inputVar, learningRate, outputVar, tilingData);
+        this->InitUbBuffer(tilingData.ubFreeSize);
+        this->neLr = T1(-1) * Base::lr;
+    }
+    // Processes this core's procBs rows in chunks of nLoopBs rows, followed by the remaining tail.
+    __aicore__ inline void Process()
+    {
+        auto nLoop = Base::procBs / Base::nLoopBs;
+        for (auto i = 0; i < nLoop; i++) {
+            this->DataCopyInIndices(i * Base::nLoopBs, Base::nLoopBs);
+            this->DataCopyInGrad(i * Base::nLoopBs, Base::nLoopBs);
+            this->Compute(Base::nLoopBs);
+            this->ScatterVarByIndices(Base::nLoopBs);
+        }
+
+        auto nTail = Base::procBs % Base::nLoopBs;
+        if (nTail != 0) {
+            this->DataCopyInIndices(nLoop * Base::nLoopBs, nTail);
+            this->DataCopyInGrad(nLoop * Base::nLoopBs, nTail);
+            this->Compute(nTail);
+            this->ScatterVarByIndices(nTail);
+        }
+    }
+
+private:
+    __aicore__ inline void InitUbBuffer(uint32_t ubFreeByteSize)
+    {
+        Base::nLoopBs = ubFreeByteSize / (Base::alignDimSize * (UB_BUFFER_SIZE * sizeof(T1)) + sizeof(T2));  // rows per chunk
+        Base::nLoopBs = Base::nLoopBs / Base::T2_DATA_BLOCK * Base::T2_DATA_BLOCK;  // round down to a multiple of T2_DATA_BLOCK
+        ASSERT(Base::nLoopBs != 0 && "nLoopBs cant be zeros!");  // Process() divides by nLoopBs
+
+        Base::pipe.InitBuffer(Base::gradQue, Base::BUFFER_NUM, Base::nLoopBs * Base::alignDimSize * sizeof(T1));
+        Base::pipe.InitBuffer(Base::indicesQue, Base::BUFFER_NUM, Base::nLoopBs * sizeof(T2));
+        Base::pipe.InitBuffer(Base::outputQue, Base::BUFFER_NUM, Base::nLoopBs * Base::alignDimSize * sizeof(T1));
+    }
+    // Dequeues the indices, syncs on the MTE2->S event, then runs the base scatter with atomic add switched on.
+    __aicore__ inline void ScatterVarByIndices(int64_t cnt)
+    {
+        Base::indicesQue.template DeQue();
+        AscendC::SetFlag(Base::eventIdMTE2ToS);
+        AscendC::WaitFlag(Base::eventIdMTE2ToS);
+
+        AscendC::SetAtomicAdd();
+        Base::ScatterVarByIndices(cnt);
+        AscendC::SetAtomicNone();
+    }
+    // output = grad * (-lr) over cnt rows of alignDimSize elements.
+    __aicore__ inline void Compute(int64_t cnt)
+    {
+        auto ComputeLength = cnt * Base::alignDimSize;
+        Base::outputUb = Base::outputQue.template AllocTensor();
+
+        Base::gradUb = Base::gradQue.template DeQue();
+        AscendC::Muls(Base::outputUb, Base::gradUb, this->neLr, ComputeLength);
+        Base::gradQue.template FreeTensor(Base::gradUb);
+
+        Base::outputQue.EnQue(Base::outputUb);
+    }
+
+    // Negated learning rate, precomputed to save a scalar operation
+    T1 neLr{ 0.0f };
+};
+
+// Specialization 2: SGD with weight decay. Gathers the current rows and writes back var - lr * (grad + weightDecay * var).
+template
+class SgdKernel : public SgdKernelBase {
+public:
+    using Base = SgdKernelBase;
+    using Base::Base;
+    static constexpr uint32_t UB_BUFFER_SIZE = 3;  // per-row T1 buffers budgeted in InitUbBuffer (var + grad + output)
+
+    __aicore__ inline SgdKernel() = default;
+    // Initialises the shared base state, binds inputVar for reading, sizes the queues and stores weightDecay.
+    __aicore__ inline void Init(GM_ADDR gradient, GM_ADDR indices, GM_ADDR inputVar, GM_ADDR learningRate,
+        GM_ADDR outputVar, const SgdTilingData &tilingData)
+    {
+        Base::InitBase(gradient, indices, inputVar, learningRate, outputVar, tilingData);
+
+        inputVarGm.SetGlobalBuffer((__gm__ T1 *)inputVar, Base::tableSize * Base::dimSize);
+
+        this->InitUbBuffer(tilingData.ubFreeSize);
+        weightDecay = tilingData.weightDecay;
+    }
+    // Same chunk/tail structure as specialization 1, with a row gather before the gradient load.
+    __aicore__ inline void Process()
+    {
+        auto nLoop = Base::procBs / Base::nLoopBs;
+        for (auto i = 0; i < nLoop; i++) {
+            this->DataCopyInIndices(i * Base::nLoopBs, Base::nLoopBs);
+            this->GatherVarByIndices(i * Base::nLoopBs, Base::nLoopBs);
+            this->DataCopyInGrad(i * Base::nLoopBs, Base::nLoopBs);
+            this->Compute(Base::nLoopBs);
+            this->ScatterVarByIndices(Base::nLoopBs);  // base version: plain copy-out, no atomic add
+        }
+
+        auto nTail = Base::procBs % Base::nLoopBs;
+        if (nTail != 0) {
+            this->DataCopyInIndices(nLoop * Base::nLoopBs, nTail);
+            this->GatherVarByIndices(nLoop * Base::nLoopBs, nTail);
+            this->DataCopyInGrad(nLoop * Base::nLoopBs, nTail);
+            this->Compute(nTail);
+            this->ScatterVarByIndices(nTail);  // NOTE(review): duplicate indices would overwrite each other — presumably indices are unique; confirm
+        }
+    }
+
+private:
+    __aicore__ inline void InitUbBuffer(uint32_t ubFreeByteSize)
+    {
+        Base::nLoopBs = ubFreeByteSize / (Base::alignDimSize * (UB_BUFFER_SIZE * sizeof(T1)) + sizeof(T2));  // rows per chunk
+        Base::nLoopBs = Base::nLoopBs / Base::T2_DATA_BLOCK * Base::T2_DATA_BLOCK;  // round down to a multiple of T2_DATA_BLOCK
+        ASSERT(Base::nLoopBs != 0 && "nLoopBs cant be zeros!");  // Process() divides by nLoopBs
+
+        Base::pipe.InitBuffer(varQue, Base::BUFFER_NUM, Base::nLoopBs * Base::alignDimSize * sizeof(T1));
+        Base::pipe.InitBuffer(Base::gradQue, Base::BUFFER_NUM, Base::nLoopBs * Base::alignDimSize * sizeof(T1));
+        Base::pipe.InitBuffer(Base::indicesQue, Base::BUFFER_NUM, Base::nLoopBs * sizeof(T2));
+        Base::pipe.InitBuffer(Base::outputQue, Base::BUFFER_NUM, Base::nLoopBs * Base::alignDimSize * sizeof(T1));
+    }
+    // Copies the inputVar row of each non-negative index into varUb; the offset parameter is not used.
+    __aicore__ inline void GatherVarByIndices(int64_t offset, int64_t cnt)
+    {
+        Base::indicesQue.template DeQue();
+        AscendC::SetFlag(Base::eventIdMTE2ToS);
+        AscendC::WaitFlag(Base::eventIdMTE2ToS);
+
+        varUb = varQue.template AllocTensor();
+        for (int i = 0; i < cnt; i++) {
+            int64_t indices = Base::indicesUb.GetValue(i);
+            if (likely(indices >= 0)) {  // NOTE(review): no upper-bound check against tableSize — confirm callers guarantee it
+                this->template DataCopyIn(varUb[i * Base::alignDimSize], inputVarGm[indices * Base::dimSize], 1,
+                    Base::dimSize, Base::isDimAlign);
+            }
+        }
+
+        varQue.EnQue(varUb);
+    }
+    // output = var - lr * (grad + weightDecay * var) over cnt rows of alignDimSize elements.
+    __aicore__ inline void Compute(int64_t cnt)
+    {
+        auto ComputeLength = cnt * Base::alignDimSize;
+        Base::outputUb = Base::outputQue.template AllocTensor();
+
+        varUb = varQue.template DeQue();
+        AscendC::Muls(Base::outputUb, varUb, this->weightDecay, ComputeLength);  // weightDecay * var
+
+        Base::gradUb = Base::gradQue.template DeQue();
+        AscendC::Add(Base::outputUb, Base::gradUb, Base::outputUb, ComputeLength);  // grad + weightDecay * var
+        Base::gradQue.template FreeTensor(Base::gradUb);
+
+        AscendC::Muls(Base::outputUb, Base::outputUb, this->lr, ComputeLength);  // scale by lr
+        AscendC::Sub(Base::outputUb, varUb, Base::outputUb, ComputeLength);      // var - scaled update
+        varQue.template FreeTensor(varUb);
+
+        Base::outputQue.EnQue(Base::outputUb);
+    }
+
+    AscendC::GlobalTensor inputVarGm;
+    AscendC::TQue varQue;
+    AscendC::LocalTensor varUb;
+    T1 weightDecay{ 0.0f };
+};
+
+#endif
\ No newline at end of file
diff --git a/cust_op/fused_sgd/op_kernel/sgd_kernel_base.h b/cust_op/fused_sgd/op_kernel/sgd_kernel_base.h
new file mode 100644
index 00000000..ebdd9656
--- /dev/null
+++ b/cust_op/fused_sgd/op_kernel/sgd_kernel_base.h
@@ -0,0 +1,152 @@
+/* Copyright 2025. Huawei Technologies Co.,Ltd. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+      limitations under the License.
+==============================================================================*/
+
+#ifndef SGD_KERNEL_BASE_H
+#define SGD_KERNEL_BASE_H
+
+#include "kernel_operator.h"
+// Shared state and data-movement helpers used by the SGD kernel classes that derive from this base.
+template  // NOTE(review): template parameter list is missing in this copy; the body uses T1 (values) and T2 (indices)
+class SgdKernelBase {
+protected:
+    static constexpr uint32_t T1_DATA_BLOCK = AscendC::ONE_BLK_SIZE / sizeof(T1);  // T1 elements per ONE_BLK_SIZE block
+    static constexpr uint32_t T2_DATA_BLOCK = AscendC::ONE_BLK_SIZE / sizeof(T2);  // T2 elements per ONE_BLK_SIZE block
+    static constexpr uint32_t BUFFER_NUM = 1;
+
+    __aicore__ inline SgdKernelBase() = default;
+    // Derives this core's row range from the tiling data, reads lr, and binds the global buffers.
+    __aicore__ inline void InitBase(GM_ADDR gradient, GM_ADDR indices, GM_ADDR inputVar, GM_ADDR learningRate,
+        GM_ADDR outputVar, const SgdTilingData &tilingData)
+    {
+        if (AscendC::GetBlockIdx() < tilingData.splitCoreIndex) {  // leading cores take one extra row each
+            procBs = tilingData.splitPrevCoreProcBs;
+            offsetBs = AscendC::GetBlockIdx() * tilingData.splitPrevCoreProcBs;
+        } else if (AscendC::GetBlockIdx() < tilingData.actualCoreNum) {
+            procBs = tilingData.splitNextCoreProcBs;
+            offsetBs = tilingData.splitCoreIndex * tilingData.splitPrevCoreProcBs +
+                (AscendC::GetBlockIdx() - tilingData.splitCoreIndex) * tilingData.splitNextCoreProcBs;
+        } else {  // block index beyond actualCoreNum: nothing to process
+            procBs = 0;
+            offsetBs = 0;
+        }
+
+        batchSize = tilingData.batchSize;
+        tableSize = tilingData.tableSize;
+        dimSize = tilingData.dimSize;
+        alignDimSize = AscendC::AlignUp(dimSize, T1_DATA_BLOCK);  // row length rounded up to a multiple of T1_DATA_BLOCK
+        isDimAlign = (dimSize == alignDimSize);
+        // NOTE(review): the size argument is sizeof(float); if it is an element count, 1 would be expected — confirm.
+        lrGm.SetGlobalBuffer((__gm__ T1*)learningRate, sizeof(float));
+        lr = lrGm.GetValue(0);
+        // gradient and indices are bound starting at this core's first row (offsetBs).
+        gradGm.SetGlobalBuffer((__gm__ T1 *)gradient + offsetBs * dimSize, batchSize * dimSize);
+        indicesGm.SetGlobalBuffer((__gm__ T2 *)indices + offsetBs, batchSize);
+        outputGm.SetGlobalBuffer((__gm__ T1 *)outputVar, tableSize * dimSize);
+
+        eventIdMTE2ToS = static_cast(pipe.FetchEventID(AscendC::HardEvent::MTE2_S));
+        AscendC::PipeBarrier();
+    }
+    // Global-to-local copy: DataCopy of copyCnt * copyLens when isAlign, otherwise DataCopyPad of copyCnt blocks.
+    template
+    __aicore__ inline void DataCopyIn(const AscendC::LocalTensor <, const AscendC::GlobalTensor >,
+        uint32_t copyCnt, uint32_t copyLens, bool isAlign)
+    {
+        if (isAlign) {
+            AscendC::DataCopy(lt, gt, copyCnt * copyLens);
+        } else {
+            AscendC::DataCopyParams copyParams = {
+                static_cast(copyCnt),
+                static_cast(copyLens * sizeof(T)),  // block length in bytes
+                0,
+                0,
+            };
+
+            AscendC::DataCopyPadParams padParams = { false, 0, 0, 0 };
+            AscendC::DataCopyPad(lt, gt, copyParams, padParams);
+        }
+    }
+    // Local-to-global copy: DataCopy of copyCnt * copyLens when isAlign, otherwise DataCopyPad of copyCnt blocks.
+    template
+    __aicore__ inline void DataCopyOut(const AscendC::GlobalTensor >, const AscendC::LocalTensor <,
+        uint32_t copyCnt, uint32_t copyLens, bool isAlign)
+    {
+        if (isAlign) {
+            AscendC::DataCopy(gt, lt, copyCnt * copyLens);
+        } else {
+            AscendC::DataCopyParams copyParams = {
+                static_cast(copyCnt),
+                static_cast(copyLens * sizeof(T)),  // block length in bytes
+                0,
+                0,
+            };
+            AscendC::DataCopyPad(gt, lt, copyParams);
+        }
+    }
+
+    // mte2
+    __aicore__ inline void DataCopyInGrad(int64_t offset, int64_t cnt)
+    {
+        gradUb = gradQue.AllocTensor();
+        DataCopyIn(gradUb, gradGm[offset * dimSize], cnt, dimSize, isDimAlign);  // cnt rows starting at row offset
+        gradQue.EnQue(gradUb);
+    }
+
+    // mte2
+    __aicore__ inline void DataCopyInIndices(int64_t offset, int64_t cnt)
+    {
+        indicesUb = indicesQue.AllocTensor();
+
+        DataCopyIn(indicesUb, indicesGm[offset], 1, cnt, ((cnt % T2_DATA_BLOCK) == 0));  // aligned path only when cnt is a multiple of T2_DATA_BLOCK
+        indicesQue.EnQue(indicesUb);
+    }
+    // Copies each computed row to outputGm at its index (negative indices are skipped), then frees both tensors.
+    __aicore__ inline void ScatterVarByIndices(int64_t cnt)
+    {
+        outputUb = outputQue.template DeQue();
+
+        for (int i = 0; i < cnt; i++) {
+            int64_t indices = indicesUb.GetValue(i);
+            if (likely(indices >= 0)) {  // NOTE(review): no upper-bound check against tableSize — confirm callers guarantee it
+                this->template DataCopyOut(outputGm[indices * dimSize], outputUb[i * alignDimSize], 1, dimSize,
+                    isDimAlign);
+            }
+        }
+
+        indicesQue.template FreeTensor(indicesUb);
+        outputQue.template FreeTensor(outputUb);
+    }
+
+    uint32_t batchSize{ 0 };
+    uint32_t tableSize{ 0 };
+    uint32_t dimSize{ 0 };
+    uint32_t alignDimSize{ 0 };  // dimSize rounded up to a multiple of T1_DATA_BLOCK
+    uint32_t offsetBs{ 0 };      // first row handled by this core
+    uint32_t procBs{ 0 };        // number of rows handled by this core
+    uint32_t nLoopBs{ 0 };       // rows per chunk; set by the derived class's InitUbBuffer
+    bool isDimAlign{ true };
+    T1 lr{ 0.0f };               // learning rate read from element 0 of the learningRate input
+
+    AscendC::TPipe pipe;
+    AscendC::GlobalTensor gradGm, outputGm, lrGm;
+    AscendC::GlobalTensor indicesGm;
+    AscendC::TQue gradQue, indicesQue;
+    AscendC::TQue outputQue;
+    AscendC::LocalTensor gradUb, outputUb;
+    AscendC::LocalTensor indicesUb;
+
+    event_t eventIdMTE2ToS;
+};
+
+#endif
\ No newline at end of file
diff --git a/cust_op/fused_sgd/run.sh b/cust_op/fused_sgd/run.sh
new file mode 100644
index 00000000..b053d8cc
--- /dev/null
+++ b/cust_op/fused_sgd/run.sh
@@ -0,0 +1,58 @@
+#!/bin/bash
+# Copyright 2024. Huawei Technologies Co.,Ltd. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================== + +set -e + +source /etc/profile + +# 查找msopgen的路径,加入到环境变量PATH中 +msopgen_path=$(find /usr/local/Ascend/ -name msopgen | grep bin) +parent_dir=$(dirname "$msopgen_path") +export PATH=$parent_dir:$PATH + +echo $PATH +# 利用msopgen生成可编译文件 +rm -rf ./sgd +msopgen gen -i sgd.json -f tf -c ai_core-Ascend910B1 -lan cpp -out ./sgd -m 0 -op Sgd + +cp -rf op_kernel sgd/ +cp -rf op_host sgd/ + +cd sgd + +# 判断当前目录下是否存在CMakePresets.json文件 +if [ ! -f "CMakePresets.json" ]; then + echo "ERROR, CMakePresets.json file not exist." + exit 1 +fi + +# 禁止生成CRC校验和 +sed -i 's/--nomd5/--nomd5 --nocrc/g' ./cmake/makeself.cmake + +# 修改cann安装路径 +sed -i 's:"/usr/local/Ascend/latest":"/usr/local/Ascend/ascend-toolkit/latest":g' CMakePresets.json +# 修改vendor_name 防止覆盖之前vendor_name为customize的算子; +# vendor_name需要和aclnn中的CMakeLists.txt中的CUST_PKG_PATH值同步,不同步aclnn会调用失败; +# vendor_name字段值不能包含customize;包含会导致多算子部署场景CANN的vendors路径下config.ini文件内容截取错误 +sed -i 's:"customize":"mxrec_sgd":g' CMakePresets.json + +bash build.sh + +# 安装编译成功的算子包 +bash ./build_out/custom_opp*.run + +cd .. 
+rm -rf ./sgd \ No newline at end of file diff --git a/cust_op/fused_sgd/sgd.json b/cust_op/fused_sgd/sgd.json new file mode 100644 index 00000000..14ae00d9 --- /dev/null +++ b/cust_op/fused_sgd/sgd.json @@ -0,0 +1,68 @@ +[ + { + "op": "Sgd", + "language": "cpp", + "input_desc": [ + { + "name": "gradient", + "param_type": "required", + "format": [ + "ND" + ], + "type": [ + "fp32" + ] + }, + { + "name": "indices", + "param_type": "required", + "format": [ + "ND" + ], + "type": [ + "int32" + ] + }, + { + "name": "inputVar", + "param_type": "required", + "format": [ + "ND" + ], + "type": [ + "fp32" + ] + }, + { + "name": "learningRate", + "param_type": "required", + "format": [ + "ND" + ], + "type": [ + "fp32" + ] + } + ], + "output_desc": [ + { + "name": "inputVar", + "param_type": "required", + "format": [ + "ND" + ], + "type": [ + "fp32" + ] + } + ], + "attr": [ + { + "name": "weightDecay", + "param_type": "optional", + "type": "float", + "default_value": 0.0 + } + ] + } +] \ No newline at end of file diff --git a/examples/dlrm/model/config.py b/examples/dlrm/model/config.py index c06d0f92..b9029c30 100644 --- a/examples/dlrm/model/config.py +++ b/examples/dlrm/model/config.py @@ -1,6 +1,6 @@ # coding=utf-8 -# Copyright 2024. Huawei Technologies Co.,Ltd. All rights reserved. -# +# Copyright 2025. Huawei Technologies Co.,Ltd. All rights reserved. +# Copyright (c) 2021; NVIDIA CORPORATION. All rights reserved. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -132,6 +132,7 @@ class Config: self.USE_PIPELINE_TEST = False # False indicates use SGD optimizer, else use LazyAdam. 
If True, is incompatible with dynamic_expansion self.use_lazy_adam_optimizer = False + self.use_fusion_optim = False # 动态学习率 GLOBAL_BATCH_SIZE = 8192 * 8 diff --git a/examples/dlrm/model/gradient_descent_w.py b/examples/dlrm/model/gradient_descent_w.py deleted file mode 100644 index 1a82c588..00000000 --- a/examples/dlrm/model/gradient_descent_w.py +++ /dev/null @@ -1,72 +0,0 @@ -# coding=utf-8 -# Copyright 2024. Huawei Technologies Co.,Ltd. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# ============================================================================== - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -from collections import defaultdict - -import tensorflow as tf -from tensorflow.python.ops import math_ops -from tensorflow.python.training import gradient_descent -from mx_rec.optimizers.base import CustomizedOptimizer -from mx_rec.util.initialize import ConfigInitializer - -from demo_logger import logger - - -def create_hash_optimizer(learning_rate, weight_decay=0.0001, use_locking=False, name="GradientDescent"): - optimizer = CustomizedGradientDescentWithWeighDecay(learning_rate=learning_rate, - weight_decay=weight_decay, - use_locking=use_locking, - name=name) - ConfigInitializer.get_instance().optimizer_config.optimizer_instance = optimizer - return optimizer - - -class CustomizedGradientDescentWithWeighDecay(gradient_descent.GradientDescentOptimizer, CustomizedOptimizer): - name_counter = defaultdict(int) - - def __init__(self, learning_rate, weight_decay, use_locking=False, name="GradientDescent"): - self.optimizer_type = "gradient_descent_with_weight_decay" - self.weight_decay = weight_decay - super(CustomizedGradientDescentWithWeighDecay, self)._get_name(name=name) - super(CustomizedGradientDescentWithWeighDecay, self).__init__( - learning_rate=learning_rate, use_locking=use_locking, name=self.unique_name - ) - self._slot_num = 0 - self._derivative = 1 - - def get_slot_init_values(self): - logger.info("no slot for gradient descent") - return [] - - def _apply_sparse_duplicate_indices(self, grad, var): - logger.debug(">>>> Enter _apply_sparse_duplicate_indices") - nd_indices = tf.expand_dims(grad.indices, 1) - logger.info(f"weigh_decay={self.weight_decay}") - if self.weight_decay is None: - nd_value = grad.values * math_ops.cast(self._learning_rate_tensor, var.dtype.base_dtype) - else: - nd_value = (grad.values + math_ops.cast(self.weight_decay, 
var.dtype.base_dtype) * - tf.gather(var, grad.indices)) * math_ops.cast(self._learning_rate_tensor, var.dtype.base_dtype) - var_update_op = tf.scatter_nd_add(var, nd_indices, -nd_value, use_locking=self._use_locking) - return var_update_op - - def _apply_dense(self, grad, var): - logger.debug(">>>> Enter _apply_dense") - raise NotImplementedError("You are using a wrong type of variable.") diff --git a/examples/dlrm/model/optimizer.py b/examples/dlrm/model/optimizer.py index 18dbe288..4acb0aa0 100644 --- a/examples/dlrm/model/optimizer.py +++ b/examples/dlrm/model/optimizer.py @@ -1,5 +1,5 @@ # coding=utf-8 -# Copyright 2024. Huawei Technologies Co.,Ltd. All rights reserved. +# Copyright 2025. Huawei Technologies Co.,Ltd. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,9 +17,9 @@ import tensorflow as tf from delay_loss_scale import DenseLossScaleOptimizer, SparseLossScaleOptimizer -from gradient_descent_w import create_hash_optimizer from mx_rec.util.initialize import ConfigInitializer from mx_rec.optimizers.gradient_descent_by_addr import create_hash_optimizer_by_addr +from mx_rec.optimizers.gradient_descent import create_hash_optimizer from mx_rec.optimizers import lazy_adam @@ -38,7 +38,8 @@ def get_dense_and_sparse_optimizer(cfg): if use_dynamic_expansion: sparse_optimizer = create_hash_optimizer_by_addr(learning_rate=cfg.learning_rate[1], weight_decay=0.0001) else: - sparse_optimizer = create_hash_optimizer(learning_rate=cfg.learning_rate[1], weight_decay=0.0001) + sparse_optimizer = create_hash_optimizer( + learning_rate=cfg.learning_rate[1], weight_decay=0.0001, use_fusion_optim=cfg.use_fusion_optim) loss_scale = 1024 sparse_optimizer = SparseLossScaleOptimizer(sparse_optimizer, loss_scale) dense_optimizer = DenseLossScaleOptimizer(dense_optimizer, loss_scale) diff --git a/mx_rec/optimizers/gradient_descent.py 
b/mx_rec/optimizers/gradient_descent.py index e9b8f7b1..857e8806 100644 --- a/mx_rec/optimizers/gradient_descent.py +++ b/mx_rec/optimizers/gradient_descent.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -# Copyright 2024. Huawei Technologies Co.,Ltd. All rights reserved. +# Copyright 2025. Huawei Technologies Co.,Ltd. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -28,7 +28,11 @@ from tensorflow.python.training import gradient_descent from mx_rec.optimizers.base import CustomizedOptimizer from mx_rec.util.initialize import ConfigInitializer -from mx_rec.validator.validator import para_checker_decorator, StringValidator, ClassValidator, LearningRateValidator +from mx_rec.util.ops import import_host_pipeline_ops +from mx_rec.validator.validator import ( + para_checker_decorator, StringValidator, ClassValidator, LearningRateValidator, + OptionalFloatValidator +) @para_checker_decorator( @@ -36,15 +40,25 @@ from mx_rec.validator.validator import para_checker_decorator, StringValidator, ("learning_rate", LearningRateValidator, {"min_value": 0.0, "max_value": 10.0}, ["check_value"]), ("use_locking", ClassValidator, {"classes": (bool,)}), ("name", StringValidator, {"min_len": 1, "max_len": 200}, ["check_string_length"]), + ("use_fusion_optim", ClassValidator, {"classes": (bool,)}), + ("weight_decay", OptionalFloatValidator, {"min_value": 1e-5, "max_value": 1e-2}, ["check_value"]), ] ) -def create_hash_optimizer(learning_rate, use_locking=False, name="GradientDescent"): +def create_hash_optimizer( + learning_rate, use_locking=False, name="GradientDescent", use_fusion_optim=False, weight_decay=None + ): if ConfigInitializer.get_instance().use_dynamic_expansion: raise ValueError( "The dynamic expansion mode is not compatible with the optimizer, please config dynamic " "expansion mode and optimizer correctly." 
) - optimizer = CustomizedGradientDescent(learning_rate=learning_rate, use_locking=use_locking, name=name) + optimizer = CustomizedGradientDescent( + learning_rate=learning_rate, + use_locking=use_locking, + name=name, + use_fusion_optim=use_fusion_optim, + weight_decay=weight_decay + ) ConfigInitializer.get_instance().optimizer_config.optimizer_instance = optimizer return optimizer @@ -52,7 +66,9 @@ def create_hash_optimizer(learning_rate, use_locking=False, name="GradientDescen class CustomizedGradientDescent(gradient_descent.GradientDescentOptimizer, CustomizedOptimizer): name_counter = defaultdict(int) - def __init__(self, learning_rate, use_locking=False, name="GradientDescent"): + def __init__( + self, learning_rate, use_locking=False, name="GradientDescent", use_fusion_optim=False, weight_decay=None + ): self.optimizer_type = "gradient_descent" self.optim_param_list = [] super(CustomizedGradientDescent, self)._get_name(name=name) @@ -61,6 +77,8 @@ class CustomizedGradientDescent(gradient_descent.GradientDescentOptimizer, Custo ) self._slot_num = 0 self._derivative = 1 + self._use_fusion_optim = use_fusion_optim + self._weight_decay = weight_decay def get_slot_init_values(self): return [] @@ -78,7 +96,19 @@ class CustomizedGradientDescent(gradient_descent.GradientDescentOptimizer, Custo ) nd_indices = tf.expand_dims(grad.indices, 1) - nd_value = grad.values * math_ops.cast(self._learning_rate_tensor, var.dtype.base_dtype) + if self._use_fusion_optim: + if table_instance.padding_keys_mask: + raise RuntimeError("The padding keys mode does not yet support fusion optimizer.") + var_update_op = import_host_pipeline_ops().sgd( + grad.values, nd_indices, var, self._learning_rate_tensor, self._weight_decay + ) + return var_update_op + + if self._weight_decay is None: + nd_value = grad.values * math_ops.cast(self._learning_rate_tensor, var.dtype.base_dtype) + else: + nd_value = (grad.values + math_ops.cast(self._weight_decay, var.dtype.base_dtype) * + tf.gather(var, 
grad.indices)) * math_ops.cast(self._learning_rate_tensor, var.dtype.base_dtype) nd_value = self._process_grad_value_mask(var, nd_value) var_update_op = tf.compat.v1.scatter_nd_add(var, nd_indices, -nd_value, use_locking=self._use_locking) return var_update_op diff --git a/mx_rec/validator/validator.py b/mx_rec/validator/validator.py index c634c5d7..59f5de5a 100644 --- a/mx_rec/validator/validator.py +++ b/mx_rec/validator/validator.py @@ -620,6 +620,20 @@ class OptionalIntValidator(IntValidator): invalid_options, constrained_options, msg) +class OptionalFloatValidator(FloatValidator): + """ + Float type validator if value is not None + """ + + def __init__(self, name: str, value: float, min_value: float = None, max_value: float = None, + invalid_options: List = None, constrained_options: List = None, msg: str = ""): + if not isinstance(value, float): + super(OptionalFloatValidator, self).__init__(name, 0.0, None, None, None, None, msg) + else: + super(OptionalFloatValidator, self).__init__(name, value, min_value, max_value, + invalid_options, constrained_options, msg) + + class Convert2intValidator(IntValidator): """ check whether a variable can be converted to int or not. 
diff --git a/src/ops_tf/hybrid_dataset_ops.cpp b/src/ops_tf/hybrid_dataset_ops.cpp index fd9474a8..2cc7e813 100644 --- a/src/ops_tf/hybrid_dataset_ops.cpp +++ b/src/ops_tf/hybrid_dataset_ops.cpp @@ -683,6 +683,17 @@ namespace tensorflow { .SetShapeFn(::tensorflow::shape_inference::UnknownShape); REGISTER_KERNEL_BUILDER(Name("LazyAdam").Device(DEVICE_CPU), MxRec::CustOps); + REGISTER_OP("Sgd") + .Input("gradient: float32") + .Input("indices: int32") + .Input("input_var: float32") + .Input("lr: float32") + .Attr("weightDecay: float = 0.0") + .Output("output_var: float32") + .SetIsStateful() + .SetShapeFn(::tensorflow::shape_inference::UnknownShape); + REGISTER_KERNEL_BUILDER(Name("Sgd").Device(DEVICE_CPU), MxRec::CustOps); + REGISTER_OP("LcclAllToAll") .Input("send_data: float") .Input("send_count_matrix: int64") -- Gitee From db8dbaa9929bbd7515a8346219f6960d12633de2 Mon Sep 17 00:00:00 2001 From: 18608119613 Date: Thu, 29 May 2025 20:17:26 +0800 Subject: [PATCH 2/2] add test --- cust_op/op_example/sgd/README.MD | 17 +++ cust_op/op_example/sgd/test.py | 236 +++++++++++++++++++++++++++++++ 2 files changed, 253 insertions(+) create mode 100644 cust_op/op_example/sgd/README.MD create mode 100644 cust_op/op_example/sgd/test.py diff --git a/cust_op/op_example/sgd/README.MD b/cust_op/op_example/sgd/README.MD new file mode 100644 index 00000000..290cae0c --- /dev/null +++ b/cust_op/op_example/sgd/README.MD @@ -0,0 +1,17 @@ +## 适用场景说明 +> 需要满足以下条件的场景,适合使用本示例: +- 已经在/mxrec/cust_op/下安装完fused_sgd,希望在Tensorflow中调用该算子 +- 需要完整安装mx_rec tf1安装包 + +## 使用方法 +- 执行脚本前需要首先定义MX_REC_PACKAGE_PATH +```shell +export LD_PRELOAD=/usr/local/gcc7.3.0/lib64/libgomp.so.1 +export LD_LIBRARY_PATH=/usr/local/python3.7.5/lib/python3.7/site-packages/mx_rec/libasc/:$LD_LIBRARY_PATH +export MX_REC_PACKAGE_PATH=/usr/local/python3.7.5/lib/python3.7/site-packages/mx_rec/ +``` +- 利用pytest 执行脚本 +```shell +pytest test.py +``` +- 运行结束之后会在当前目录生成汇总文件(包含精度和性能) \ No newline at end of file diff --git
a/cust_op/op_example/sgd/test.py b/cust_op/op_example/sgd/test.py new file mode 100644 index 00000000..244178cd --- /dev/null +++ b/cust_op/op_example/sgd/test.py @@ -0,0 +1,236 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# Copyright 2024. Huawei Technologies Co.,Ltd. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +import os +import time +from dataclasses import dataclass +import csv +import logging +import pytest + +import numpy as np +import tensorflow as tf +from tensorflow.python.framework import ops + +tf.compat.v1.disable_eager_execution() +if tf.__version__.startswith("1"): + from npu_bridge.npu_init import NPURunConfig, NPUEstimator, npu_hooks_append, DumpConfig +else: + import npu_device + +mx_rec_package_path = os.getenv('MX_REC_PACKAGE_PATH') +if not mx_rec_package_path: + raise EnvironmentError("please set env MX_REC_PACKAGE_PATH first") + +op_lib_path = os.path.join(mx_rec_package_path, "libasc", "libasc_ops.so") +tfOpLib = tf.load_op_library(op_lib_path) + + +@dataclass +class CollectInfo: + batch_size: int + table_size: int + dim_size: int + lr: float + weight_decay: float + golden_avg: float + npu_avg: float + time_diff: float + speedup: float + res: bool + + +def write_to_csv(collect_info: CollectInfo): + # 定义 CSV 文件名和表头 + csv_file = "performance_report.csv" + headers = [ + "batch_size", "table_size", "dim_size", "lr", + "weight_decay", 
"golden_avg(s)", "npu_avg(s)", + "time_diff(s)", "speedup(x)", "compare_result" + ] + + # 检查文件是否存在,不存在则写入表头 + file_exists = os.path.isfile(csv_file) + + if collect_info.weight_decay is None: + collect_info.weight_decay = 0.0 + + # 以追加模式打开文件 + with open(csv_file, mode='a', newline='', encoding='utf-8') as file: + writer = csv.writer(file) + + # 如果文件不存在,写入表头 + if not file_exists: + writer.writerow(headers) + + # 写入性能数据 + writer.writerow([ + collect_info.batch_size, + collect_info.table_size, + collect_info.dim_size, + collect_info.lr, + collect_info.weight_decay, + f"{collect_info.golden_avg:.6f}", # 保留6位小数 + f"{collect_info.npu_avg:.6f}", + f"{collect_info.time_diff:.6f}", + f"{collect_info.speedup:.2f}", + f"{collect_info.res}" + ]) + + +def generate_unique_mask_array(batch_size, table_size): + # 生成 [0, table_size) 范围内的不重复随机索引 + indices = np.random.choice( + np.arange(table_size, dtype=np.int32), # 索引范围 + size=batch_size, # 生成数量 = batch_size + replace=False # 强制不重复[1,3,6](@ref) + ) + + # 调整形状为 (batch_size, 1) 并指定 dtype=int32 + return indices.reshape(-1, 1).astype(np.int32) + + +def gen_test_data(batch_size, table_size, dim_size, lr): + # 使用Numpy的固定随机种子保证可复现性 + np.random.seed(42) + grad = np.random.uniform(0.0, 1.0, size=(batch_size, dim_size)).astype(np.float32) + var = np.random.uniform(0.0, 1.0, size=(table_size, dim_size)).astype(np.float32) + lr_tensor = np.full((batch_size, dim_size), lr, dtype=np.float32) + indices = generate_unique_mask_array(batch_size, table_size) + + return grad, indices, var, lr_tensor + + +def run_sgd_npu(grad, indices, var, lr, weight_decay): + result = tfOpLib.sgd(grad, indices, var, lr, weight_decay) + return result + + +def run_sgd_gloden(grad, indices, var, lr, weight_decay): + if weight_decay is None: + nd_value = grad * lr + else: + nd_value = (grad + weight_decay * tf.gather(var, tf.squeeze(indices, axis=1))) * lr + var_update_op = tf.scatter_nd_add(var, indices, -nd_value) + return var_update_op + + 
+@pytest.fixture(scope="session") +def tf_session(): + logging.getLogger().setLevel(logging.INFO) + + device_id = 4 + + os.environ["DEVICE_ID"] = str(0) + os.environ["ASCEND_DEVICE_ID"] = str(device_id) + os.environ["JOB_ID"] = "10086" + # 初始化 Session 配置 + config = tf.compat.v1.ConfigProto() + custom_op = config.graph_options.rewrite_options.custom_optimizers.add() + custom_op.name = "NpuOptimizer" + + # 创建 Session + sess = tf.compat.v1.Session(config=config) + tf.compat.v1.global_variables_initializer().run(session=sess) + + yield sess # 将会话传递给测试用例 + + # 清理资源 + sess.close() + + +@pytest.fixture(scope="function") +def static_graph(): + grad_ph = tf.placeholder(tf.float32, shape=[None, None], name="global_grad") + indices_ph = tf.placeholder(tf.int32, shape=[None, 1], name="global_indices") + lr_ph = tf.placeholder(tf.float32, shape=[None, None], name="global_lr") + return grad_ph, indices_ph, lr_ph + + +@pytest.fixture(scope="function") +def tf_vars(tf_session, batch_size, table_size, dim_size, lr): + input_grad, input_indices, input_var, input_lr = gen_test_data(batch_size, table_size, dim_size, lr) + + with tf.variable_scope("npu_vars"): + var_npu = tf.Variable(input_var, dtype=tf.float32, trainable=False) + with tf.variable_scope("golden_vars"): + var_golden = tf.Variable(input_var, dtype=tf.float32, trainable=False) + + tf_session.run(tf.variables_initializer([var_npu, var_golden])) + return var_npu, var_golden, input_grad, input_indices, input_lr + + +@pytest.mark.parametrize("batch_size, table_size", [(b, 5 * b) for b in [1, 15, 100, 1000]]) +@pytest.mark.parametrize("dim_size", [32, 88, 128]) +@pytest.mark.parametrize("lr", [0.1, 1.2]) +@pytest.mark.parametrize("weight_decay", [None, 0.012]) +def test_sgd(tf_session, static_graph, tf_vars, batch_size, table_size, dim_size, lr, weight_decay): + grad_ph, indices_ph, lr_ph = static_graph + var_npu, var_golden, input_grad, input_indices, input_lr = tf_vars + + # 动态绑定维度 + grad = tf.reshape(grad_ph, [batch_size, 
dim_size]) + indices = tf.reshape(indices_ph, [batch_size, 1]) + lr_tensor = tf.reshape(lr_ph, [batch_size, dim_size]) + + # 构建计算图 + golden_out = run_sgd_gloden(grad, indices, var_golden, input_lr, weight_decay) + npu_out = run_sgd_npu(grad, indices, var_npu, input_lr, weight_decay) + + # 预热10次 + for _ in range(10): + results = tf_session.run({ + "npu_result": npu_out, + "golden_result": golden_out, + "var_npu": var_npu, + "var_golden": var_golden + }, feed_dict={ + grad: input_grad, + indices: input_indices, + lr_tensor: input_lr + }) + + golden_times = [] + for _ in range(10): + start = time.perf_counter() + tf_session.run({"golden_result": golden_out, "var_golden": var_golden}, + feed_dict={grad: input_grad, indices: input_indices, lr_tensor: input_lr}) + golden_times.append(time.perf_counter() - start) + golden_avg = np.mean(golden_times) + + npu_times = [] + for _ in range(10): + start = time.perf_counter() + tf_session.run({"npu_result": npu_out, "var_npu": var_npu}, + feed_dict={grad: input_grad, indices: input_indices, lr_tensor: input_lr}) + npu_times .append(time.perf_counter() - start) + npu_avg = np.mean(npu_times) + + res = np.allclose(results["golden_result"], results["npu_result"], 1e-4, 1e-4) + + collect_info = CollectInfo( + batch_size, table_size, dim_size, + lr, weight_decay, golden_avg, + npu_avg, golden_avg - npu_avg, + golden_avg / npu_avg, res + ) + + write_to_csv(collect_info) + assert res + +## pytest -v -s test.py 直接执行 对比精度和性能 +## msprof --application="pytest test.py" --output=prof 用msprof工具测量性能 \ No newline at end of file -- Gitee