From cba67d0ae6337c5acec6503d489e85c8067edd5d Mon Sep 17 00:00:00 2001 From: wangchao Date: Fri, 23 Sep 2022 01:06:54 +0800 Subject: [PATCH] =?UTF-8?q?=E6=A0=A1=E9=AA=8C=E7=BB=93=E6=9E=9C=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E9=94=99=E8=AF=AF=E7=8E=87=E9=AA=8C=E8=AF=81=EF=BC=8C?= =?UTF-8?q?=E5=A4=A7=E4=BA=8E=E6=8C=87=E5=AE=9A=E9=94=99=E8=AF=AF=E7=8E=87?= =?UTF-8?q?=EF=BC=8C=E5=88=99=E4=B8=8D=E8=BF=9B=E8=A1=8CSQL=E6=9E=84?= =?UTF-8?q?=E5=BB=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../check/client/ExtractFallbackFactory.java | 47 ++++++++--- .../check/client/ExtractFeignClient.java | 43 ++++++---- .../check/client/FeignClientService.java | 62 +++++++++----- .../check/config/DataCheckProperties.java | 2 + .../controller/CheckStartController.java | 17 +--- .../check/AbstractCheckDiffResultBuilder.java | 81 +++++++++++++++---- .../check/modules/check/CheckDiffResult.java | 31 ++++--- .../modules/check/DataCheckRunnable.java | 38 +++++---- .../check/modules/check/DataCheckService.java | 19 +++-- .../check/service/CheckService.java | 8 -- .../check/service/impl/CheckServiceImpl.java | 50 ------------ .../src/main/resources/application.yml | 2 + .../modules/bucket/CheckDiffResultTest.java | 67 +++++++++++++++ .../common/entry/check/DataCheckParam.java | 2 + .../BuildRepairStatementException.java | 34 ++++++++ .../extract/controller/ExtractController.java | 43 +++++++--- .../extract/service/DataExtractService.java | 27 ++++++- .../service/DataExtractServiceImpl.java | 49 ++++++----- .../src/main/resources/application.yml | 8 ++ 19 files changed, 423 insertions(+), 207 deletions(-) create mode 100644 datachecker-check/src/test/java/org/opengauss/datachecker/check/modules/bucket/CheckDiffResultTest.java create mode 100644 datachecker-common/src/main/java/org/opengauss/datachecker/common/exception/BuildRepairStatementException.java diff --git 
a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java index fc40142..11240c4 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java @@ -15,9 +15,7 @@ package org.opengauss.datachecker.check.client; -import org.opengauss.datachecker.common.entry.check.IncrementCheckConfig; import org.opengauss.datachecker.common.entry.enums.CheckBlackWhiteMode; -import org.opengauss.datachecker.common.entry.enums.DML; import org.opengauss.datachecker.common.entry.extract.ExtractTask; import org.opengauss.datachecker.common.entry.extract.RowDataHash; import org.opengauss.datachecker.common.entry.extract.SourceDataLog; @@ -109,8 +107,45 @@ public class ExtractFallbackFactory implements FallbackFactory> buildRepairDml(String schema, String tableName, DML dml, Set diffSet) { + public Result> buildRepairStatementUpdateDml(String schema, String tableName, + Set diffSet) { + return Result.error("Remote call, build and repair statement exceptions according to parameters"); + } + + /** + * Build repair statements based on parameters + * + * @param schema The corresponding schema of the end DB to be repaired + * @param tableName table Name + * @param diffSet Differential primary key set + * @return Return to repair statement collection + */ + @Override + public Result> buildRepairStatementInsertDml(String schema, String tableName, + Set diffSet) { + return Result.error("Remote call, build and repair statement exceptions according to parameters"); + } + + /** + * Build repair statements based on parameters + * + * @param schema The corresponding schema of the end DB to be repaired + * @param tableName table Name + * @param diffSet Differential primary key set + * @return Return to repair 
statement collection + */ + @Override + public Result> buildRepairStatementDeleteDml(String schema, String tableName, + Set diffSet) { return Result.error("Remote call, build and repair statement exceptions according to parameters"); } @@ -138,11 +173,5 @@ public class ExtractFallbackFactory implements FallbackFactory tableList) { } - - @Override - public Result configIncrementCheckEnvironment(IncrementCheckConfig config) { - return Result.error("Remote call, configuration incremental verification scenario, " - + "configuration information related to debezium is abnormal"); - } } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java index 04f1eb3..48648fc 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java @@ -15,9 +15,7 @@ package org.opengauss.datachecker.check.client; -import org.opengauss.datachecker.common.entry.check.IncrementCheckConfig; import org.opengauss.datachecker.common.entry.enums.CheckBlackWhiteMode; -import org.opengauss.datachecker.common.entry.enums.DML; import org.opengauss.datachecker.common.entry.extract.ExtractTask; import org.opengauss.datachecker.common.entry.extract.RowDataHash; import org.opengauss.datachecker.common.entry.extract.SourceDataLog; @@ -145,14 +143,36 @@ public interface ExtractFeignClient { * * @param schema The corresponding schema of the end DB to be repaired * @param tableName table Name - * @param dml Repair type {@link DML} * @param diffSet Differential primary key set * @return Return to repair statement collection */ - @PostMapping("/extract/build/repairDML") - Result> buildRepairDml(@RequestParam(name = "schema") String schema, - @RequestParam(name = "tableName") String tableName, @RequestParam(name = "dml") DML 
dml, - @RequestBody Set diffSet); + @PostMapping("/extract/build/repair/statement/update") + Result> buildRepairStatementUpdateDml(@RequestParam(name = "schema") String schema, + @RequestParam(name = "tableName") String tableName, @RequestBody Set diffSet); + + /** + * Build repair statements based on parameters + * + * @param schema The corresponding schema of the end DB to be repaired + * @param tableName table Name + * @param diffSet Differential primary key set + * @return Return to repair statement collection + */ + @PostMapping("/extract/build/repair/statement/insert") + Result> buildRepairStatementInsertDml(@RequestParam(name = "schema") String schema, + @RequestParam(name = "tableName") String tableName, @RequestBody Set diffSet); + + /** + * Build repair statements based on parameters + * + * @param schema The corresponding schema of the end DB to be repaired + * @param tableName table Name + * @param diffSet Differential primary key set + * @return Return to repair statement collection + */ + @PostMapping("/extract/build/repair/statement/delete") + Result> buildRepairStatementDeleteDml(@RequestParam(name = "schema") String schema, + @RequestParam(name = "tableName") String tableName, @RequestBody Set diffSet); /** * Issue incremental log data @@ -196,13 +216,4 @@ public interface ExtractFeignClient { */ @PostMapping("/extract/refresh/black/white/list") void refreshBlackWhiteList(@RequestParam CheckBlackWhiteMode mode, @RequestBody List tableList); - - /** - * Configure the configuration information related to debezium in the incremental verification scenario - * - * @param config Debezium related configurations - * @return Request results - */ - @PostMapping("/extract/debezium/topic/config") - Result configIncrementCheckEnvironment(IncrementCheckConfig config); } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/FeignClientService.java 
b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/FeignClientService.java index 5471fb2..40fe464 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/FeignClientService.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/FeignClientService.java @@ -15,20 +15,18 @@ package org.opengauss.datachecker.check.client; -import org.opengauss.datachecker.common.entry.check.IncrementCheckConfig; -import org.opengauss.datachecker.common.entry.enums.DML; import org.opengauss.datachecker.common.entry.enums.Endpoint; import org.opengauss.datachecker.common.entry.extract.ExtractTask; import org.opengauss.datachecker.common.entry.extract.SourceDataLog; import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.opengauss.datachecker.common.entry.extract.Topic; -import org.opengauss.datachecker.common.exception.CheckingException; import org.opengauss.datachecker.common.exception.DispatchClientException; import org.opengauss.datachecker.common.web.Result; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.lang.NonNull; import org.springframework.stereotype.Service; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -200,17 +198,54 @@ public class FeignClientService { * @param endpoint endpoint type * @param schema The corresponding schema of the end DB to be repaired * @param tableName table Name - * @param dml Repair type {@link DML} * @param diffSet Differential primary key set * @return Return to repair statement collection */ - public List buildRepairDml(Endpoint endpoint, String schema, String tableName, DML dml, + public List buildRepairStatementInsertDml(Endpoint endpoint, String schema, String tableName, Set diffSet) { - Result> result = getClient(endpoint).buildRepairDml(schema, tableName, dml, diffSet); + Result> result = getClient(endpoint).buildRepairStatementInsertDml(schema, 
tableName, diffSet); if (result.isSuccess()) { return result.getData(); } else { - return null; + return new ArrayList<>(); + } + } + + /** + * Build repair statements based on parameters + * + * @param endpoint endpoint type + * @param schema The corresponding schema of the end DB to be repaired + * @param tableName table Name + * @param diffSet Differential primary key set + * @return Return to repair statement collection + */ + public List buildRepairStatementDeleteDml(Endpoint endpoint, String schema, String tableName, + Set diffSet) { + Result> result = getClient(endpoint).buildRepairStatementDeleteDml(schema, tableName, diffSet); + if (result.isSuccess()) { + return result.getData(); + } else { + return new ArrayList<>(); + } + } + + /** + * Build repair statements based on parameters + * + * @param endpoint endpoint type + * @param schema The corresponding schema of the end DB to be repaired + * @param tableName table Name + * @param diffSet Differential primary key set + * @return Return to repair statement collection + */ + public List buildRepairStatementUpdateDml(Endpoint endpoint, String schema, String tableName, + Set diffSet) { + Result> result = getClient(endpoint).buildRepairStatementUpdateDml(schema, tableName, diffSet); + if (result.isSuccess()) { + return result.getData(); + } else { + return new ArrayList<>(); } } @@ -238,17 +273,4 @@ public class FeignClientService { return null; } } - - /** - * Configure the configuration information related to debezium in the incremental verification scenario - * - * @param endpoint endpoint type - * @param conifg Debezium related configurations - */ - public void configIncrementCheckEnvironment(Endpoint endpoint, IncrementCheckConfig conifg) { - Result result = getClient(endpoint).configIncrementCheckEnvironment(conifg); - if (!result.isSuccess()) { - throw new CheckingException(result.getMessage()); - } - } } diff --git 
a/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/DataCheckProperties.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/DataCheckProperties.java index c749692..47cfca9 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/DataCheckProperties.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/DataCheckProperties.java @@ -91,4 +91,6 @@ public class DataCheckProperties { * If set to true, the environment will be cleaned automatically after the full verification process is completed. */ private boolean canAutoCleanEnvironment; + + private int errorRate; } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/CheckStartController.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/CheckStartController.java index 12ff537..e70e112 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/CheckStartController.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/CheckStartController.java @@ -19,14 +19,12 @@ import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.tags.Tag; import org.opengauss.datachecker.check.service.CheckService; -import org.opengauss.datachecker.common.entry.check.IncrementCheckConfig; import org.opengauss.datachecker.common.entry.enums.CheckMode; import org.opengauss.datachecker.common.web.Result; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import 
org.springframework.web.bind.annotation.RestController; @@ -57,20 +55,7 @@ public class CheckStartController { CheckMode checkMode) { return Result.success(checkService.start(checkMode)); } - - /** - * Incremental verification configuration initialization - * - * @param config Debezium incremental migration verification initialization configuration - * @return request result - */ - @Operation(summary = "Incremental verification configuration initialization") - @PostMapping("/increment/check/config") - public Result incrementCheckConfig(@RequestBody IncrementCheckConfig config) { - checkService.incrementCheckConfig(config); - return Result.success(); - } - + /** *
      * Stop the verification service and clean up the verification service.
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/AbstractCheckDiffResultBuilder.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/AbstractCheckDiffResultBuilder.java
index 0b0f527..ac76ae6 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/AbstractCheckDiffResultBuilder.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/AbstractCheckDiffResultBuilder.java
@@ -20,8 +20,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.opengauss.datachecker.check.client.FeignClientService;
 import org.opengauss.datachecker.common.entry.enums.DML;
 import org.opengauss.datachecker.common.entry.enums.Endpoint;
-import org.opengauss.datachecker.common.exception.DispatchClientException;
-import org.springframework.util.CollectionUtils;
 
 import java.time.LocalDateTime;
 import java.util.ArrayList;
@@ -38,10 +36,14 @@ import java.util.Set;
 @Slf4j
 @Getter
 public abstract class AbstractCheckDiffResultBuilder> {
+    private static final int MAX_DIFF_REPAIR_SIZE = 5000;
+
     private final FeignClientService feignClient;
 
     private String table;
     private int partitions;
+    private int rowCount;
+    private int errorRate;
     private String topic;
     private String schema;
     private boolean isTableStructureEquals;
@@ -146,6 +148,16 @@ public abstract class AbstractCheckDiffResultBuilder keyUpdateSet) {
         this.keyUpdateSet = keyUpdateSet;
-        repairUpdate = checkRepairSinkDiff(DML.REPLACE, schema, table, this.keyUpdateSet);
+        repairUpdate = checkRepairUpdateSinkDiff(schema, table, this.keyUpdateSet);
         return self();
     }
 
@@ -166,7 +178,7 @@ public abstract class AbstractCheckDiffResultBuilder keyInsertSet) {
         this.keyInsertSet = keyInsertSet;
-        repairInsert = checkRepairSinkDiff(DML.INSERT, schema, table, this.keyInsertSet);
+        repairInsert = checkRepairInsertSinkDiff(schema, table, this.keyInsertSet);
         return self();
     }
 
@@ -178,7 +190,7 @@ public abstract class AbstractCheckDiffResultBuilder keyDeleteSet) {
         this.keyDeleteSet = keyDeleteSet;
-        repairDelete = checkRepairSinkDiff(DML.DELETE, schema, table, this.keyDeleteSet);
+        repairDelete = checkRepairDeleteSinkDiff(schema, table, this.keyDeleteSet);
         return self();
     }
 
@@ -213,16 +225,57 @@ public abstract class AbstractCheckDiffResultBuilder checkRepairSinkDiff(DML dml, String schema, String tableName, Set sinkDiffSet) {
-        if (!CollectionUtils.isEmpty(sinkDiffSet)) {
-            try {
-                return feignClient.buildRepairDml(Endpoint.SOURCE, schema, tableName, dml, sinkDiffSet);
-            } catch (DispatchClientException exception) {
-                log.error("check table[{}] Repair [{}] Diff build Repair DML Error", tableName, dml, exception);
-                return new ArrayList<>();
-            }
+    protected boolean isNotLargeDiffKeys() {
+        int totalRepair = (keyDeleteSet == null ? 0 : keyDeleteSet.size()) + (keyInsertSet == null ? 0 : keyInsertSet.size()) + (keyUpdateSet == null ? 0 : keyUpdateSet.size());
+        int curErrorRate = rowCount == 0 ? 0 : (totalRepair * 100 / rowCount);
+        if (totalRepair <= MAX_DIFF_REPAIR_SIZE && curErrorRate <= errorRate) {
+            return true;
         } else {
-            return new ArrayList<>();
+            log.info("check table[{}] diff-count={},error-rate={}%, error is too large ,not to build repair dml", table,
+                totalRepair, curErrorRate);
+            return false;
+        }
+    }
+
+    protected List checkRepairUpdateSinkDiff(String schema, String table, Set keyUpdateSet) {
+        try {
+            if (keyUpdateSet.size() > 0 && isNotLargeDiffKeys()) {
+                log.info("check table[{}] repair [{}] diff-count={} build repair dml", table,
+                    DML.REPLACE.getDescription(), keyUpdateSet.size());
+                return feignClient.buildRepairStatementUpdateDml(Endpoint.SOURCE, schema, table, keyUpdateSet);
+            }
+        } catch (Exception exception) {
+            log.error("check table[{}] repair [{}] diff-count={} build repair dml error", table,
+                DML.REPLACE.getDescription(), keyUpdateSet.size(), exception);
+        }
+        return new ArrayList<>();
+    }
+
+    protected List checkRepairInsertSinkDiff(String schema, String table, Set keyInsertSet) {
+        try {
+            if (keyInsertSet.size() > 0 && isNotLargeDiffKeys()) {
+                log.info("check table[{}] repair [{}] diff-count={} build repair dml", table,
+                    DML.INSERT.getDescription(), keyInsertSet.size());
+                return feignClient.buildRepairStatementInsertDml(Endpoint.SOURCE, schema, table, keyInsertSet);
+            }
+        } catch (Exception exception) {
+            log.error("check table[{}] repair [{}] diff-count={} build repair dml error", table,
+                DML.INSERT.getDescription(), keyInsertSet.size(), exception);
+        }
+        return new ArrayList<>();
+    }
+
+    protected List checkRepairDeleteSinkDiff(String schema, String table, Set keyDeleteSet) {
+        try {
+            if (keyDeleteSet.size() > 0 && isNotLargeDiffKeys()) {
+                log.info("check table[{}] repair [{}] diff-count={} build repair dml", table,
+                    DML.DELETE.getDescription(), keyDeleteSet.size());
+                return feignClient.buildRepairStatementDeleteDml(Endpoint.SOURCE, schema, table, keyDeleteSet);
+            }
+        } catch (Exception exception) {
+            log.error("check table[{}] repair [{}] diff-count={} build repair dml error", table,
+                DML.DELETE.getDescription(), keyDeleteSet.size(), exception);
         }
+        return new ArrayList<>();
+    }
 }
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/CheckDiffResult.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/CheckDiffResult.java
index cec4d23..298e040 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/CheckDiffResult.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/CheckDiffResult.java
@@ -37,12 +37,14 @@ import java.util.Set;
  */
 @Data
 @JSONType(orders = {"schema", "table", "topic", "partitions", "result", "message", "createTime", "keyInsertSet",
-    "keyUpdateSet", "keyDeleteSet", "repairInsert", "repairUpdate", "repairDelete"})
+    "keyUpdateSet", "keyDeleteSet", "repairInsert", "repairUpdate", "repairDelete"},
+    ignores = {"totalRepair", "buildRepairDml", "isBuildRepairDml"})
 public class CheckDiffResult {
     private String schema;
     private String table;
     private String topic;
     private int partitions;
+    private int totalRepair;
     private LocalDateTime createTime;
     private String result;
     private String message;
@@ -77,10 +79,12 @@ public class CheckDiffResult {
             keyUpdateSet = builder.getKeyUpdateSet();
             keyInsertSet = builder.getKeyInsertSet();
             keyDeleteSet = builder.getKeyDeleteSet();
-            repairUpdate = builder.getRepairUpdate();
-            repairInsert = builder.getRepairInsert();
-            repairDelete = builder.getRepairDelete();
-            resultAnalysis();
+            if (builder.isNotLargeDiffKeys()) {
+                repairUpdate = builder.getRepairUpdate();
+                repairInsert = builder.getRepairInsert();
+                repairDelete = builder.getRepairDelete();
+            }
+            resultAnalysis(builder.isNotLargeDiffKeys());
         } else {
             initEmptyCollections();
             resultTableStructureNotEquals();
@@ -108,18 +112,21 @@ public class CheckDiffResult {
                      .concat("!");
     }
 
-    private void resultAnalysis() {
+    private void resultAnalysis(boolean isNotLargeDiffKeys) {
+        message = schema + "." + table + "_[" + partitions + "] check ";
         if (CollectionUtils.isEmpty(keyInsertSet) && CollectionUtils.isEmpty(keyUpdateSet) && CollectionUtils
             .isEmpty(keyDeleteSet)) {
             result = "success";
-            message = schema.concat(".").concat(table).concat("_[").concat(String.valueOf(partitions))
-                            .concat("] check success");
+            message += result;
         } else {
             result = "failed";
-            message =
-                schema.concat(".").concat(table).concat("_[").concat(String.valueOf(partitions)).concat("] check : ")
-                      .concat(" insert=" + keyInsertSet.size()).concat(" update=" + keyUpdateSet.size())
-                      .concat(" delete=" + keyDeleteSet.size());
+            message += result;
+            message +=
+                "( insert=" + keyInsertSet.size() + " update=" + keyUpdateSet.size() + " delete=" + keyDeleteSet.size()
+                    + " )";
+            if (totalRepair > 0 && !isNotLargeDiffKeys) {
+                message += " data error is too large , please check the database sync !";
+            }
         }
     }
 }
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java
index 903d9a2..5b85b3c 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java
@@ -68,7 +68,7 @@ public class DataCheckRunnable implements Runnable {
     private final List sourceBucketList = Collections.synchronizedList(new ArrayList<>());
     private final List sinkBucketList = Collections.synchronizedList(new ArrayList<>());
     private final DifferencePair, Map, Map>>
-        difference = DifferencePair.of(new HashMap<>(), new HashMap<>(), new HashMap<>());
+            difference = DifferencePair.of(new HashMap<>(), new HashMap<>(), new HashMap<>());
     private final Map> bucketNumberDiffMap = new HashMap<>();
     private final FeignClientService feignClient;
     private final StatisticalService statisticalService;
@@ -80,6 +80,8 @@ public class DataCheckRunnable implements Runnable {
     private Topic topic;
     private String tableName;
     private int partitions;
+    private int rowCount;
+    private int errorRate;
     private int bucketCapacity;
     private String path;
 
@@ -101,7 +103,7 @@ public class DataCheckRunnable implements Runnable {
     private KafkaConsumerHandler buildKafkaHandler(DataCheckRunnableSupport support) {
         KafkaConsumerService kafkaConsumerService = support.getKafkaConsumerService();
         return new KafkaConsumerHandler(kafkaConsumerService.buildKafkaConsumer(false),
-            kafkaConsumerService.getRetryFetchRecordTimes());
+                kafkaConsumerService.getRetryFetchRecordTimes());
     }
 
     /**
@@ -145,9 +147,9 @@ public class DataCheckRunnable implements Runnable {
             if (sourceTree.getDepth() != sinkTree.getDepth()) {
                 refreshCheckStatus();
                 throw new MerkleTreeDepthException(String.format(Locale.ROOT,
-                    "source & sink data have large different, Please synchronize data again! "
-                        + "merkel tree depth different,source depth=[%d],sink depth=[%d]", sourceTree.getDepth(),
-                    sinkTree.getDepth()));
+                        "source & sink data have large different, Please synchronize data again! "
+                                + "merkel tree depth different,source depth=[%d],sink depth=[%d]", sourceTree.getDepth(),
+                        sinkTree.getDepth()));
             }
             // Recursively compare two Merkel trees and return the difference record.
             compareMerkleTree(sourceTree, sinkTree);
@@ -159,6 +161,8 @@ public class DataCheckRunnable implements Runnable {
         topic = checkParam.getTopic();
         tableName = topic.getTableName();
         partitions = checkParam.getPartitions();
+        rowCount = 0;
+        errorRate = checkParam.getErrorRate();
         path = checkParam.getPath();
         bucketCapacity = checkParam.getBucketCapacity();
         resetThreadName(tableName, partitions);
@@ -195,7 +199,7 @@ public class DataCheckRunnable implements Runnable {
         sortBuckets(sourceBucketList);
         sortBuckets(sinkBucketList);
         log.info("Initialize the verification data and the bucket construction is currently completed of table [{}-{}]",
-            tableName, partitions);
+                tableName, partitions);
     }
 
     /**
@@ -231,11 +235,12 @@ public class DataCheckRunnable implements Runnable {
         Map bucketMap = new ConcurrentHashMap<>(Constants.InitialCapacity.EMPTY);
         // Use feign client to pull Kafka data
         List dataList = getTopicPartitionsData(endpoint, partitions);
+        rowCount = rowCount + dataList.size();
         if (CollectionUtils.isEmpty(dataList)) {
             return;
         }
         log.info("Initialize the verification thread data, and pull the total number of [{}-{}-{}] data records to {}",
-            endpoint.getDescription(), tableName, partitions, dataList.size());
+                endpoint.getDescription(), tableName, partitions, dataList.size());
         BuilderBucketHandler bucketBuilder = new BuilderBucketHandler(bucketCapacity);
 
         // Use the pulled data to build the bucket list
@@ -388,7 +393,7 @@ public class DataCheckRunnable implements Runnable {
             } else {
                 // sourceSize is less than thresholdMinBucketSize, that is, there is only one bucket. Compare
                 DifferencePair subDifference =
-                    compareBucket(sourceBucketList.get(0), sinkBucketList.get(0));
+                        compareBucket(sourceBucketList.get(0), sinkBucketList.get(0));
                 difference.getDiffering().putAll(subDifference.getDiffering());
                 difference.getOnlyOnLeft().putAll(subDifference.getOnlyOnLeft());
                 difference.getOnlyOnRight().putAll(subDifference.getOnlyOnRight());
@@ -397,19 +402,20 @@ public class DataCheckRunnable implements Runnable {
         } else {
             refreshCheckStatus();
             throw new LargeDataDiffException(String.format(
-                "table[%s] source & sink data have large different," + "source-bucket-count=[%s] sink-bucket-count=[%s]"
-                    + " Please synchronize data again! ", tableName, sourceBucketCount, sinkBucketCount));
+                    "table[%s] source & sink data have large different," + "source-bucket-count=[%s] sink-bucket-count=[%s]"
+                            + " Please synchronize data again! ", tableName, sourceBucketCount, sinkBucketCount));
         }
     }
 
     private void checkResult() {
         CheckDiffResult result =
-            AbstractCheckDiffResultBuilder.builder(feignClient).table(tableName).topic(topic.getTopicName())
-                                          .schema(sinkSchema).partitions(partitions).isTableStructureEquals(true)
-                                          .isExistTableMiss(false, null)
-                                          .keyUpdateSet(difference.getDiffering().keySet())
-                                          .keyInsertSet(difference.getOnlyOnLeft().keySet())
-                                          .keyDeleteSet(difference.getOnlyOnRight().keySet()).build();
+                AbstractCheckDiffResultBuilder.builder(feignClient).table(tableName).topic(topic.getTopicName())
+                        .schema(sinkSchema).partitions(partitions).isTableStructureEquals(true)
+                        .isExistTableMiss(false, null)
+                        .rowCount(rowCount).errorRate(errorRate)
+                        .keyUpdateSet(difference.getDiffering().keySet())
+                        .keyInsertSet(difference.getOnlyOnLeft().keySet())
+                        .keyDeleteSet(difference.getOnlyOnRight().keySet()).build();
         ExportCheckResult.export(path, result);
         log.info("Complete the output of data verification results of table [{}-{}]", tableName, partitions);
     }
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckService.java
index 301ac06..5c59ffc 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckService.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckService.java
@@ -63,26 +63,29 @@ public class DataCheckService {
 
     private DataCheckParam buildCheckParam(Topic topic, int partitions, DataCheckConfig dataCheckConfig) {
         final int bucketCapacity = dataCheckConfig.getBucketCapacity();
+        final int errorRate = dataCheckConfig.getDataCheckProperties().getErrorRate();
         final String checkResultPath = dataCheckConfig.getCheckResultPath();
         return new DataCheckParam().setBucketCapacity(bucketCapacity).setTopic(topic).setPartitions(partitions)
-                                   .setProperties(kafkaProperties).setPath(checkResultPath);
+                .setProperties(kafkaProperties).setPath(checkResultPath).setErrorRate(errorRate);
     }
 
     /**
      * incrementCheckTableData
      *
-     * @param topic topic
+     * @param tableName      tableName
+     * @param checkDataCount total number of records to be verified in this increment
      */
-    public void incrementCheckTableData(Topic topic) {
-        DataCheckParam checkParam = buildIncrementCheckParam(topic, dataCheckConfig);
-        final IncrementCheckThread incrementCheck = new IncrementCheckThread(checkParam, dataCheckRunnableSupport);
+    public void incrementCheckTableData(String tableName, int checkDataCount) {
+        DataCheckParam checkParam = buildIncrementCheckParam(tableName, dataCheckConfig);
+        final IncrementCheckThread incrementCheck =
+            new IncrementCheckThread(checkParam, checkDataCount, dataCheckRunnableSupport);
         checkAsyncExecutor.submit(incrementCheck);
     }
 
-    private DataCheckParam buildIncrementCheckParam(Topic topic, DataCheckConfig dataCheckConfig) {
+    private DataCheckParam buildIncrementCheckParam(String tableName, DataCheckConfig dataCheckConfig) {
         final int bucketCapacity = dataCheckConfig.getBucketCapacity();
         final String checkResultPath = dataCheckConfig.getCheckResultPath();
-        return new DataCheckParam().setBucketCapacity(bucketCapacity).setTopic(topic).setPartitions(0)
-                                   .setPath(checkResultPath);
+        return new DataCheckParam().setTableName(tableName).setBucketCapacity(bucketCapacity).setPartitions(0)
+                .setPath(checkResultPath);
     }
 }
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CheckService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CheckService.java
index 9c5b4cb..76ed089 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CheckService.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CheckService.java
@@ -15,7 +15,6 @@
 
 package org.opengauss.datachecker.check.service;
 
-import org.opengauss.datachecker.common.entry.check.IncrementCheckConfig;
 import org.opengauss.datachecker.common.entry.enums.CheckMode;
 
 /**
@@ -44,11 +43,4 @@ public interface CheckService {
      * Clean up the verification environment
      */
     void cleanCheck();
-
-    /**
-     * Incremental verification configuration initialization
-     *
-     * @param config Initialize configuration
-     */
-    void incrementCheckConfig(IncrementCheckConfig config);
 }
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/impl/CheckServiceImpl.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/impl/CheckServiceImpl.java
index e8e6894..eda786c 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/impl/CheckServiceImpl.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/impl/CheckServiceImpl.java
@@ -28,7 +28,6 @@ import org.opengauss.datachecker.check.service.CheckService;
 import org.opengauss.datachecker.check.service.CheckTableStructureService;
 import org.opengauss.datachecker.check.service.EndpointMetaDataManager;
 import org.opengauss.datachecker.common.entry.check.CheckProgress;
-import org.opengauss.datachecker.common.entry.check.IncrementCheckConfig;
 import org.opengauss.datachecker.common.entry.enums.CheckMode;
 import org.opengauss.datachecker.common.entry.enums.Endpoint;
 import org.opengauss.datachecker.common.entry.extract.ExtractTask;
@@ -136,8 +135,6 @@ public class CheckServiceImpl implements CheckService {
                     startCheckFullMode();
                     // Wait for the task construction to complete, and start the task polling thread
                     startCheckPollingThread();
-                } else {
-                    startCheckIncrementMode();
                 }
             } catch (CheckingException ex) {
                 cleanCheck();
@@ -208,43 +205,6 @@ public class CheckServiceImpl implements CheckService {
         }
     }
 
-    /**
-     * Enable incremental verification mode
-     */
-    private void startCheckIncrementMode() {
-        //  Enable incremental verification mode - polling thread start
-        if (Objects.equals(CHECK_MODE_REF.getAcquire(), CheckMode.INCREMENT)) {
-            ScheduledExecutorService scheduledExecutor = ThreadUtil.newSingleThreadScheduledExecutor();
-            scheduledExecutor.scheduleWithFixedDelay(() -> {
-                Thread.currentThread().setName(SELF_CHECK_POLL_THREAD_NAME);
-                log.debug("check polling check mode=[{}]", CHECK_MODE_REF.get());
-                //  Check whether there is a table to complete data extraction
-                if (tableStatusRegister.hasExtractCompleted()) {
-                    // Get the table name that completes data extraction
-                    String tableName = tableStatusRegister.completedTablePoll();
-                    if (Objects.isNull(tableName)) {
-                        return;
-                    }
-                    Topic topic = feignClientService.getIncrementTopicInfo(Endpoint.SOURCE, tableName);
-
-                    if (Objects.nonNull(topic)) {
-                        log.info("kafka consumer topic=[{}]", topic.toString());
-                        // Verify the data according to the table name and Kafka partition
-                        dataCheckService.incrementCheckTableData(topic);
-                    }
-                    completeProgressBar(scheduledExecutor);
-                }
-                // The current cycle task completes the verification and resets the task status
-                if (tableStatusRegister.isCheckCompleted()) {
-                    log.info("The current cycle verification is completed, reset the task status!");
-                    tableStatusRegister.rest();
-                    feignClientService.cleanTask(Endpoint.SOURCE);
-                    feignClientService.cleanTask(Endpoint.SINK);
-                }
-            }, 5, 2, TimeUnit.SECONDS);
-        }
-    }
-
     private void checkTableWithExtractEnd() {
         if (tableStatusRegister.isExtractCompleted() && CHECKING.get()) {
             log.info("check polling processNo={}, extract task complete. start checking....", PROCESS_SIGNATURE.get());
@@ -331,16 +291,6 @@ public class CheckServiceImpl implements CheckService {
         log.info("clear and reset the current verification service!");
     }
 
-    /**
-     * Increment Check Initialize configuration
-     *
-     * @param config Initialize configuration
-     */
-    @Override
-    public void incrementCheckConfig(IncrementCheckConfig config) {
-        feignClientService.configIncrementCheckEnvironment(Endpoint.SOURCE, config);
-    }
-
     private void cleanBuildTask(String processNo) {
         try {
             feignClientService.cleanEnvironment(Endpoint.SOURCE, processNo);
diff --git a/datachecker-check/src/main/resources/application.yml b/datachecker-check/src/main/resources/application.yml
index 751674f..b50fe4d 100644
--- a/datachecker-check/src/main/resources/application.yml
+++ b/datachecker-check/src/main/resources/application.yml
@@ -65,6 +65,8 @@ data:
     auto-clean-environment: true
     check-with-sync-extracting: true
     retry-fetch-record-times: 5
+    error-rate: 30
+
 
 
 
diff --git a/datachecker-check/src/test/java/org/opengauss/datachecker/check/modules/bucket/CheckDiffResultTest.java b/datachecker-check/src/test/java/org/opengauss/datachecker/check/modules/bucket/CheckDiffResultTest.java
new file mode 100644
index 0000000..74351f1
--- /dev/null
+++ b/datachecker-check/src/test/java/org/opengauss/datachecker/check/modules/bucket/CheckDiffResultTest.java
@@ -0,0 +1,67 @@
+package org.opengauss.datachecker.check.modules.bucket;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.kafka.common.protocol.types.Field;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opengauss.datachecker.check.client.FeignClientService;
+import org.opengauss.datachecker.check.modules.check.AbstractCheckDiffResultBuilder;
+import org.opengauss.datachecker.check.modules.check.CheckDiffResult;
+import org.opengauss.datachecker.common.entry.extract.Topic;
+import org.opengauss.datachecker.common.util.JsonObjectUtil;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * CheckDiffResultTest
+ *
+ * @author :wangchao
+ * @date :Created in 2022/9/2
+ * @since :11
+ */
+@ExtendWith(MockitoExtension.class)
+public class CheckDiffResultTest {
+    private FeignClientService feignClient;
+    private Topic topic;
+    private String tableName;
+    private String sinkSchema;
+    private int partitions;
+    private int rowCount;
+    private Set<String> updateSet;
+    private Set<String> insertSet;
+    private Set<String> deleteSet;
+
+    @BeforeEach
+    void setUp() {
+        feignClient = new FeignClientService();
+        topic = new Topic().setTopicName("topic_t_check_test");
+        tableName = "t_check_test";
+        sinkSchema = "test";
+        partitions = 0;
+        rowCount = 10000;
+        updateSet = new HashSet<>();
+        insertSet = new HashSet<>();
+        deleteSet = new HashSet<>();
+    }
+
+    /**
+     * testBuilder
+     */
+    @DisplayName("check diff result build with row count and error rate threshold")
+    @Test
+    void testSelectNoDivisionsSqlBuilder() {
+        CheckDiffResult result =
+                AbstractCheckDiffResultBuilder.builder(feignClient).table(tableName).topic(topic.getTopicName())
+                        .schema(sinkSchema).partitions(partitions).isTableStructureEquals(true)
+                        .isExistTableMiss(false, null)
+                        .rowCount(rowCount).errorRate(20)
+                        .keyUpdateSet(updateSet)
+                        .keyInsertSet(insertSet)
+                        .keyDeleteSet(deleteSet).build();
+        System.out.println(JsonObjectUtil.format(result));
+    }
+}
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/check/DataCheckParam.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/check/DataCheckParam.java
index 9e4fcf5..52f9323 100644
--- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/check/DataCheckParam.java
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/check/DataCheckParam.java
@@ -30,6 +30,7 @@ import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 @Data
 @Accessors(chain = true)
 public class DataCheckParam {
+    private String tableName;
     /**
      * Build bucket capacity parameters
      */
@@ -43,6 +44,7 @@ public class DataCheckParam {
      * Verify topic partition
      */
     private int partitions;
+    private int errorRate;
     /**
      * Verification result output path
      */
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/exception/BuildRepairStatementException.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/exception/BuildRepairStatementException.java
new file mode 100644
index 0000000..4b8498a
--- /dev/null
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/exception/BuildRepairStatementException.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd.
+ *
+ * openGauss is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ *
+ *           http://license.coscl.org.cn/MulanPSL2
+ *
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ */
+
+package org.opengauss.datachecker.common.exception;
+
+/**
+ * BuildRepairStatementException
+ *
+ * @author :wangchao
+ * @date :Created in 2022/5/23
+ * @since :11
+ */
+public class BuildRepairStatementException extends ExtractException {
+    private static final long serialVersionUID = 414115892399622074L;
+
+    public BuildRepairStatementException(String message) {
+        super(message);
+    }
+
+    public BuildRepairStatementException() {
+    }
+}
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractController.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractController.java
index 6a6af27..7c27e72 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractController.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractController.java
@@ -19,7 +19,6 @@ import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.Parameter;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import org.opengauss.datachecker.common.entry.enums.CheckBlackWhiteMode;
-import org.opengauss.datachecker.common.entry.enums.DML;
 import org.opengauss.datachecker.common.entry.extract.ExtractTask;
 import org.opengauss.datachecker.common.entry.extract.RowDataHash;
 import org.opengauss.datachecker.common.entry.extract.SourceDataLog;
@@ -172,21 +171,39 @@ public class ExtractController {
      * DML statements required to generate a repair report
      *
      * @param tableName table name
-     * @param dml       dml type
      * @param diffSet   primary key set
      * @return DML statement
      */
-    @Operation(summary = "DML statements required to generate a repair report")
-    @PostMapping("/extract/build/repairDML")
-    Result> buildRepairDml(
-        @NotEmpty(message = "the schema to which the table to be repaired belongs cannot be empty")
-        @RequestParam(name = "schema") String schema,
-        @NotEmpty(message = "the name of the table to be repaired belongs cannot be empty")
-        @RequestParam(name = "tableName") String tableName,
-        @NotNull(message = "the DML type to be repaired belongs cannot be empty") @RequestParam(name = "dml") DML dml,
-        @NotEmpty(message = "the primary key set to be repaired belongs cannot be empty") @RequestBody
-            Set diffSet) {
-        return Result.success(dataExtractService.buildRepairDml(schema, tableName, dml, diffSet));
+    @PostMapping("/extract/build/repair/statement/update")
+    Result> buildRepairStatementUpdateDml(@NotEmpty @RequestParam(name = "schema") String schema,
+        @NotEmpty @RequestParam(name = "tableName") String tableName, @NotEmpty @RequestBody Set diffSet) {
+        return Result.success(dataExtractService.buildRepairStatementUpdateDml(schema, tableName, diffSet));
+    }
+
+    /**
+     * DML statements required to generate a repair report
+     *
+     * @param tableName table name
+     * @param diffSet   primary key set
+     * @return DML statement
+     */
+    @PostMapping("/extract/build/repair/statement/insert")
+    Result> buildRepairStatementInsertDml(@NotEmpty @RequestParam(name = "schema") String schema,
+        @NotEmpty @RequestParam(name = "tableName") String tableName, @NotEmpty @RequestBody Set diffSet) {
+        return Result.success(dataExtractService.buildRepairStatementInsertDml(schema, tableName, diffSet));
+    }
+
+    /**
+     * DML statements required to generate a repair report
+     *
+     * @param tableName table name
+     * @param diffSet   primary key set
+     * @return DML statement
+     */
+    @PostMapping("/extract/build/repair/statement/delete")
+    Result> buildRepairStatementDeleteDml(@NotEmpty @RequestParam(name = "schema") String schema,
+        @NotEmpty @RequestParam(name = "tableName") String tableName, @NotEmpty @RequestBody Set diffSet) {
+        return Result.success(dataExtractService.buildRepairStatementDeleteDml(schema, tableName, diffSet));
     }
 
     /**
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractService.java
index e0251be..0890f06 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractService.java
@@ -15,7 +15,6 @@
 
 package org.opengauss.datachecker.extract.service;
 
-import org.opengauss.datachecker.common.entry.enums.DML;
 import org.opengauss.datachecker.common.entry.extract.ExtractTask;
 import org.opengauss.datachecker.common.entry.extract.RowDataHash;
 import org.opengauss.datachecker.common.entry.extract.SourceDataLog;
@@ -29,7 +28,7 @@ import java.util.Set;
 
 /**
  * @author wang chao
- * @description 数据抽取服务
+ * @description DataExtractService
  * @date 2022/5/8 19:27
  * @since 11
  **/
@@ -81,11 +80,30 @@ public interface DataExtractService {
      *
      * @param schema    schema
      * @param tableName tableName
-     * @param dml       dml
      * @param diffSet   Primary key set to be generated
      * @return DML statement
      */
-    List buildRepairDml(String schema, String tableName, DML dml, Set diffSet);
+    List buildRepairStatementUpdateDml(String schema, String tableName, Set diffSet);
+
+    /**
+     * DML statement generating repair report
+     *
+     * @param schema    schema
+     * @param tableName tableName
+     * @param diffSet   Primary key set to be generated
+     * @return DML statement
+     */
+    List buildRepairStatementInsertDml(String schema, String tableName, Set diffSet);
+
+    /**
+     * DML statement generating repair report
+     *
+     * @param schema    schema
+     * @param tableName tableName
+     * @param diffSet   Primary key set to be generated
+     * @return DML statement
+     */
+    List buildRepairStatementDeleteDml(String schema, String tableName, Set diffSet);
 
     /**
      * Query table data
@@ -130,4 +148,5 @@ public interface DataExtractService {
      * @return database schema
      */
     String queryDatabaseSchema();
+
 }
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java
index 3f504e6..be70fc4 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java
@@ -28,6 +28,7 @@ import org.opengauss.datachecker.common.entry.extract.SourceDataLog;
 import org.opengauss.datachecker.common.entry.extract.TableMetadata;
 import org.opengauss.datachecker.common.entry.extract.TableMetadataHash;
 import org.opengauss.datachecker.common.entry.extract.Topic;
+import org.opengauss.datachecker.common.exception.BuildRepairStatementException;
 import org.opengauss.datachecker.common.exception.ProcessMultipleException;
 import org.opengauss.datachecker.common.exception.TableNotExistException;
 import org.opengauss.datachecker.common.exception.TaskNotFoundException;
@@ -53,7 +54,6 @@ import org.springframework.scheduling.annotation.Async;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Service;
 
-import javax.validation.constraints.NotEmpty;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -320,32 +320,39 @@ public class DataExtractServiceImpl implements DataExtractService {
         }
     }
 
-    /**
-     * DML statement generating repair report
-     *
-     * @param tableName tableName
-     * @param dml       dml
-     * @param diffSet   Primary key set to be generated
-     * @return DML statement
-     */
     @Override
-    public List buildRepairDml(String schema, @NotEmpty String tableName, @NonNull DML dml,
-        @NotEmpty Set diffSet) {
+    public List buildRepairStatementUpdateDml(String schema, String tableName, Set diffSet) {
+        log.info("check table[{}.{}] repair [{}] diff-count={} build repair dml", schema, tableName,
+            DML.REPLACE.getDescription(), diffSet.size());
+        if (CollectionUtils.isEmpty(diffSet)) {
+            return new ArrayList<>();
+        }
+        final TableMetadata metadata = MetaDataCache.get(tableName);
+        return dataManipulationService.buildReplace(schema, tableName, diffSet, metadata);
+    }
+
+    @Override
+    public List buildRepairStatementInsertDml(String schema, String tableName, Set diffSet) {
+        log.info("check table[{}.{}] repair [{}] diff-count={} build repair dml", schema, tableName,
+            DML.INSERT.getDescription(), diffSet.size());
         if (CollectionUtils.isEmpty(diffSet)) {
             return new ArrayList<>();
         }
-        List resultList = new ArrayList<>();
         final TableMetadata metadata = MetaDataCache.get(tableName);
-        final List primaryMetas = metadata.getPrimaryMetas();
-
-        if (Objects.equals(dml, DML.DELETE)) {
-            resultList.addAll(dataManipulationService.buildDelete(schema, tableName, diffSet, primaryMetas));
-        } else if (Objects.equals(dml, DML.INSERT)) {
-            resultList.addAll(dataManipulationService.buildInsert(schema, tableName, diffSet, metadata));
-        } else if (Objects.equals(dml, DML.REPLACE)) {
-            resultList.addAll(dataManipulationService.buildReplace(schema, tableName, diffSet, metadata));
+        return dataManipulationService.buildInsert(schema, tableName, diffSet, metadata);
+    }
+
+    @Override
+    public List buildRepairStatementDeleteDml(String schema, String tableName, Set diffSet) {
+        log.info("check table[{}.{}] repair [{}] diff-count={} build repair dml", schema, tableName,
+            DML.DELETE.getDescription(), diffSet.size());
+        final TableMetadata metadata = MetaDataCache.get(tableName);
+        if (Objects.nonNull(metadata)) {
+            final List primaryMetas = metadata.getPrimaryMetas();
+            return dataManipulationService.buildDelete(schema, tableName, diffSet, primaryMetas);
+        } else {
+            throw new BuildRepairStatementException("table metadata not found, cannot build delete statement: " + tableName);
         }
-        return resultList;
     }
 
     /**
diff --git a/datachecker-extract/src/main/resources/application.yml b/datachecker-extract/src/main/resources/application.yml
index ee6bb24..556f776 100644
--- a/datachecker-extract/src/main/resources/application.yml
+++ b/datachecker-extract/src/main/resources/application.yml
@@ -27,8 +27,16 @@ spring:
       fetch-min-size: 1
       max-poll-records: 20000
       fetch-max-bytes: 536870912  # 512M
+  mvc:
+    async:
+      request-timeout: 60000
 
 feign:
+  client:
+    config:
+      default:
+        connectTimeout: 50000
+        readTimeout: 50000
   okhttp:
     enabled: true
 
-- 
Gitee