From c882196a3c87972486aa9e1c0c8686b4ef9dc31c Mon Sep 17 00:00:00 2001
From: mystarry-sky
Date: Sat, 17 Sep 2022 20:01:41 +0800
Subject: [PATCH] Optimize the check process and check logging, add report
 generation for missing-table checks, and add table field checks
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../check/cache/TableStatusRegister.java      |  66 ++++-
 .../datachecker/check/config/AsyncConfig.java |   6 +-
 .../controller/TaskStatusController.java      |  26 +-
 .../modules/bucket/BuilderBucketHandler.java  |   2 +-
 .../check/AbstractCheckDiffResultBuilder.java |  15 ++
 .../check/modules/check/CheckDiffResult.java  |  29 ++-
 .../modules/check/DataCheckRunnable.java      |  26 +-
 .../check/modules/check/DataCheckService.java |  18 --
 .../check/modules/merkle/MerkleTree.java      |  16 +-
 .../modules/task/TaskManagerService.java      |   4 +-
 .../modules/task/TaskManagerServiceImpl.java  |   6 +-
 .../service/CheckTableStructureService.java   |  49 +++-
 .../service/EndpointMetaDataManager.java      |  28 ++-
 .../check/service/impl/CheckServiceImpl.java  |  17 +-
 .../common/entry/check/CheckProgress.java     |  44 ++++
 .../common/exception/CheckThreadFactory.java  |  78 ++++++
 .../extract/client/CheckingFeignClient.java   |  16 +-
 .../extract/config/KafkaProducerConfig.java   |   5 +-
 .../service/DataExtractServiceImpl.java       |  41 +--
 .../task/ResultSetHandlerSinkTest.java        | 235 ++++++++++++++++++
 .../extract/task/ResultSetHandlerTest.java    | 200 +++++++++++++++
 21 files changed, 784 insertions(+), 143 deletions(-)
 create mode 100644 datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/check/CheckProgress.java
 create mode 100644 datachecker-common/src/main/java/org/opengauss/datachecker/common/exception/CheckThreadFactory.java
 create mode 100644 datachecker-extract/src/test/java/org/opengauss/datachecker/extract/task/ResultSetHandlerSinkTest.java
 create mode 100644 datachecker-extract/src/test/java/org/opengauss/datachecker/extract/task/ResultSetHandlerTest.java

diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/cache/TableStatusRegister.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/cache/TableStatusRegister.java
index ade1685..156c411 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/cache/TableStatusRegister.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/cache/TableStatusRegister.java
@@ -17,6 +17,7 @@ package org.opengauss.datachecker.check.cache;
 
 import lombok.extern.slf4j.Slf4j;
 import org.opengauss.datachecker.common.constant.Constants.InitialCapacity;
+import org.opengauss.datachecker.common.entry.check.CheckProgress;
 import org.opengauss.datachecker.common.entry.check.Pair;
 import org.opengauss.datachecker.common.exception.ExtractException;
 import org.opengauss.datachecker.common.util.ThreadUtil;
@@ -24,6 +25,7 @@ import org.springframework.stereotype.Service;
 
 import javax.validation.constraints.NotEmpty;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -32,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
 /**
@@ -61,12 +64,12 @@ public class TableStatusRegister implements Cache {
     /**
      * Task status cache. The initial default value of status is 0
      */
-    private static final int TASK_STATUS_DEFAULT_VALUE = 0;
+    public static final int TASK_STATUS_DEFAULT_VALUE = 0;

     /**
      * Status self check thread name
      */
     private static final String SELF_CHECK_THREAD_NAME = "task-status-manager";
-
+    private static final AtomicInteger CHECK_COUNT = new AtomicInteger(0);
 
     /**
      * Data extraction task execution state cache
@@ -116,7 +119,7 @@ public class TableStatusRegister implements Cache {
      * @return boolean
      */
     public boolean isCheckCompleted() {
-        return TABLE_STATUS_CACHE.values().stream().filter(status -> status > TASK_STATUS_DEFAULT_VALUE)
+        return TABLE_STATUS_CACHE.values().stream().filter(status -> status >= TASK_STATUS_DEFAULT_VALUE)
                                  .allMatch(status -> status == TASK_STATUS_CONSUMER_VALUE);
     }
 
@@ -181,8 +184,8 @@ public class TableStatusRegister implements Cache {
      *
      * @return extract progress
      */
-    public Pair extractProgress() {
-        return Pair.of(extractCompletedCount(), cacheSize());
+    public CheckProgress extractProgress() {
+        return new CheckProgress(errorCount(), extractingCount(), extractCount(), checkCount());
     }
 
     /**
@@ -304,6 +307,24 @@ public class TableStatusRegister implements Cache {
         return TABLE_STATUS_CACHE.getOrDefault(key, -1);
     }
 
+    /**
+     * Query the check status of all tables
+     *
+     * @return table status
+     */
+    public Map<String, Integer> get() {
+        return Collections.unmodifiableMap(TABLE_STATUS_CACHE);
+    }
+
+    /**
+     * Query the check status of all table partitions
+     *
+     * @return table partitions status
+     */
+    public Map> getTablePartitionsStatusCache() {
+        return Collections.unmodifiableMap(TABLE_PARTITIONS_STATUS_CACHE);
+    }
+
     /**
      * Get cache key set
      *
@@ -349,7 +370,7 @@ public class TableStatusRegister implements Cache {
      * @param scheduledExecutor scheduledExecutor
      */
     public void cleanAndShutdown(ScheduledExecutorService scheduledExecutor) {
-        if (isCheckCompleted()) {
+        if (doCheckingStatus() == cacheSize()) {
             removeAll();
             scheduledExecutor.shutdownNow();
             log.info("clean check status and shutdown {} thread", SELF_CHECK_THREAD_NAME);
@@ -379,11 +400,13 @@ public class TableStatusRegister implements Cache {
     /**
      * Check whether there is a completed data extraction task. If yes, update completed_ Table table
      * Check whether there is a completed data verification task. If yes, update consumer_ COMPLETED_ Table table
+     *
+     * @return check table count
      */
-    private void doCheckingStatus() {
+    private int doCheckingStatus() {
         Set keys = TABLE_STATUS_CACHE.keySet();
         if (keys.size() <= 0) {
-            return;
+            return 0;
         }
         List extractErrorList = new ArrayList<>();
         List notExtractCompleteList = new ArrayList<>();
@@ -403,7 +426,30 @@ public class TableStatusRegister implements Cache {
                 log.debug("process check status running");
             }
         });
-        log.debug("progress information: {} is being extracted, {} is being verified, {} is completed,and {} is error",
-            notExtractCompleteList, notCheckCompleteList, checkCompleteList, extractErrorList);
+        final int lastCheckCount = CHECK_COUNT.getAndSet(extractErrorList.size() + checkCompleteList.size());
+        if (CHECK_COUNT.get() > lastCheckCount) {
+            log.debug("progress info: {} is being extracted, {} is being verified, {} is completed,and {} is error",
+                notExtractCompleteList, notCheckCompleteList, checkCompleteList, extractErrorList);
+        }
+        return CHECK_COUNT.get();
+    }
+
+    private int errorCount() {
+        return (int) TABLE_STATUS_CACHE.values().stream().filter(status -> status < TASK_STATUS_DEFAULT_VALUE).count();
+    }
+
+    private int extractingCount() {
+        return (int) TABLE_STATUS_CACHE.values().stream().filter(
+            status -> status >= TASK_STATUS_DEFAULT_VALUE && status < TASK_STATUS_COMPLETED_VALUE).count();
+    }
+
+    private int extractCount() {
+        return (int) TABLE_STATUS_CACHE.values().stream().filter(status -> status == TASK_STATUS_COMPLETED_VALUE)
+                                       .count();
+    }
+
+    private int checkCount() {
+        return (int) TABLE_STATUS_CACHE.values().stream().filter(status -> status == TASK_STATUS_CONSUMER_VALUE)
+                                       .count();
     }
 }
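
Note on the new counters: the four private count methods above partition the status cache by value and feed the new CheckProgress entry. The stand-alone sketch below restates that classification; the concrete values of TASK_STATUS_COMPLETED_VALUE and TASK_STATUS_CONSUMER_VALUE are assumptions (only TASK_STATUS_DEFAULT_VALUE = 0 is visible in this patch), so treat the numbers as illustrative.

import java.util.Map;

class CheckProgressSketch {
    // Assumed status values; the patch only shows TASK_STATUS_DEFAULT_VALUE = 0.
    static final int DEFAULT = 0;
    static final int COMPLETED = 2;
    static final int CONSUMER = 3;

    /** Rolls a table-status cache up into the four CheckProgress counters. */
    static int[] progress(Map<String, Integer> statusCache) {
        int error = 0;
        int extracting = 0;
        int extracted = 0;
        int checked = 0;
        for (int status : statusCache.values()) {
            if (status < DEFAULT) {
                error++;          // errorCount: negative status marks a failed table
            } else if (status < COMPLETED) {
                extracting++;     // extractingCount: started but not yet fully extracted
            } else if (status == COMPLETED) {
                extracted++;      // extractCount: extraction finished
            } else if (status == CONSUMER) {
                checked++;        // checkCount: verification completed
            }
        }
        return new int[] {error, extracting, extracted, checked};
    }
}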
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/AsyncConfig.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/AsyncConfig.java
index 981c035..edbe76f 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/AsyncConfig.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/AsyncConfig.java
@@ -15,6 +15,7 @@
 
 package org.opengauss.datachecker.check.config;
 
+import org.opengauss.datachecker.common.exception.CheckThreadFactory;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.scheduling.annotation.EnableScheduling;
@@ -23,6 +24,8 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 
 /**
+ * AsyncConfig
+ *
  * @author wang chao
  * @date 2022/5/8 19:17
  * @since 11
@@ -30,18 +33,17 @@ import java.util.concurrent.ThreadPoolExecutor;
 @Configuration
 @EnableScheduling
 public class AsyncConfig {
-
     @Bean("asyncCheckExecutor")
     public ThreadPoolTaskExecutor asyncCheckExecutor() {
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
         executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);
         executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4);
         executor.setQueueCapacity(Integer.MAX_VALUE);
+        executor.setThreadFactory(new CheckThreadFactory());
         executor.setKeepAliveSeconds(60);
         executor.setThreadNamePrefix("check-thread");
         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
         executor.initialize();
         return executor;
     }
-
 }
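
Wiring CheckThreadFactory into the executor means uncaught exceptions thrown by check threads are logged by the factory-installed handler instead of being dropped. A minimal stand-alone sketch of that mechanism (not the project's code) is shown below; note that tasks started with execute() reach the handler, while submit() captures the exception in the returned Future.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class UncaughtHandlerDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable);
            // Mirrors what a factory-installed UncaughtExceptionHandler does: log and keep the pool alive.
            thread.setUncaughtExceptionHandler((t, ex) -> System.err.println(t.getName() + " exception: " + ex));
            return thread;
        });
        // execute() lets the exception reach the handler; submit() would capture it in a Future instead.
        pool.execute(() -> {
            throw new IllegalStateException("boom");
        });
        pool.shutdown();
    }
}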
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/TaskStatusController.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/TaskStatusController.java
index 85ea644..fd6280f 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/TaskStatusController.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/TaskStatusController.java
@@ -16,7 +16,6 @@
 package org.opengauss.datachecker.check.controller;
 
 import io.swagger.v3.oas.annotations.Operation;
-import io.swagger.v3.oas.annotations.Parameter;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import org.opengauss.datachecker.check.modules.task.TaskManagerService;
 import org.opengauss.datachecker.common.entry.enums.Endpoint;
@@ -25,12 +24,10 @@ import org.springframework.lang.NonNull;
 import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
 import javax.validation.constraints.NotEmpty;
-import java.util.List;
+import java.util.Map;
 
 /**
  * @author :wangchao
@@ -59,25 +56,12 @@ public class TaskStatusController {
     }
 
     /**
-     * Initialize task status
+     * Query the check status of all tables
      *
-     * @param tableNameList tableNameList
-     */
-    @Operation(summary = "Initialize task status")
-    @PostMapping("/table/extract/status/init")
-    public void initTableExtractStatus(
-        @Parameter(description = "tableNameList") @RequestBody @NotEmpty List tableNameList) {
-        taskManagerService.initTableExtractStatus(tableNameList);
-    }
-
-    /**
-     * query check status of current table
-     *
-     * @param tableName tableName
      * @return status
      */
-    @GetMapping("/query/table/status")
-    public int queryTableCheckStatus(@RequestParam("tableName") String tableName) {
-        return taskManagerService.queryTableCheckStatus(tableName);
+    @GetMapping("/query/all/table/status")
+    public Map<String, Integer> queryTableCheckStatus() {
+        return taskManagerService.queryTableCheckStatus();
     }
 }
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/bucket/BuilderBucketHandler.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/bucket/BuilderBucketHandler.java
index a7f611e..5f61ddb 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/bucket/BuilderBucketHandler.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/bucket/BuilderBucketHandler.java
@@ -119,7 +119,7 @@ public class BuilderBucketHandler {
      */
     private int calculateMaxBucketCount(int totalCount) {
         int bucketCount = totalCount / bucketCapacity;
-        int asInt = IntStream.range(0, 15).filter(idx -> BUCKET_COUNT_LIMITS[idx] > bucketCount).findFirst().orElse(15);
+        int asInt = IntStream.range(0, 15).filter(idx -> BUCKET_COUNT_LIMITS[idx] > bucketCount).findFirst().orElse(14);
         return BUCKET_COUNT_LIMITS[asInt];
     }
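
The orElse(15) -> orElse(14) change above avoids an ArrayIndexOutOfBoundsException when bucketCount exceeds every configured limit, assuming BUCKET_COUNT_LIMITS holds 15 entries (indices 0..14), which IntStream.range(0, 15) suggests. A sketch with hypothetical limit values:

import java.util.stream.IntStream;

class BucketLimitSketch {
    // Hypothetical limits; the real BUCKET_COUNT_LIMITS values are not part of this patch.
    private static final int[] LIMITS = {2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768};

    static int maxBucketCount(int bucketCount) {
        // With orElse(15), a bucketCount >= LIMITS[14] would index past the end of the array;
        // orElse(14) clamps the fallback to the largest configured limit instead.
        int idx = IntStream.range(0, 15).filter(i -> LIMITS[i] > bucketCount).findFirst().orElse(14);
        return LIMITS[idx];
    }

    public static void main(String[] args) {
        System.out.println(maxBucketCount(40000)); // prints 32768, the clamped fallback
    }
}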
 
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/AbstractCheckDiffResultBuilder.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/AbstractCheckDiffResultBuilder.java
index fe39222..0b0f527 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/AbstractCheckDiffResultBuilder.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/AbstractCheckDiffResultBuilder.java
@@ -45,6 +45,8 @@ public abstract class AbstractCheckDiffResultBuilder keyUpdateSet;
     private Set keyInsertSet;
@@ -98,6 +100,19 @@ public abstract class AbstractCheckDiffResultBuilder(InitialCapacity.EMPTY);
-            keyInsertSet = new HashSet<>(InitialCapacity.EMPTY);
-            keyDeleteSet = new HashSet<>(InitialCapacity.EMPTY);
-            repairUpdate = new ArrayList<>(InitialCapacity.EMPTY);
-            repairInsert = new ArrayList<>(InitialCapacity.EMPTY);
-            repairDelete = new ArrayList<>(InitialCapacity.EMPTY);
+            initEmptyCollections();
             resultTableStructureNotEquals();
         }
     }
 
+    private void initEmptyCollections() {
+        keyUpdateSet = new HashSet<>(InitialCapacity.EMPTY);
+        keyInsertSet = new HashSet<>(InitialCapacity.EMPTY);
+        keyDeleteSet = new HashSet<>(InitialCapacity.EMPTY);
+        repairUpdate = new ArrayList<>(InitialCapacity.EMPTY);
+        repairInsert = new ArrayList<>(InitialCapacity.EMPTY);
+        repairDelete = new ArrayList<>(InitialCapacity.EMPTY);
+    }
+
     private void resultTableStructureNotEquals() {
         result = "failed";
         message = "table structure is not equals , please check the database sync !";
     }
 
+    private void resultTableNotExist(Endpoint onlyExistEndpoint) {
+        result = "failed";
+        message =
+            "table [".concat(table).concat("] , ").concat(" only exist in ").concat(onlyExistEndpoint.getDescription())
+                     .concat("!");
+    }
+
     private void resultAnalysis() {
         if (CollectionUtils.isEmpty(keyInsertSet) && CollectionUtils.isEmpty(keyUpdateSet) && CollectionUtils
             .isEmpty(keyDeleteSet)) {
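
For context, this is roughly how a caller is expected to use the new isExistTableMiss flag; the same pattern appears in CheckTableStructureService later in this patch. The method name and parameters here are placeholders, and the snippet compiles only against the project's builder and export classes.

    private void reportMissingTable(String tableName, Endpoint onlyExistEndpoint, String checkResultPath) {
        // onlyExistEndpoint names the side where the table DOES exist; the other side is missing it.
        CheckDiffResult result = CheckDiffResultBuilder.builder(null)
                                                       .table(tableName)
                                                       .isExistTableMiss(true, onlyExistEndpoint)
                                                       .build();
        ExportCheckResult.export(checkResultPath, result);
    }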
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java
index 9abfe5a..f820788 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java
@@ -32,6 +32,7 @@ import org.opengauss.datachecker.common.entry.check.Pair;
 import org.opengauss.datachecker.common.entry.enums.Endpoint;
 import org.opengauss.datachecker.common.entry.extract.RowDataHash;
 import org.opengauss.datachecker.common.entry.extract.Topic;
+import org.opengauss.datachecker.common.exception.CheckingException;
 import org.opengauss.datachecker.common.exception.LargeDataDiffException;
 import org.opengauss.datachecker.common.exception.MerkleTreeDepthException;
 import org.springframework.lang.NonNull;
@@ -47,6 +48,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -115,13 +117,17 @@ public class DataCheckRunnable implements Runnable {
      */
     @Override
     public void run() {
-        paramInit();
-        checkTableData();
-        // Verification result verification repair report
-        checkResult();
-        cleanCheckThreadEnvironment();
-        statisticalService.statistics(getStatisticsName(tableName, partitions), start);
-        refreshCheckStatus();
+        try {
+            paramInit();
+            checkTableData();
+        } catch (CheckingException ex) {
+            log.error("happen before some error,", ex);
+        } finally {
+            checkResult();
+            statisticalService.statistics(getStatisticsName(tableName, partitions), start);
+            cleanCheckThreadEnvironment();
+            refreshCheckStatus();
+        }
     }
 
     private void checkTableData() {
@@ -299,8 +305,11 @@ public class DataCheckRunnable implements Runnable {
      * @param sink         Sink Merkel tree node
      * @param diffNodeList Difference node record
      */
-    private void compareMerkleTree(@NonNull Node source, @NonNull Node sink, List<Pair<Node, Node>> diffNodeList) {
+    private void compareMerkleTree(Node source, Node sink, List<Pair<Node, Node>> diffNodeList) {
         // If the nodes are the same, exit
+        if (Objects.isNull(source) || Objects.isNull(sink)) {
+            return;
+        }
         if (Arrays.equals(source.getSignature(), sink.getSignature())) {
             return;
         }
@@ -397,6 +406,7 @@ public class DataCheckRunnable implements Runnable {
         CheckDiffResult result =
             AbstractCheckDiffResultBuilder.builder(feignClient).table(tableName).topic(topic.getTopicName())
                                           .schema(sinkSchema).partitions(partitions).isTableStructureEquals(true)
+                                          .isExistTableMiss(false, null)
                                           .keyUpdateSet(difference.getDiffering().keySet())
                                           .keyInsertSet(difference.getOnlyOnLeft().keySet())
                                           .keyDeleteSet(difference.getOnlyOnRight().keySet()).build();
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckService.java
index 279b167..301ac06 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckService.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckService.java
@@ -76,7 +76,6 @@ public class DataCheckService {
     public void incrementCheckTableData(Topic topic) {
         DataCheckParam checkParam = buildIncrementCheckParam(topic, dataCheckConfig);
         final IncrementCheckThread incrementCheck = new IncrementCheckThread(checkParam, dataCheckRunnableSupport);
-        incrementCheck.setUncaughtExceptionHandler(new DataCheckThreadExceptionHandler());
         checkAsyncExecutor.submit(incrementCheck);
     }
 
@@ -86,21 +85,4 @@ public class DataCheckService {
         return new DataCheckParam().setBucketCapacity(bucketCapacity).setTopic(topic).setPartitions(0)
                                    .setPath(checkResultPath);
     }
-
-    static class DataCheckThreadExceptionHandler implements Thread.UncaughtExceptionHandler {
-
-        /**
-         * Method invoked when the given thread terminates due to the
-         * given uncaught exception.
-         * <p>
-         * Any exception thrown by this method will be ignored by the
-         * Java Virtual Machine.
-         *
-         * @param thread the thread
-         * @param throwable the exception
-         */
-        @Override
-        public void uncaughtException(Thread thread, Throwable throwable) {
-            log.error(thread.getName() + " exception: " + throwable);
-        }
-    }
 }
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/merkle/MerkleTree.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/merkle/MerkleTree.java
index 77feba1..219ce09 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/merkle/MerkleTree.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/merkle/MerkleTree.java
@@ -17,23 +17,29 @@ package org.opengauss.datachecker.check.modules.merkle;
 
 import lombok.Data;
 import lombok.experimental.Accessors;
+import lombok.extern.slf4j.Slf4j;
 import org.opengauss.datachecker.check.modules.bucket.Bucket;
 import org.opengauss.datachecker.common.util.ByteUtil;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
 import java.util.zip.Adler32;
 
 /**
+ * MerkleTree
+ *
  * @author :wangchao
 * @date :Created in 2022/5/23
 * @since :11
 */
+@Slf4j
 @Data
 public class MerkleTree {
-    public static final int MAGIC_HDR = 0Xcdaace99;
-    private static final int INT_BYTE = 4;
     public static final int LONG_BYTE = 8;
     /**
      * Merkel tree node type leaf node
      */
@@ -43,6 +49,8 @@ public class MerkleTree {
      * Merkel tree node type internal node
      */
     public static final byte INTERNAL_SIG_TYPE = 0x01;
+
+    private static final int INT_BYTE = 4;
     /**
      * Serialization format :(magic header:int)(num nodes:int)(tree depth:int)(leaf length:int)
      * [(node type:byte)(signature length:int)(signature:byte)]
      */
@@ -79,7 +87,9 @@ public class MerkleTree {
      * @param bucketList bucketList
      */
     public MerkleTree(List<Bucket> bucketList) {
+        log.info("MerkleTree init bucket size={}", bucketList.size());
         constructTree(bucketList);
+        log.info("MerkleTree root node depth={},nnodes={}", depth, nnodes);
     }
 
     /**
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/task/TaskManagerService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/task/TaskManagerService.java
index 16b66b6..a7bac03 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/task/TaskManagerService.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/task/TaskManagerService.java
@@ -18,6 +18,7 @@ package org.opengauss.datachecker.check.modules.task;
 import org.opengauss.datachecker.common.entry.enums.Endpoint;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * TaskManagerService
@@ -51,8 +52,7 @@ public interface TaskManagerService {
     /**
      * query check status of current table
      *
-     * @param tableName tableName
      * @return status
      */
-    int queryTableCheckStatus(String tableName);
+    Map<String, Integer> queryTableCheckStatus();
 }
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/task/TaskManagerServiceImpl.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/task/TaskManagerServiceImpl.java
index 49f392b..e6d521c 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/task/TaskManagerServiceImpl.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/task/TaskManagerServiceImpl.java
@@ -25,6 +25,7 @@ import org.springframework.stereotype.Service;
 
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 
 /**
  * TaskManagerServiceImpl
@@ -83,11 +84,10 @@ public class TaskManagerServiceImpl implements TaskManagerService {
     /**
      * query check status of current table
      *
-     * @param tableName tableName
      * @return status
      */
     @Override
-    public int queryTableCheckStatus(String tableName) {
-        return tableStatusRegister.get(tableName);
+    public Map<String, Integer> queryTableCheckStatus() {
+        return tableStatusRegister.get();
     }
 }
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CheckTableStructureService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CheckTableStructureService.java
index 9c6f692..d84319a 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CheckTableStructureService.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CheckTableStructureService.java
@@ -27,8 +27,8 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
-import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 /**
@@ -65,26 +65,57 @@ public class CheckTableStructureService {
      * Table structure definition field name verification
      */
     public void check() {
-        final List<String> tableStructureChangeList = new ArrayList<>();
+        checkMissTable();
+        checkTableStructureChanged();
+    }
+
+    private void checkTableStructureChanged() {
         final List<String> checkTableList = endpointMetaDataManager.getCheckTableList();
+        taskManagerService.initTableExtractStatus(checkTableList);
         checkTableList.forEach(tableName -> {
             final TableMetadata sourceMeta = endpointMetaDataManager.getTableMetadata(Endpoint.SOURCE, tableName);
             final TableMetadata sinkMeta = endpointMetaDataManager.getTableMetadata(Endpoint.SINK, tableName);
-            final boolean isTableStructureEquals = isTableStructureEquals(sourceMeta, sinkMeta);
-            if (!isTableStructureEquals) {
-                tableStructureChangeList.add(tableName);
-                log.error("compared the field names in table[{}](case ignored) and the result is not match", tableName);
-            }
+            checkTableStructureChanged(tableName, sourceMeta, sinkMeta);
         });
+    }
+
+    private void checkMissTable() {
+        final List<String> missTableList = endpointMetaDataManager.getMissTableList();
+        missTableList.forEach(missTable -> {
+            final TableMetadata sourceMeta = endpointMetaDataManager.getTableMetadata(Endpoint.SOURCE, missTable);
+            checkMissTable(missTable, sourceMeta);
+        });
-        tableStructureChangeList.forEach(tableName -> {
+    }
+
+    private void checkTableStructureChanged(String tableName, TableMetadata sourceMeta, TableMetadata sinkMeta) {
+        final boolean isTableStructureEquals = isTableStructureEquals(sourceMeta, sinkMeta);
+        if (!isTableStructureEquals) {
             taskManagerService.refreshTableExtractStatus(tableName, Endpoint.CHECK, -1);
             CheckDiffResult result =
                 CheckDiffResultBuilder.builder(null).table(tableName).isTableStructureEquals(false).build();
             ExportCheckResult.export(checkResultPath, result);
-        });
+            log.error("compared the field names in table[{}](case ignored) and the result is not match", tableName);
+        }
+    }
+
+    private void checkMissTable(String tableName, TableMetadata sourceMeta) {
+        Endpoint onlyExistEndpoint = Objects.isNull(sourceMeta) ? Endpoint.SINK : Endpoint.SOURCE;
+        CheckDiffResult result =
+            CheckDiffResultBuilder.builder(null).table(tableName).isExistTableMiss(true, onlyExistEndpoint).build();
+        ExportCheckResult.export(checkResultPath, result);
+        log.error("compared the field names in table[{}](case ignored) and the result is not match", tableName);
+    }
+
+    private boolean isTableNotExist(TableMetadata sourceMeta, TableMetadata sinkMeta) {
+        // one or double endpoint table have not exists, then return false
+        return Objects.isNull(sourceMeta) || Objects.isNull(sinkMeta);
+    }
 
     private boolean isTableStructureEquals(TableMetadata sourceMeta, TableMetadata sinkMeta) {
+        // one or double endpoint table have not exists, then return false
+        if (isTableNotExist(sourceMeta, sinkMeta)) {
+            return false;
+        }
         return tableStructureCompare.compare(sourceMeta.getPrimaryMetas(), sinkMeta.getPrimaryMetas())
             && tableStructureCompare.compare(sourceMeta.getColumnsMetas(), sinkMeta.getColumnsMetas());
     }
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/EndpointMetaDataManager.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/EndpointMetaDataManager.java
index f4cf815..bc038db 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/EndpointMetaDataManager.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/EndpointMetaDataManager.java
@@ -44,6 +44,7 @@ import java.util.stream.Collectors;
 @RequiredArgsConstructor
 public class EndpointMetaDataManager {
     private static final List<String> CHECK_TABLE_LIST = new ArrayList<>();
+    private static final List<String> MISS_TABLE_LIST = new ArrayList<>();
     private static final Map<String, TableMetadata> SOURCE_METADATA = new HashMap<>();
     private static final Map<String, TableMetadata> SINK_METADATA = new HashMap<>();
 
@@ -63,7 +64,9 @@ public class EndpointMetaDataManager {
             final List<String> sourceTables = getEndpointTableNamesSortByTableRows(SOURCE_METADATA);
             final List<String> sinkTables = getEndpointTableNamesSortByTableRows(SINK_METADATA);
             final List<String> checkTables = compareAndFilterEndpointTables(sourceTables, sinkTables);
+            final List<String> missTables = compareAndFilterMissTables(sourceTables, sinkTables);
             CHECK_TABLE_LIST.addAll(checkTables);
+            MISS_TABLE_LIST.addAll(missTables);
             log.info("Load endpoint metadata information");
         } else {
             log.error("The metadata information is empty, and the verification is terminated abnormally,"
@@ -73,6 +76,17 @@ public class EndpointMetaDataManager {
         }
     }
 
+    private List<String> compareAndFilterMissTables(List<String> sourceTables, List<String> sinkTables) {
+        List<String> missList = new ArrayList<>();
+        missList.addAll(diffList(sourceTables, sinkTables));
+        missList.addAll(diffList(sinkTables, sourceTables));
+        return missList;
+    }
+
+    private List<String> diffList(List<String> source, List<String> sink) {
+        return source.stream().filter(table -> !sink.contains(table)).collect(Collectors.toList());
+    }
+
     /**
      * Get the table metadata information of the specified endpoint
      *
@@ -96,6 +110,7 @@ public class EndpointMetaDataManager {
 
     private void clearCache() {
         CHECK_TABLE_LIST.clear();
+        MISS_TABLE_LIST.clear();
         SOURCE_METADATA.clear();
         SINK_METADATA.clear();
     }
@@ -119,11 +134,20 @@ public class EndpointMetaDataManager {
     }
 
     /**
-     * Return to the verification black and white list
+     * check table list
      *
-     * @return black and white list
+     * @return check table list
      */
     public List<String> getCheckTableList() {
         return CHECK_TABLE_LIST;
     }
+
+    /**
+     * miss table list
+     *
+     * @return miss table list
+     */
+    public List<String> getMissTableList() {
+        return MISS_TABLE_LIST;
+    }
 }
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/impl/CheckServiceImpl.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/impl/CheckServiceImpl.java
index 0e7830f..daf7780 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/impl/CheckServiceImpl.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/impl/CheckServiceImpl.java
@@ -27,8 +27,8 @@ import org.opengauss.datachecker.check.modules.check.ExportCheckResult;
 import org.opengauss.datachecker.check.service.CheckService;
 import org.opengauss.datachecker.check.service.CheckTableStructureService;
 import org.opengauss.datachecker.check.service.EndpointMetaDataManager;
+import org.opengauss.datachecker.common.entry.check.CheckProgress;
 import org.opengauss.datachecker.common.entry.check.IncrementCheckConfig;
-import org.opengauss.datachecker.common.entry.check.Pair;
 import org.opengauss.datachecker.common.entry.enums.CheckMode;
 import org.opengauss.datachecker.common.entry.enums.Endpoint;
 import org.opengauss.datachecker.common.entry.extract.ExtractTask;
@@ -80,6 +80,7 @@ public class CheckServiceImpl implements CheckService {
      * Process signature
      */
     private static final AtomicReference PROCESS_SIGNATURE = new AtomicReference<>();
+    private static final AtomicReference<CheckProgress> CHECK_PROGRESS_REFERENCE = new AtomicReference<>();
 
     /**
      * Verify Mode
@@ -193,7 +194,6 @@ public class CheckServiceImpl implements CheckService {
         ScheduledExecutorService scheduledExecutor = ThreadUtil.newSingleThreadScheduledExecutor();
         scheduledExecutor.scheduleWithFixedDelay(() -> {
             Thread.currentThread().setName(SELF_CHECK_POLL_THREAD_NAME);
-            log.debug("check polling processNo={}", PROCESS_SIGNATURE.get());
             if (Objects.isNull(PROCESS_SIGNATURE.get())) {
                 throw new CheckingPollingException("process is empty,stop check polling");
             }
@@ -283,12 +283,17 @@ public class CheckServiceImpl implements CheckService {
     }
 
     private void completeProgressBar(ScheduledExecutorService scheduledExecutor) {
-        Pair<Integer, Integer> process = tableStatusRegister.extractProgress();
-        log.info("current check process has task total=[{}] , complete=[{}]", process.getSink(), process.getSource());
-
+        CheckProgress process = CHECK_PROGRESS_REFERENCE.get();
+        final CheckProgress newProcess = tableStatusRegister.extractProgress();
+        if (!Objects.equals(process, newProcess)) {
+            CHECK_PROGRESS_REFERENCE.set(newProcess);
+            log.info("check progress updated: {}", CHECK_PROGRESS_REFERENCE.get());
+        }
         // The current task completes the verification and resets the task status
         if (tableStatusRegister.isCheckCompleted()) {
-            log.info("The current verification is completed, reset the task status!");
+            log.info("The verification is completed, reset status :{}", tableStatusRegister.get());
+            log.debug("The verification is completed, reset check partitions status: {}",
+                tableStatusRegister.getTablePartitionsStatusCache());
             if (isAutoCleanEnvironment) {
                 log.info("The current cycle task completes the verification and resets the check environment");
                 cleanCheck();
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/check/CheckProgress.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/check/CheckProgress.java
new file mode 100644
index 0000000..0aa4951
--- /dev/null
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/check/CheckProgress.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd.
+ *
+ * openGauss is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ *
+ *           http://license.coscl.org.cn/MulanPSL2
+ *
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ */
+
+package org.opengauss.datachecker.common.entry.check;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.EqualsAndHashCode.Include;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * CheckProgress
+ *
+ * @author :wangchao
+ * @date :Created in 2022/9/16
+ * @since :11
+ */
+@ToString
+@Getter
+@EqualsAndHashCode
+@AllArgsConstructor
+public class CheckProgress {
+    @Include
+    private int errorCount;
+    @Include
+    private int extractingCount;
+    @Include
+    private int extractCount;
+    @Include
+    private int checkCount;
+}
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/exception/CheckThreadFactory.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/exception/CheckThreadFactory.java
new file mode 100644
index 0000000..4f5e412
--- /dev/null
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/exception/CheckThreadFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd.
+ *
+ * openGauss is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ *
+ *           http://license.coscl.org.cn/MulanPSL2
+ *
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ */
+
+package org.opengauss.datachecker.common.exception;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Custom Check Thread Factory
+ *
+ * @author :wangchao
+ * @date :Created in 2022/9/18
+ * @since :11
+ */
+public class CheckThreadFactory implements ThreadFactory {
+    private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
+
+    private final ThreadGroup group;
+    private final AtomicInteger threadNumber = new AtomicInteger(1);
+    private final String namePrefix;
+
+    /**
+     * CheckThreadFactory
+     */
+    public CheckThreadFactory() {
+        SecurityManager securityManager = System.getSecurityManager();
+        group = (securityManager != null) ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();
+        namePrefix = "pool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
+    }
+
+    /**
+     * newThread
+     *
+     * @param runnable runnable
+     * @return thread
+     */
+    @SuppressWarnings("NullableProblems")
+    @Override
+    public Thread newThread(Runnable runnable) {
+        Thread thread = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0);
+        if (thread.isDaemon()) {
+            thread.setDaemon(false);
+        }
+
+        // UncaughtExceptionHandler
+        thread.setUncaughtExceptionHandler(new CheckUncaughtExceptionHandler());
+        if (thread.getPriority() != Thread.NORM_PRIORITY) {
+            thread.setPriority(Thread.NORM_PRIORITY);
+        }
+        return thread;
+    }
+}
+
+/**
+ * Custom Check Thread Exception Handler
+ */
+@Slf4j
+class CheckUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+    @Override
+    public void uncaughtException(Thread thread, Throwable throwable) {
+        log.error("{} exception: ", thread.getName(), throwable);
+    }
+}
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/client/CheckingFeignClient.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/client/CheckingFeignClient.java
index d2af7d5..c7307a7 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/client/CheckingFeignClient.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/client/CheckingFeignClient.java
@@ -26,6 +26,7 @@ import org.springframework.web.bind.annotation.RequestParam;
 
 import javax.validation.constraints.NotEmpty;
 import java.util.List;
+import java.util.Map;
 
 /**
  *
@@ -55,14 +56,6 @@ public interface CheckingFeignClient {
     void refreshTableExtractStatus(@RequestParam(value = "tableName") @NotEmpty String tableName,
         @RequestParam(value = "endpoint") @NonNull Endpoint endpoint, @RequestParam(value = "status") int status);
 
-    /**
-     * Initializing task status
-     *
-     * @param tableNameList table name list
-     */
-    @PostMapping("/table/extract/status/init")
-    void initTableExtractStatus(@RequestBody @NotEmpty List tableNameList);
-
     /**
      * Incremental verification log notification
      *
@@ -78,11 +71,10 @@ public interface CheckingFeignClient {
     void health();
 
     /**
-     * query check status of current table
+     * Query the check status of all tables
      *
-     * @param tableName tableName
      * @return table status
      */
-    @GetMapping("/query/table/status")
-    int queryTableCheckStatus(@RequestParam("tableName") String tableName);
+    @GetMapping("/query/all/table/status")
+    Map<String, Integer> queryTableCheckStatus();
 }
\ No newline at end of file
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/KafkaProducerConfig.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/KafkaProducerConfig.java
index 62fa5bc..973a98d 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/KafkaProducerConfig.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/KafkaProducerConfig.java
@@ -27,7 +27,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
 
 /**
  * @author :wangchao
@@ -46,7 +45,7 @@ public class KafkaProducerConfig {
     private KafkaProperties properties;
 
     /**
-     *Obtaining a specified producer client based on topic.
+     * Obtaining a specified producer client based on topic.
      *
      * @param topic topic name
      * @return the topic corresponds to the producer client.
@@ -69,7 +68,7 @@ public class KafkaProducerConfig {
         // configuration information
         Properties props = new Properties();
         // kafka server address
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers().stream().collect(Collectors.joining(",")));
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", properties.getBootstrapServers()));
         props.put(ProducerConfig.ACKS_CONFIG, properties.getProducer().getAcks());
         // sets the serialization processing class for data keys and values.
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, properties.getProducer().getKeySerializer());
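
The producer-config change simply swaps a stream collector for String.join; the two forms produce the same broker list. A tiny stand-alone illustration (the broker addresses are made up):

import java.util.List;
import java.util.stream.Collectors;

class JoinSketch {
    public static void main(String[] args) {
        List<String> servers = List.of("kafka-1:9092", "kafka-2:9092");
        String joined = String.join(",", servers);
        String collected = servers.stream().collect(Collectors.joining(","));
        System.out.println(joined.equals(collected)); // true: both yield "kafka-1:9092,kafka-2:9092"
    }
}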
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java
index 79a9c78..9c19d26 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java
@@ -160,8 +160,6 @@ public class DataExtractServiceImpl implements DataExtractService {
             taskReference.set(taskList);
             log.info("build extract task process={} count={}", processNo, taskList.size());
             atomicProcessNo.set(processNo);
-
-            initTableExtractStatus(new ArrayList<>(tableNames));
             return taskList;
         } else {
             log.error("process={} is running extract task , {} please wait ... ", atomicProcessNo.get(), processNo);
@@ -209,10 +207,6 @@ public class DataExtractServiceImpl implements DataExtractService {
             });
             // Initialization data extraction task execution status
             TableExtractStatusCache.init(taskCountMap);
-
-            // Check the sink tables whether there are differences between the source tables
-            checkDifferencesTables(processNo, taskList, tableNames);
-
         } else {
             log.error("process={} is running extract task , {} please wait ... ", atomicProcessNo.get(), processNo);
             throw new ProcessMultipleException("process {" + atomicProcessNo.get() + "} is running extract task");
@@ -224,20 +218,6 @@ public class DataExtractServiceImpl implements DataExtractService {
         extractTask.setTableMetadata(MetaDataCache.get(tableName));
     }
 
-    private void checkDifferencesTables(String processNo, List sourceTaskList,
-        Set sinkTableNames) {
-        final List sinkDiffList =
-            sourceTaskList.stream().filter(task -> !sinkTableNames.contains(task.getTableName()))
-                          .map(ExtractTask::getTableName).distinct().collect(Collectors.toList());
-        if (CollectionUtils.isNotEmpty(sinkDiffList)) {
-            log.info("process={} ,the sink tables have differences between the source tables: [{}]", processNo,
-                sinkDiffList);
-            for (String tableName : sinkDiffList) {
-                checkingFeignClient.refreshTableExtractStatus(tableName, Endpoint.SINK, -1);
-            }
-        }
-    }
-
     /**
      * Clean up the current build task
      */
@@ -314,12 +294,12 @@ public class DataExtractServiceImpl implements DataExtractService {
             if (CollectionUtils.isEmpty(taskList)) {
                 return;
             }
+            Map<String, Integer> tableCheckStatus = checkingFeignClient.queryTableCheckStatus();
             List> taskFutureList = new ArrayList<>();
             taskList.forEach(task -> {
                 log.info("Perform data extraction tasks {}", task.getTaskName());
                 final String tableName = task.getTableName();
-                final int tableCheckStatus = checkingFeignClient.queryTableCheckStatus(tableName);
-                if (tableCheckStatus == -1) {
+                if (!tableCheckStatus.containsKey(tableName) || tableCheckStatus.get(tableName) == -1) {
                     log.info("Abnormal table[{}] status, ignoring the current table data extraction task", tableName);
                     return;
                 }
@@ -404,13 +384,10 @@ public class DataExtractServiceImpl implements DataExtractService {
         Map taskCount = new HashMap<>(Constants.InitialCapacity.EMPTY);
         createTaskCountMapping(tableNameList, taskCount);
         TableExtractStatusCache.init(taskCount);
-        initTableExtractStatus(tableNameList);
     }
 
     private void createTaskCountMapping(List tableNameList, Map taskCount) {
-        tableNameList.forEach(table -> {
-            taskCount.put(table, 1);
-        });
+        tableNameList.forEach(table -> taskCount.put(table, 1));
     }
 
     /**
@@ -418,17 +395,16 @@ public class DataExtractServiceImpl implements DataExtractService {
      */
     @Override
     public void execExtractIncrementTaskByLogs() {
-
         List taskList = incrementTaskReference.get();
         if (CollectionUtils.isEmpty(taskList)) {
             log.info("endpoint [{}] task is empty!", extractProperties.getEndpoint().getDescription());
             return;
         }
+        Map<String, Integer> tableCheckStatus = checkingFeignClient.queryTableCheckStatus();
         taskList.forEach(task -> {
             log.info("Perform data extraction increment tasks:{}", task.getTaskName());
             final String tableName = task.getTableName();
-            final int tableCheckStatus = checkingFeignClient.queryTableCheckStatus(tableName);
-            if (tableCheckStatus == -1) {
+            if (!tableCheckStatus.containsKey(tableName) || tableCheckStatus.get(tableName) == -1) {
                 log.info("Abnormal table[{}] status, ignoring the current table increment data extraction", tableName);
                 return;
             }
@@ -473,11 +449,4 @@ public class DataExtractServiceImpl implements DataExtractService {
     public String queryDatabaseSchema() {
         return extractProperties.getSchema();
     }
-
-    private void initTableExtractStatus(List tableNameList) {
-        if (Objects.equals(extractProperties.getEndpoint(), Endpoint.SOURCE)) {
-            checkingFeignClient.initTableExtractStatus(new ArrayList<>(tableNameList));
-            log.info("Notify the verification service to initialize the extraction task status:{}", tableNameList);
-        }
-    }
 }
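
The extract service now pulls the whole status map once per run instead of issuing one Feign call per table, and it treats an absent entry the same as status -1. A minimal sketch of that lookup rule, with the skip semantics taken from the hunks above:

import java.util.Map;

class StatusLookupSketch {
    /** A table is skipped when the check side does not know it or has flagged it as abnormal (-1). */
    static boolean shouldSkip(Map<String, Integer> tableCheckStatus, String tableName) {
        return !tableCheckStatus.containsKey(tableName) || tableCheckStatus.get(tableName) == -1;
    }
}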
diff --git a/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/task/ResultSetHandlerSinkTest.java b/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/task/ResultSetHandlerSinkTest.java
new file mode 100644
index 0000000..7638c95
--- /dev/null
+++ b/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/task/ResultSetHandlerSinkTest.java
@@ -0,0 +1,235 @@
+/*
+ * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd.
+ *
+ * openGauss is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ *
+ *           http://license.coscl.org.cn/MulanPSL2
+ *
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ */
+
+package org.opengauss.datachecker.extract.task;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.opengauss.datachecker.common.constant.Constants.InitialCapacity;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+@Slf4j
+@TestInstance(Lifecycle.PER_CLASS)
+@ActiveProfiles("sink")
+@SpringBootTest
+@ExtendWith(SpringExtension.class)
+class ResultSetHandlerSinkTest {
+    @Autowired
+    private JdbcTemplate jdbcTemplateOne;
+    private static NamedParameterJdbcTemplate jdbc;
+    private static ResultSetHandler resultSetHandler;
+
+    @BeforeAll
+    void createTable() {
+        createEnv_0034_02();
+        createEnv_0037_01();
+        createEnv_0052_05();
+        createEnv_0052_10();
+        jdbc = new NamedParameterJdbcTemplate(jdbcTemplateOne);
+        resultSetHandler = new ResultSetHandler();
+    }
+
+    private void createEnv_0034_02() {
+        jdbcTemplateOne.execute(CreateTableSql.DROP);
+        jdbcTemplateOne.execute(CreateTableSql.CREATE);
+        jdbcTemplateOne.execute(CreateTableSql.INSERT);
+    }
+
+    private void createEnv_0052_10() {
+        jdbcTemplateOne.execute(CreateTableSql.DROP_0052_10);
+        jdbcTemplateOne.execute(CreateTableSql.CREATE_0052_10);
+        jdbcTemplateOne.execute(CreateTableSql.INSERT_0052_10);
+    }
+
+    private void createEnv_0052_05() {
+        jdbcTemplateOne.execute(CreateTableSql.DROP_0052_05);
+        jdbcTemplateOne.execute(CreateTableSql.CREATE_0052_05);
+        jdbcTemplateOne.execute(CreateTableSql.INSERT_0052_05);
+    }
+
+    private void createEnv_0037_01() {
+        jdbcTemplateOne.execute(CreateTableSql.DROP_0037_01);
+        jdbcTemplateOne.execute(CreateTableSql.CREATE_0037_01);
+        jdbcTemplateOne.execute(CreateTableSql.INSERT_0037_01);
+    }
+
+    interface CreateTableSql {
+        // byte binary type
+        String DROP = "DROP TABLE IF EXISTS t_data_checker_0034_02;";
+        String CREATE = "CREATE TABLE t_data_checker_0034_02 (c_1 bytea NOT NULL, c_2 bytea,"
+            + "c_3 blob,  c_4 blob, c_5 blob, c_6 blob)  WITH (orientation=row, compression=no);"
+            + "ALTER TABLE t_data_checker_0034_02 ADD CONSTRAINT pk_t_data_checker_0034_02_1662889877_1 PRIMARY KEY (c_1)";
+        String INSERT = "INSERT INTO t_data_checker_0034_02 (c_1,c_2,c_3,c_4,c_5,c_6) VALUES"
+            + "('\u00013','\u000E','02AA','FF','0E','FF'),('1','10',null,null,null,null),"
+            + "('saa','sing','6D657461','636F636F','6170706C65','70656E63696C');";
+        String QUERY_ONE = "select * from t_data_checker_0034_02 where c_1='\u00013'";
+        String QUERY_ONE0x310000 = "select * from t_data_checker_0034_02 where c_1='1'";
+
+        // JSON
+        String DROP_0052_10 = " drop table if exists t_data_checker_0052_10;";
+        String CREATE_0052_10 = "CREATE TABLE t_data_checker_0052_10(c1 JSON, c2 int primary key);";
+        String INSERT_0052_10 =
+            "insert into t_data_checker_0052_10 values" + "('{\"key1\": \"value1\", \"key2\": \"value2\"}',1),"
+                + "('{\"m\": 17, \"n\": \"red\"}',2), ('{\"x\": 17, \"y\": \"red\", \"z\": [3, 5, 7]}',3);";
+        String QUERY_0052_10 = "select * from t_data_checker_0052_10 where c2=1";
+
+        // char
+        String DROP_0052_05 = "drop table if  exists t_data_checker_0052_05;";
+        String CREATE_0052_05 =
+            "create table t_data_checker_0052_05(c_1 character(255) NOT NULL,c_2 character varying(255),c_3 text, c_4 text, c_5 text, c_6 text );"
+                + " ALTER TABLE t_data_checker_0052_05 ADD CONSTRAINT pk_t_data_checker_0052_05_1663125177_1 PRIMARY KEY (c_1);";
+        String INSERT_0052_05 =
+            "insert into t_data_checker_0052_05 values ('数据校验工具使用','MySQL数据库','文本信息','滕王阁','王勃','落霞与孤鹜齐飞'),"
+                + "('测试工具','openGauss数据库','字符串','将进酒','李白','呼儿将出换美酒'),('102','tom','wangyu@163.com','2022-09-04','sales','Hi')";
+        String QUERY_0052_05 = "select * from t_data_checker_0052_05 where c_1='测试工具'";
+
+        String DROP_0037_01 = "drop table if  exists t_data_checker_0037_01;";
+        String CREATE_0037_01 =
+            "create table t_data_checker_0037_01 (c_1 character(255) , c_2 character varying(255),c_3 text, c_4 text, c_5 text, c_6 text,"
+                + " store_id integer NOT NULL)  partition by range (store_id) (partition p0 values less than (3) TABLESPACE pg_default,"
+                + " partition p1 values less than (5) TABLESPACE pg_default,partition p2 values less than (7) TABLESPACE pg_default,"
+                + " partition p3 values less than (MAXVALUE) TABLESPACE pg_default) ENABLE ROW MOVEMENT;"
+                + " ALTER TABLE t_data_checker_0037_01 ADD CONSTRAINT pk_t_data_checker_0037_01_1663125177_0 PRIMARY KEY (store_id);";
+        String INSERT_0037_01 = "insert into t_data_checker_0037_01(c_1, c_2,c_3,c_4,c_5,c_6,store_id) values"
+            + "('102','tom','wangyu@163.com','2022-09-04','sales','Hi',2),"
+            + "('101','jack','chenyu@163.com','2022-09-03','sales','hello',1)";
+        String QUERY_0037_01 = "select * from t_data_checker_0037_01 where c_1='101'";
+
+    }
+
+    @DisplayName("query binary c1=0x013300")
+    @Test
+    void testPutOneResultSetToMap() throws Exception {
+        // Setup
+        final List<Map<String, String>> result =
+            jdbc.query(CreateTableSql.QUERY_ONE, new HashMap<>(InitialCapacity.EMPTY),
+                (rs, rowNum) -> resultSetHandler.putOneResultSetToMap(rs));
+        final Map<String, String> map = result.get(0);
+        log.info("testPutOneResultSetToMap {}", map.toString());
+        //Verify the results
+        assertEquals("1,51,0", map.get("c_1"));
+        assertEquals("14", map.get("c_2"));
+        assertEquals("2,-86", map.get("c_3"));
+        assertEquals("-1", map.get("c_4"));
+        assertEquals("14", map.get("c_5"));
+        assertEquals("-1", map.get("c_6"));
+    }
+
+    @DisplayName("query sink binary c1=0x310000")
+    @Test
+    void testPutOneResultSetToMap_0x310000() throws Exception {
+        // Setup
+        final List<Map<String, String>> result =
+            jdbc.query(CreateTableSql.QUERY_ONE0x310000, new HashMap<>(InitialCapacity.EMPTY),
+                (rs, rowNum) -> resultSetHandler.putOneResultSetToMap(rs));
+        final Map<String, String> map = result.get(0);
+        log.info("testPutOneResultSetToMap_0x310000 {}", map.toString());
+
+        //Verify the results
+        assertEquals("49,0,0", map.get("c_1"));
+        assertEquals("49,48", map.get("c_2"));
+        assertNull(map.get("c_3"));
+        assertNull(map.get("c_4"));
+        assertNull(map.get("c_5"));
+        assertNull(map.get("c_6"));
+    }
+
+    @DisplayName("query sink binary c1=0x310000")
+    @Test
+    void test_byte_blob_0x310000() {
+        // Setup
+        final List<Map<String, String>> result =
+            jdbc.query(CreateTableSql.QUERY_ONE0x310000, new HashMap<>(InitialCapacity.EMPTY),
+                (rs, rowNum) -> resultSetHandler.putOneResultSetToMap(rs));
+        final Map<String, String> map = result.get(0);
+        log.info("testPutOneResultSetToMap_0x310000 {}", map.toString());
+
+        //Verify the results
+        assertEquals("49,0,0", map.get("c_1"));
+        assertEquals("49,48", map.get("c_2"));
+        assertNull(map.get("c_3"));
+        assertNull(map.get("c_4"));
+        assertNull(map.get("c_5"));
+        assertNull(map.get("c_6"));
+    }
+
+    @DisplayName("query sink json 0052_10 c2=1")
+    @Test
+    void test_query_json_0052_10() {
+        // Setup
+        final List<Map<String, String>> result =
+            jdbc.query(CreateTableSql.QUERY_0052_10, new HashMap<>(InitialCapacity.EMPTY),
+                (rs, rowNum) -> resultSetHandler.putOneResultSetToMap(rs));
+        final Map<String, String> map = result.get(0);
+        log.info("test_t_data_checker_0052_10 {}", map.toString());
+        //Verify the results
+        assertEquals("{\"key1\": \"value1\", \"key2\": \"value2\"}", map.get("c1"));
+        assertEquals("1", map.get("c2"));
+    }
+
+    @DisplayName("query source char 0052_05 c_1=测试工具")
+    @Test
+    void test_query_char_0052_05() {
+        // Setup
+        final List<Map<String, String>> result =
+            jdbc.query(ResultSetHandlerTest.CreateTableSql.QUERY_0052_05, new HashMap<>(InitialCapacity.EMPTY),
+                (rs, rowNum) -> resultSetHandler.putOneResultSetToMap(rs));
+        final Map<String, String> map = result.get(0);
+        log.info("test_t_data_checker_0052_05 {}", map.toString());
+        //Verify the results
+        assertEquals("测试工具", map.get("c_1"));
+        assertEquals("openGauss数据库", map.get("c_2"));
+        assertEquals("字符串", map.get("c_3"));
+        assertEquals("将进酒", map.get("c_4"));
+        assertEquals("李白", map.get("c_5"));
+        assertEquals("呼儿将出换美酒", map.get("c_6"));
+    }
+
+    @DisplayName("query source char 0037_01")
+    @Test
+    void test_query_char_0037_01() {
+        // Setup
+        final List<Map<String, String>> result =
+            jdbc.query(ResultSetHandlerTest.CreateTableSql.QUERY_0037_01, new HashMap<>(InitialCapacity.EMPTY),
+                (rs, rowNum) -> resultSetHandler.putOneResultSetToMap(rs));
+        final Map<String, String> map = result.get(0);
+        log.info("test_t_data_checker_0037_01 {}", map.toString());
+        //Verify the results
+        assertEquals("101", map.get("c_1"));
+        assertEquals("jack", map.get("c_2"));
+        assertEquals("chenyu@163.com", map.get("c_3"));
+        assertEquals("2022-09-03", map.get("c_4"));
+        assertEquals("sales", map.get("c_5"));
+        assertEquals("hello", map.get("c_6"));
+        assertEquals("1", map.get("store_id"));
+    }
+}
diff --git a/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/task/ResultSetHandlerTest.java b/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/task/ResultSetHandlerTest.java
new file mode 100644
index 0000000..20718cf
--- /dev/null
+++ b/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/task/ResultSetHandlerTest.java
@@ -0,0 +1,200 @@
+/*
+ * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd.
+ *
+ * openGauss is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ *
+ *           http://license.coscl.org.cn/MulanPSL2
+ *
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ */
+
+package org.opengauss.datachecker.extract.task;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.opengauss.datachecker.common.constant.Constants.InitialCapacity;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+@Slf4j
+@TestInstance(Lifecycle.PER_CLASS)
+@ActiveProfiles("source")
+@SpringBootTest
+@ExtendWith(SpringExtension.class)
+class ResultSetHandlerTest {
+    @Autowired
+    private JdbcTemplate jdbcTemplateOne;
+    private NamedParameterJdbcTemplate jdbc;
+    private ResultSetHandler resultSetHandler;
+
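+    // Lifecycle.PER_CLASS lets @BeforeAll run on an instance method; JUnit Jupiter lifecycle
+    // methods must not be private, so createTable() is kept package-private.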
+    @BeforeAll
+    void createTable() {
+        jdbcTemplateOne.execute(CreateTableSql.DROP_0034_02);
+        jdbcTemplateOne.execute(CreateTableSql.CREATE_0034_02);
+        jdbcTemplateOne.execute(CreateTableSql.INSERT_0034_02);
+
+        jdbcTemplateOne.execute(CreateTableSql.DROP_0052_10);
+        jdbcTemplateOne.execute(CreateTableSql.CREATE_0052_10);
+        jdbcTemplateOne.execute(CreateTableSql.INSERT_0052_10);
+
+        jdbcTemplateOne.execute(CreateTableSql.DROP_0052_05);
+        jdbcTemplateOne.execute(CreateTableSql.CREATE_0052_05);
+        jdbcTemplateOne.execute(CreateTableSql.INSERT_0052_05);
+
+        jdbcTemplateOne.execute(CreateTableSql.DROP_0037_01);
+        jdbcTemplateOne.execute(CreateTableSql.CREATE_0037_01);
+        jdbcTemplateOne.execute(CreateTableSql.INSERT_0037_01);
+
+        jdbc = new NamedParameterJdbcTemplate(jdbcTemplateOne);
+        resultSetHandler = new ResultSetHandler();
+    }
+
+    interface CreateTableSql {
+        // byte binary type
+        String DROP_0034_02 = "DROP TABLE IF EXISTS `t_data_checker_0034_02`;";
+        String CREATE_0034_02 = "CREATE TABLE IF NOT EXISTS `t_data_checker_0034_02` (`c_1` binary(3) NOT NULL,"
+            + "  `c_2` varbinary(10) DEFAULT NULL,  `c_3` tinyblob,  `c_4` blob, `c_5` mediumblob,"
+            + "  `c_6` longblob, PRIMARY KEY (`c_1`)) ENGINE=InnoDB";
+        String INSERT_0034_02 = "INSERT INTO `t_data_checker_0034_02` (`c_1`, `c_2`, `c_3`, `c_4`, `c_5`, `c_6`) VALUES"
+            + "(0x013300, 0x0E, 0x02AA,0xFF, 0x0E, 0xFF), (0x310000, 0x3130, NULL, NULL, NULL, NULL),"
+            + "(0x736161, 0x73696E67, 0x6D657461, 0x636F636F, 0x6170706C65,0x70656E63696C);";
+        String QUERY_ONE = "select * from t_data_checker_0034_02 where c_1=0x013300";
+        String QUERY_ONE0x310000 = "select * from t_data_checker_0034_02 where c_1=0x310000";
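+        // INSERT_0034_02 loads three rows: 0x013300 with every binary/blob column populated,
+        // 0x310000 with the blob columns left NULL, and a third row of plain ASCII bytes.
+        // QUERY_ONE and QUERY_ONE0x310000 fetch the first two rows for the assertions below.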
+
+        // JSON
+        String DROP_0052_10 = " drop table if exists t_data_checker_0052_10;";
+        String CREATE_0052_10 = "CREATE TABLE t_data_checker_0052_10(c1 JSON, c2 int primary key);";
+        String INSERT_0052_10 =
+            "insert into t_data_checker_0052_10 values" + "('{\"key1\": \"value1\", \"key2\": \"value2\"}',1),"
+                + "('{\"m\": 17, \"n\": \"red\"}',2), ('{\"x\": 17, \"y\": \"red\", \"z\": [3, 5, 7]}',3);";
+        String QUERY_0052_10 = "select * from t_data_checker_0052_10 where c2=1";
+
+        // char
+        String DROP_0052_05 = "drop table if  exists t_data_checker_0052_05;";
+        String CREATE_0052_05 = "create table t_data_checker_0052_05(c_1 char(255) primary key, c_2 varchar(255),"
+            + "c_3 text, c_4 mediumtext, c_5 tinytext, c_6 longtext )default charset=utf8;";
+        String INSERT_0052_05 =
+            "insert into t_data_checker_0052_05 values ('数据校验工具使用','MySQL数据库','文本信息','滕王阁','王勃','落霞与孤鹜齐飞'),"
+                + "('测试工具','openGauss数据库','字符串','将进酒','李白','呼儿将出换美酒'),('102','tom','wangyu@163.com','2022-09-04','sales','Hi')";
+        String QUERY_0052_05 = "select * from t_data_checker_0052_05 where c_1='测试工具'";
+
+        String DROP_0037_01 = "drop table if  exists t_data_checker_0037_01;";
+        String CREATE_0037_01 =
+            "create table t_data_checker_0037_01 (c_1 char(255),c_2 varchar(255),c_3 text, c_4 mediumtext,"
+                + " c_5 tinytext,c_6 longtext, store_id int not null primary key)  partition by range (store_id) (partition p0 values less than (3),"
+                + " partition p1 values less than (5),partition p2 values less than (7),partition p3 values less than maxvalue);";
+        String INSERT_0037_01 = "insert into t_data_checker_0037_01(c_1, c_2,c_3,c_4,c_5,c_6,store_id) values"
+            + "('102','tom','wangyu@163.com','2022-09-04','sales','Hi',2),"
+            + "('101','jack','chenyu@163.com','2022-09-03','sales','hello',1)";
+        String QUERY_0037_01 = "select * from t_data_checker_0037_01 where store_id=1";
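+        // 0037_01 is a range-partitioned table keyed by store_id; QUERY_0037_01 selects the
+        // store_id=1 row ('101', 'jack', ...), which test_query_char_0037_01 asserts field by field.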
+
+    }
+
+    @DisplayName("query source binary 0034_02 c1=0x013300")
+    @Test
+    void testPutOneResultSetToMap() {
+        // Setup
+        final List<Map<String, String>> result =
+            jdbc.query(CreateTableSql.QUERY_ONE, new HashMap<>(InitialCapacity.EMPTY),
+                (rs, rowNum) -> resultSetHandler.putOneResultSetToMap(rs));
+        final Map<String, String> map = result.get(0);
+        log.info("query source binary 0034_02 c1=0x013300 {}", map.toString());
+        //Verify the results
+        assertEquals("1,51,0", map.get("c_1"));
+        assertEquals("14", map.get("c_2"));
+        assertEquals("2,-86", map.get("c_3"));
+        assertEquals("-1", map.get("c_4"));
+        assertEquals("14", map.get("c_5"));
+        assertEquals("-1", map.get("c_6"));
+    }
+
+    @DisplayName("query source binary 0034_02 c1=0x310000")
+    @Test
+    void testPutOneResultSetToMap_0x310000() {
+        // Setup
+        final List<Map<String, String>> result =
+            jdbc.query(CreateTableSql.QUERY_ONE0x310000, new HashMap<>(InitialCapacity.EMPTY),
+                (rs, rowNum) -> resultSetHandler.putOneResultSetToMap(rs));
+        final Map<String, String> map = result.get(0);
+        log.info("query source binary 0034_02 c1=0x310000 {}", map.toString());
+        //Verify the results
+        assertEquals("49,0,0", map.get("c_1"));
+        assertEquals("49,48", map.get("c_2"));
+        assertNull(map.get("c_3"));
+        assertNull(map.get("c_4"));
+        assertNull(map.get("c_5"));
+        assertNull(map.get("c_6"));
+    }
+
+    @DisplayName("query source json 0052_10 c2=1")
+    @Test
+    void test_query_json_0052_10() {
+        // Setup
+        final List<Map<String, String>> result =
+            jdbc.query(CreateTableSql.QUERY_0052_10, new HashMap<>(InitialCapacity.EMPTY),
+                (rs, rowNum) -> resultSetHandler.putOneResultSetToMap(rs));
+        final Map<String, String> map = result.get(0);
+        log.info("query source json 0052_10 c2=1 {}", map.toString());
+        //Verify the results
+        assertEquals("{\"key1\": \"value1\", \"key2\": \"value2\"}", map.get("c1"));
+        assertEquals("1", map.get("c2"));
+    }
+
+    @DisplayName("query source char 0052_05 c_1=1")
+    @Test
+    void test_query_char_0052_05() {
+        // Setup
+        final List<Map<String, String>> result =
+            jdbc.query(CreateTableSql.QUERY_0052_05, new HashMap<>(InitialCapacity.EMPTY),
+                (rs, rowNum) -> resultSetHandler.putOneResultSetToMap(rs));
+        final Map<String, String> map = result.get(0);
+        log.info("query source char 0052_05 c_1=1 {}", map.toString());
+        //Verify the results
+        assertEquals("测试工具", map.get("c_1"));
+        assertEquals("openGauss数据库", map.get("c_2"));
+        assertEquals("字符串", map.get("c_3"));
+        assertEquals("将进酒", map.get("c_4"));
+        assertEquals("李白", map.get("c_5"));
+        assertEquals("呼儿将出换美酒", map.get("c_6"));
+    }
+
+    @DisplayName("query source char 0037_01")
+    @Test
+    void test_query_char_0037_01() {
+        // Setup
+        final List<Map<String, String>> result =
+            jdbc.query(CreateTableSql.QUERY_0037_01, new HashMap<>(InitialCapacity.EMPTY),
+                (rs, rowNum) -> resultSetHandler.putOneResultSetToMap(rs));
+        final Map<String, String> map = result.get(0);
+        log.info("query source char 0037_01 {}", map.toString());
+        //Verify the results
+        assertEquals("101", map.get("c_1"));
+        assertEquals("jack", map.get("c_2"));
+        assertEquals("chenyu@163.com", map.get("c_3"));
+        assertEquals("2022-09-03", map.get("c_4"));
+        assertEquals("sales", map.get("c_5"));
+        assertEquals("hello", map.get("c_6"));
+        assertEquals("1", map.get("store_id"));
+    }
+}
-- 
Gitee