diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/cache/TableStatusRegister.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/cache/TableStatusRegister.java
index 156c4111505ad5f88422276f42dcc43324db511b..f84f749a2cbf2981965a9417c8d3c52df79ad980 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/cache/TableStatusRegister.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/cache/TableStatusRegister.java
@@ -361,6 +361,7 @@ public class TableStatusRegister implements Cache {
     public void removeAll() {
         TABLE_STATUS_CACHE.clear();
         COMPLETED_TABLE_QUEUE.clear();
+        TABLE_PARTITIONS_STATUS_CACHE.clear();
         log.info("table status register cache information clearing");
     }
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java
index 11240c45f8b2854660f177d4f2e99e167cf72c4e..6f055090a805278758659eb330fbeb5737a62234 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java
@@ -149,10 +149,6 @@ public class ExtractFallbackFactory implements FallbackFactory<ExtractFeignClient> {
-        @Override
-        public void notifyIncrementDataLogs(List<SourceDataLog> dataLogList) {
-        }
-
         @Override
         public Result<TableMetadataHash> queryTableMetadataHash(String tableName) {
             return Result.error("Remote call, query table metadata hash information exception");
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java
index 48648fc8981aa8e0faa558c5087fb937b578d098..1a3a943ee3cd97e332c1b516e6abadcfe35c4845 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java
@@ -174,14 +174,6 @@ public interface ExtractFeignClient {
     Result<List<String>> buildRepairStatementDeleteDml(@RequestParam(name = "schema") String schema,
         @RequestParam(name = "tableName") String tableName, @RequestBody Set<String> diffSet);
 
-    /**
-     * Issue incremental log data
-     *
-     * @param dataLogList incremental log data
-     */
-    @PostMapping("/extract/increment/logs/data")
-    void notifyIncrementDataLogs(List<SourceDataLog> dataLogList);
-
     /**
      * Query table metadata hash information
      *
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/FeignClientService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/FeignClientService.java
index 40fe4642285042915e821fee731bda3f550cc2f6..6e70d65887e935a8c0b50b8486507a75d473c60d 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/FeignClientService.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/FeignClientService.java
@@ -17,7 +17,6 @@
 import org.opengauss.datachecker.common.entry.enums.Endpoint;
 import org.opengauss.datachecker.common.entry.extract.ExtractTask;
-import org.opengauss.datachecker.common.entry.extract.SourceDataLog;
 import org.opengauss.datachecker.common.entry.extract.TableMetadata;
 import org.opengauss.datachecker.common.entry.extract.Topic;
 import org.opengauss.datachecker.common.exception.DispatchClientException;
@@ -249,16 +248,6 @@
     }
 
-    /**
-     * Issue incremental log data
-     *
-     * @param endpoint    endpoint type
-     * @param dataLogList incremental log data
-     */
-    public void notifyIncrementDataLogs(Endpoint endpoint, List<SourceDataLog> dataLogList) {
-        getClient(endpoint).notifyIncrementDataLogs(dataLogList);
-    }
-
     /**
      * Query the schema information of the extraction end database
      *
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/IncrementManagerController.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/IncrementManagerController.java
index e5ca3bc23302e686187bc631595be7c920e67b41..e1ad4445b88283a203761958617f7a238fdb04a1 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/IncrementManagerController.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/IncrementManagerController.java
@@ -16,7 +16,6 @@
 package org.opengauss.datachecker.check.controller;
 
 import io.swagger.v3.oas.annotations.Operation;
-import io.swagger.v3.oas.annotations.Parameter;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import org.opengauss.datachecker.check.service.IncrementManagerService;
 import org.opengauss.datachecker.common.entry.extract.SourceDataLog;
@@ -30,6 +29,8 @@ import javax.validation.constraints.NotEmpty;
 import java.util.List;
 
 /**
+ * IncrementManagerController
+ *
  * @author :wangchao
  * @date :Created in 2022/5/25
  * @since :11
  */
@@ -49,8 +50,7 @@ public class IncrementManagerController {
      */
     @Operation(summary = "Incremental verification log notification")
     @PostMapping("/notify/source/increment/data/logs")
-    public void notifySourceIncrementDataLogs(@Parameter(description = "Incremental verification log") @RequestBody
-        @NotEmpty List<SourceDataLog> dataLogList) {
+    public void notifySourceIncrementDataLogs(@RequestBody @NotEmpty List<SourceDataLog> dataLogList) {
        incrementManagerService.notifySourceIncrementDataLogs(dataLogList);
     }
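For context, a minimal sketch of the request body this endpoint consumes: each SourceDataLog names a table plus the composite primary-key values that changed. The chained setters match the ones used elsewhere in this patch; the table name and key values are illustrative only.

import java.util.List;

import org.opengauss.datachecker.common.entry.extract.SourceDataLog;

final class NotifyPayloadExample {
    static List<SourceDataLog> changedRows() {
        // one entry per table, carrying the changed composite primary key values
        SourceDataLog dataLog = new SourceDataLog().setTableName("t_order")
            .setCompositePrimaryValues(List.of("1001", "1002"));
        return List.of(dataLog);
    }
}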
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/AbstractCheckDiffResultBuilder.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/AbstractCheckDiffResultBuilder.java
index ac76ae6c9dbe750fab3bf109fde33cb46ab4e871..e80e5c62ce2be1195e95002679f5d5cd2cf1de20 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/AbstractCheckDiffResultBuilder.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/AbstractCheckDiffResultBuilder.java
@@ -18,12 +18,14 @@ package org.opengauss.datachecker.check.modules.check;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.opengauss.datachecker.check.client.FeignClientService;
+import org.opengauss.datachecker.common.entry.enums.CheckMode;
 import org.opengauss.datachecker.common.entry.enums.DML;
 import org.opengauss.datachecker.common.entry.enums.Endpoint;
 
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -49,6 +51,7 @@ public abstract class AbstractCheckDiffResultBuilder<B extends AbstractCheckDiffResultBuilder<B, C>, C extends CheckDiffResult> {
+    private CheckMode checkMode;
     private Set<String> keyUpdateSet;
     private Set<String> keyInsertSet;
@@ -158,6 +161,11 @@
+    public B checkMode(CheckMode checkMode) {
+        this.checkMode = checkMode;
+        return self();
+    }
+
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java
@@ … @@ public class DataCheckRunnable implements Runnable {
     private final List<Bucket> sourceBucketList = Collections.synchronizedList(new ArrayList<>());
     private final List<Bucket> sinkBucketList = Collections.synchronizedList(new ArrayList<>());
     private final DifferencePair<Map<String, RowDataHash>, Map<String, RowDataHash>, Map<String, Pair<Node, Node>>>
-        difference = DifferencePair.of(new HashMap<>(), new HashMap<>(), new HashMap<>());
+            difference = DifferencePair.of(new HashMap<>(), new HashMap<>(), new HashMap<>());
     private final Map<Integer, Pair<Integer, Integer>> bucketNumberDiffMap = new HashMap<>();
     private final FeignClientService feignClient;
     private final StatisticalService statisticalService;
@@ -103,7 +104,7 @@ public class DataCheckRunnable implements Runnable {
     private KafkaConsumerHandler buildKafkaHandler(DataCheckRunnableSupport support) {
         KafkaConsumerService kafkaConsumerService = support.getKafkaConsumerService();
         return new KafkaConsumerHandler(kafkaConsumerService.buildKafkaConsumer(false),
-            kafkaConsumerService.getRetryFetchRecordTimes());
+                kafkaConsumerService.getRetryFetchRecordTimes());
     }
 
     /**
@@ -147,9 +148,9 @@ public class DataCheckRunnable implements Runnable {
         if (sourceTree.getDepth() != sinkTree.getDepth()) {
             refreshCheckStatus();
             throw new MerkleTreeDepthException(String.format(Locale.ROOT,
-                "source & sink data have large different, Please synchronize data again! "
-                    + "merkel tree depth different,source depth=[%d],sink depth=[%d]", sourceTree.getDepth(),
-                sinkTree.getDepth()));
+                    "source & sink data have large different, Please synchronize data again! "
+                        + "merkel tree depth different,source depth=[%d],sink depth=[%d]", sourceTree.getDepth(),
+                    sinkTree.getDepth()));
         }
         // Recursively compare two Merkel trees and return the difference record.
         compareMerkleTree(sourceTree, sinkTree);
@@ -199,7 +200,7 @@ public class DataCheckRunnable implements Runnable {
         sortBuckets(sourceBucketList);
         sortBuckets(sinkBucketList);
         log.info("Initialize the verification data and the bucket construction is currently completed of table [{}-{}]",
-            tableName, partitions);
+                tableName, partitions);
     }
 
     /**
@@ -240,7 +241,7 @@ public class DataCheckRunnable implements Runnable {
             return;
         }
         log.info("Initialize the verification thread data, and pull the total number of [{}-{}-{}] data records to {}",
-            endpoint.getDescription(), tableName, partitions, dataList.size());
+                endpoint.getDescription(), tableName, partitions, dataList.size());
         BuilderBucketHandler bucketBuilder = new BuilderBucketHandler(bucketCapacity);
 
         // Use the pulled data to build the bucket list
@@ -393,7 +394,7 @@ public class DataCheckRunnable implements Runnable {
         } else {
             // sourceSize is less than thresholdMinBucketSize, that is, there is only one bucket. Compare
             DifferencePair<Map<String, RowDataHash>, Map<String, RowDataHash>, Map<String, Pair<Node, Node>>> subDifference =
-                compareBucket(sourceBucketList.get(0), sinkBucketList.get(0));
+                    compareBucket(sourceBucketList.get(0), sinkBucketList.get(0));
             difference.getDiffering().putAll(subDifference.getDiffering());
             difference.getOnlyOnLeft().putAll(subDifference.getOnlyOnLeft());
             difference.getOnlyOnRight().putAll(subDifference.getOnlyOnRight());
@@ -402,20 +403,19 @@ public class DataCheckRunnable implements Runnable {
         } else {
             refreshCheckStatus();
             throw new LargeDataDiffException(String.format(
-                "table[%s] source & sink data have large different," + "source-bucket-count=[%s] sink-bucket-count=[%s]"
-                    + " Please synchronize data again! ", tableName, sourceBucketCount, sinkBucketCount));
+                    "table[%s] source & sink data have large different," + "source-bucket-count=[%s] sink-bucket-count=[%s]"
+                        + " Please synchronize data again! ", tableName, sourceBucketCount, sinkBucketCount));
         }
     }
 
     private void checkResult() {
         CheckDiffResult result =
-            AbstractCheckDiffResultBuilder.builder(feignClient).table(tableName).topic(topic.getTopicName())
-                .schema(sinkSchema).partitions(partitions).isTableStructureEquals(true)
-                .isExistTableMiss(false, null)
-                .rowCount(rowCount).errorRate(20)
-                .keyUpdateSet(difference.getDiffering().keySet())
-                .keyInsertSet(difference.getOnlyOnLeft().keySet())
-                .keyDeleteSet(difference.getOnlyOnRight().keySet()).build();
+            AbstractCheckDiffResultBuilder.builder(feignClient).table(tableName).topic(topic.getTopicName())
+                .schema(sinkSchema).partitions(partitions).isTableStructureEquals(true)
+                .isExistTableMiss(false, null).rowCount(rowCount).errorRate(20)
+                .checkMode(CheckMode.FULL).keyUpdateSet(difference.getDiffering().keySet())
+                .keyInsertSet(difference.getOnlyOnLeft().keySet())
+                .keyDeleteSet(difference.getOnlyOnRight().keySet()).build();
         ExportCheckResult.export(path, result);
         log.info("Complete the output of data verification results of table [{}-{}]", tableName, partitions);
     }
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckService.java
index 5c59ffc7d85574ce20e04aa9b5f84466d51c2248..4c30c25cff48084fa691bf45dbaa3fd2c4641c03 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckService.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckService.java
@@ -18,6 +18,8 @@ package org.opengauss.datachecker.check.modules.check;
 import lombok.extern.slf4j.Slf4j;
 import org.opengauss.datachecker.check.config.DataCheckConfig;
 import org.opengauss.datachecker.common.entry.check.DataCheckParam;
+import org.opengauss.datachecker.common.entry.check.IncrementDataCheckParam;
+import org.opengauss.datachecker.common.entry.extract.SourceDataLog;
 import org.opengauss.datachecker.common.entry.extract.Topic;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
@@ -66,26 +68,27 @@ public class DataCheckService {
         final int errorRate = dataCheckConfig.getDataCheckProperties().getErrorRate();
         final String checkResultPath = dataCheckConfig.getCheckResultPath();
         return new DataCheckParam().setBucketCapacity(bucketCapacity).setTopic(topic).setPartitions(partitions)
-            .setProperties(kafkaProperties).setPath(checkResultPath).setErrorRate(errorRate);
+                .setProperties(kafkaProperties).setPath(checkResultPath).setErrorRate(errorRate);
     }
 
     /**
      * incrementCheckTableData
      *
-     * @param tableName tableName
-     * @param checkDataCount
+     * @param tableName tableName
+     * @param process   process
+     * @param dataLog   dataLog
      */
-    public void incrementCheckTableData(String tableName, int checkDataCount) {
-        DataCheckParam checkParam = buildIncrementCheckParam(tableName, dataCheckConfig);
-        final IncrementCheckThread incrementCheck =
-            new IncrementCheckThread(checkParam, checkDataCount, dataCheckRunnableSupport);
+    public void incrementCheckTableData(String tableName, String process, SourceDataLog dataLog) {
+        IncrementDataCheckParam checkParam = buildIncrementCheckParam(tableName, dataCheckConfig);
+        checkParam.setDataLog(dataLog).setProcess(process);
+        final IncrementCheckThread incrementCheck = new IncrementCheckThread(checkParam, dataCheckRunnableSupport);
         checkAsyncExecutor.submit(incrementCheck);
     }
 
-    private DataCheckParam buildIncrementCheckParam(String tableName, DataCheckConfig dataCheckConfig) {
+    private IncrementDataCheckParam buildIncrementCheckParam(String tableName, DataCheckConfig dataCheckConfig) {
         final int bucketCapacity = dataCheckConfig.getBucketCapacity();
         final String checkResultPath = dataCheckConfig.getCheckResultPath();
-        return new DataCheckParam().setTableName(tableName).setBucketCapacity(bucketCapacity).setPartitions(0)
-            .setPath(checkResultPath);
+        return new IncrementDataCheckParam().setTableName(tableName).setBucketCapacity(bucketCapacity)
+            .setPath(checkResultPath);
     }
 }
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java
index 8b401ac2c5bf324854c1ec29b8a93af871e8e267..c257fd362a3d539923d3317df279fc360d1b7a04 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java
@@ -24,14 +24,14 @@
 import org.opengauss.datachecker.check.modules.bucket.BuilderBucketHandler;
 import org.opengauss.datachecker.check.modules.merkle.MerkleTree;
 import org.opengauss.datachecker.check.modules.merkle.MerkleTree.Node;
 import org.opengauss.datachecker.common.constant.Constants.InitialCapacity;
-import org.opengauss.datachecker.common.entry.check.DataCheckParam;
 import org.opengauss.datachecker.common.entry.check.DifferencePair;
+import org.opengauss.datachecker.common.entry.check.IncrementDataCheckParam;
 import org.opengauss.datachecker.common.entry.check.Pair;
+import org.opengauss.datachecker.common.entry.enums.CheckMode;
 import org.opengauss.datachecker.common.entry.enums.Endpoint;
 import org.opengauss.datachecker.common.entry.extract.RowDataHash;
 import org.opengauss.datachecker.common.entry.extract.SourceDataLog;
 import org.opengauss.datachecker.common.entry.extract.TableMetadataHash;
-import org.opengauss.datachecker.common.entry.extract.Topic;
 import org.opengauss.datachecker.common.exception.DispatchClientException;
 import org.opengauss.datachecker.common.exception.MerkleTreeDepthException;
 import org.opengauss.datachecker.common.web.Result;
@@ -58,13 +58,12 @@
  */
 @Slf4j
 public class IncrementCheckThread extends Thread {
-    private static final int THRESHOLD_MIN_BUCKET_SIZE = 2;
+    private static final int THRESHOLD_MIN_BUCKET_SIZE = 20;
     private static final String THREAD_NAME_PRIFEX = "increment-data-check-";
 
-    private final Topic topic;
     private final String tableName;
-    private final int partitions;
     private final int bucketCapacity;
+    private final int rowCount;
     private final String path;
     private final FeignClientService feignClient;
     private final List<Bucket> sourceBucketList = new ArrayList<>();
@@ -73,7 +72,12 @@
         difference = DifferencePair.of(new HashMap<>(), new HashMap<>(), new HashMap<>());
     private final Map<Integer, Pair<Integer, Integer>> bucketNumberDiffMap = new HashMap<>();
     private final QueryRowDataWapper queryRowDataWapper;
+    private final SourceDataLog dataLog;
+    private final String process;
     private String sinkSchema;
+    private boolean isTableStructureEquals;
+    private boolean isExistTableMiss;
+    private Endpoint onlyExistEndpoint;
 
     /**
      * IncrementCheckThread constructor method
      *
      * @param checkParam Data Check Param
      * @param support    Data Check Runnable Support
      */
-    public IncrementCheckThread(@NonNull DataCheckParam checkParam, @NonNull DataCheckRunnableSupport support) {
-        super.setName(buildThreadName());
-        topic = checkParam.getTopic();
-        tableName = topic.getTableName();
-        partitions = checkParam.getPartitions();
+    public IncrementCheckThread(@NonNull IncrementDataCheckParam checkParam,
+        @NonNull DataCheckRunnableSupport support) {
+        dataLog = checkParam.getDataLog();
+        process = checkParam.getProcess();
+        rowCount = dataLog.getCompositePrimaryValues().size();
+        tableName = checkParam.getTableName();
         path = checkParam.getPath();
         bucketCapacity = checkParam.getBucketCapacity();
         feignClient = support.getFeignClientService();
@@ -105,19 +110,27 @@
      */
     @Override
     public void run() {
-        sinkSchema = feignClient.getDatabaseSchema(Endpoint.SINK);
-        // Metadata verification
-        if (!checkTableMetadata()) {
-            return;
+        try {
+            setName(buildThreadName());
+            sinkSchema = feignClient.getDatabaseSchema(Endpoint.SINK);
+            // Metadata verification
+            isTableStructureEquals = checkTableMetadata();
+            if (isTableStructureEquals) {
+                // Initial verification
+                firstCheckCompare();
+                // Analyze the initial verification results
+                List<String> diffIdList = parseDiffResult();
+                // Conduct secondary verification according to the initial verification results
+                secondaryCheckCompare(diffIdList);
+            } else {
+                log.error("check table metadata error");
+            }
+            // Verification result verification repair report
+            checkResult();
+            log.info("increment process {} check end", process);
+        } catch (Exception ex) {
+            log.error("check error", ex);
         }
-        // Initial verification
-        firstCheckCompare();
-        // Analyze the initial verification results
-        List<String> diffIdList = parseDiffResult();
-        // Conduct secondary verification according to the initial verification results
-        secondaryCheckCompare(diffIdList);
-        // Verification result verification repair report
-        checkResult();
     }
 
     /**
@@ -162,7 +175,7 @@
     }
 
     private void initSecondaryCheckBucketList(List<String> diffIdList) {
-        SourceDataLog dataLog = new SourceDataLog().setTableName(tableName).setCompositePrimaryValues(diffIdList);
+        dataLog.setCompositePrimaryValues(diffIdList);
         buildBucket(Endpoint.SOURCE, dataLog);
         // Align the source and destination bucket list
         alignAllBuckets();
@@ -188,6 +201,7 @@
             difference.getOnlyOnLeft().putAll(subDifference.getOnlyOnLeft());
             difference.getOnlyOnRight().putAll(subDifference.getOnlyOnRight());
         }
+        return;
     }
 
     // Construct Merkel tree constraint: bucketList cannot be empty, and size > =2
@@ -234,7 +248,20 @@
     private boolean checkTableMetadata() {
         TableMetadataHash sourceTableHash = queryTableMetadataHash(Endpoint.SOURCE, tableName);
         TableMetadataHash sinkTableHash = queryTableMetadataHash(Endpoint.SINK, tableName);
-        return Objects.equals(sourceTableHash, sinkTableHash);
+        boolean isEqual = Objects.equals(sourceTableHash, sinkTableHash);
+        if (!isEqual) {
+            isExistTableMiss = true;
+            if (sourceTableHash.getTableHash() == -1) {
+                onlyExistEndpoint = Endpoint.SINK;
+            } else if (sinkTableHash.getTableHash() == -1) {
+                onlyExistEndpoint = Endpoint.SOURCE;
+            } else {
+                onlyExistEndpoint = null;
+            }
+        } else {
+            isExistTableMiss = false;
+        }
+        return isEqual;
     }
 
     private TableMetadataHash queryTableMetadataHash(Endpoint endpoint, String tableName) {
@@ -309,7 +336,7 @@ public class IncrementCheckThread extends Thread {
      * @param bucketList bucket list
      */
     private void initFirstCheckBucketList(Endpoint endpoint, List<Bucket> bucketList) {
-        List<RowDataHash> dataList = getTopicPartitionsData(endpoint);
+        List<RowDataHash> dataList = queryRowDataWapper.queryRowData(endpoint, dataLog);
         buildBucket(dataList, endpoint, bucketList);
     }
 
@@ -363,17 +390,6 @@
         });
     }
 
-    /**
-     * Pull the Kafka partition {@code partitions} data
-     * of the table {@code tableName} of the specified endpoint {@code endpoint}
-     *
-     * @param endpoint endpoint
-     * @return Specify table Kafka partition data
-     */
-    private List<RowDataHash> getTopicPartitionsData(Endpoint endpoint) {
-        return queryRowDataWapper.queryIncrementRowData(endpoint, tableName);
-    }
-
     private List<RowDataHash> getSecondaryCheckRowData(Endpoint endpoint, SourceDataLog dataLog) {
         return queryRowDataWapper.queryRowData(endpoint, dataLog);
     }
@@ -429,15 +445,21 @@
     private void checkResult() {
         CheckDiffResult result =
-            AbstractCheckDiffResultBuilder.builder(feignClient).table(tableName).topic(topic.getTopicName())
-                .schema(sinkSchema).partitions(partitions)
+            AbstractCheckDiffResultBuilder.builder(feignClient).table(tableName).topic(buildResultFileName())
+                .schema(sinkSchema).partitions(0).rowCount(rowCount)
+                .isExistTableMiss(isExistTableMiss, onlyExistEndpoint)
+                .checkMode(CheckMode.INCREMENT).isTableStructureEquals(isTableStructureEquals)
                 .keyUpdateSet(difference.getDiffering().keySet())
-                .keyInsertSet(difference.getOnlyOnRight().keySet())
-                .keyDeleteSet(difference.getOnlyOnLeft().keySet()).build();
+                .keyInsertSet(difference.getOnlyOnLeft().keySet())
+                .keyDeleteSet(difference.getOnlyOnRight().keySet()).build();
         ExportCheckResult.export(path, result);
     }
 
     private String buildThreadName() {
-        return THREAD_NAME_PRIFEX + topic.getTopicName();
+        return THREAD_NAME_PRIFEX + tableName;
+    }
+
+    private String buildResultFileName() {
+        return process + "_" + tableName;
     }
 }
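The table-miss detection above rests on a sentinel: the extract side (see DataManipulationService at the end of this patch) answers a metadata-hash query with tableHash == -1 when the table has no columns, i.e. does not exist there. A self-contained sketch of that contract, with the class name being hypothetical:

import org.opengauss.datachecker.common.entry.enums.Endpoint;

final class TableMissResolver {
    static Endpoint onlyExistEndpoint(long sourceHash, long sinkHash) {
        if (sourceHash == -1L) {
            return Endpoint.SINK;   // table only exists on the sink side
        }
        if (sinkHash == -1L) {
            return Endpoint.SOURCE; // table only exists on the source side
        }
        return null;                // both tables exist, but their structures differ
    }
}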
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java
index b701f5275dc8dcce33923473c51870719fb63038..4772aad056b4477f0b3c94749e958b08448c987f 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java
@@ -18,16 +18,18 @@ package org.opengauss.datachecker.check.service;
 import com.alibaba.fastjson.JSONException;
 import com.alibaba.fastjson.JSONObject;
 import lombok.extern.slf4j.Slf4j;
-import org.opengauss.datachecker.check.client.FeignClientService;
+import org.apache.commons.collections4.CollectionUtils;
 import org.opengauss.datachecker.check.modules.check.CheckDiffResult;
-import org.opengauss.datachecker.common.entry.enums.Endpoint;
+import org.opengauss.datachecker.check.modules.check.DataCheckService;
+import org.opengauss.datachecker.common.entry.enums.CheckMode;
 import org.opengauss.datachecker.common.entry.extract.SourceDataLog;
 import org.opengauss.datachecker.common.exception.CheckingException;
 import org.opengauss.datachecker.common.util.FileUtils;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.opengauss.datachecker.common.util.IdGenerator;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.Resource;
 import java.io.File;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -36,6 +38,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * IncrementManagerService
  *
@@ -47,10 +50,11 @@
 @Slf4j
 @Service
 public class IncrementManagerService {
+    private static final AtomicReference<String> PROCESS_SIGNATURE = new AtomicReference<>();
+
     @Value("${data.check.data-path}")
     private String path;
-    @Autowired
-    private FeignClientService feignClientService;
+    @Resource
+    private DataCheckService dataCheckService;
 
     /**
      * Incremental verification log notification
@@ -58,10 +62,23 @@
      * @param dataLogList Incremental verification log
      */
     public void notifySourceIncrementDataLogs(List<SourceDataLog> dataLogList) {
+        if (CollectionUtils.isEmpty(dataLogList)) {
+            return;
+        }
+        PROCESS_SIGNATURE.set(IdGenerator.nextId36());
         // Collect the last verification results and build an incremental verification log
         dataLogList.addAll(collectLastResults());
-        feignClientService.notifyIncrementDataLogs(Endpoint.SOURCE, dataLogList);
-        feignClientService.notifyIncrementDataLogs(Endpoint.SINK, dataLogList);
+        incrementDataLogsChecking(dataLogList);
+    }
+
+    private void incrementDataLogsChecking(List<SourceDataLog> dataLogList) {
+        String processNo = PROCESS_SIGNATURE.get();
+        dataLogList.forEach(dataLog -> {
+            log.debug("increment data checking {} mode=[{}],{}", processNo, CheckMode.INCREMENT,
+                dataLog.getTableName());
+            // Verify the data according to the table name and Kafka partition
+            dataCheckService.incrementCheckTableData(dataLog.getTableName(), processNo, dataLog);
+        });
     }
 
     /**
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/check/IncrementDataCheckParam.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/check/IncrementDataCheckParam.java
new file mode 100644
index 0000000000000000000000000000000000000000..fd8417114c8bcdf169f9fb12ef918cb02ea89d43
--- /dev/null
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/check/IncrementDataCheckParam.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd.
+ *
+ * openGauss is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ *
+ *          http://license.coscl.org.cn/MulanPSL2
+ *
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ */
+
+package org.opengauss.datachecker.common.entry.check;
+
+import lombok.Data;
+import lombok.experimental.Accessors;
+import org.opengauss.datachecker.common.entry.extract.SourceDataLog;
+
+/**
+ * Data verification thread parameters
+ *
+ * @author :wangchao
+ * @date :Created in 2022/6/10
+ * @since :11
+ */
+@Data
+@Accessors(chain = true)
+public class IncrementDataCheckParam {
+    private String tableName;
+    private String process;
+
+    /**
+     * Build bucket capacity parameters
+     */
+    private int bucketCapacity;
+
+    /**
+     * Verification error rate
+     */
+    private int errorRate;
+
+    /**
+     * Verification result output path
+     */
+    private String path;
+    private String schema;
+    private SourceDataLog dataLog;
+}
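How the checker fills this parameter object in can be read off DataCheckService above; a minimal sketch, with the helper itself hypothetical:

import org.opengauss.datachecker.common.entry.check.IncrementDataCheckParam;
import org.opengauss.datachecker.common.entry.extract.SourceDataLog;

final class IncrementParamExample {
    static IncrementDataCheckParam build(SourceDataLog dataLog, String processNo, int bucketCapacity, String path) {
        return new IncrementDataCheckParam().setTableName(dataLog.getTableName())
            .setBucketCapacity(bucketCapacity)
            .setPath(path)
            .setProcess(processNo) // groups the result files of one notification round
            .setDataLog(dataLog);  // changed composite primary keys for this table
    }
}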
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/KafkaConsumerConfig.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/KafkaConsumerConfig.java
index 502dbac7831a2dda554d832f9ccbc3db0d1dc3bf..530b1da3d27403e4575a570b46b2e2d6c871d14f 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/KafkaConsumerConfig.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/KafkaConsumerConfig.java
@@ -21,10 +21,12 @@
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.opengauss.datachecker.extract.constants.ExtConstants;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.stereotype.Component;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
@@ -44,6 +46,10 @@ public class KafkaConsumerConfig {
     private static final Object LOCK = new Object();
     private static final Map<String, KafkaConsumer<String, String>> CONSUMER_MAP = new ConcurrentHashMap<>();
 
+    @Value("${spring.extract.debezium-groupId}")
+    private String debeziumGroupId;
+    @Value("${spring.extract.debezium-topic}")
+    private String debeziumTopic;
     @Autowired
     private KafkaProperties properties;
 
@@ -72,18 +78,19 @@
     /**
      * Obtaining a specified consumer client based on topic.
      *
-     * @param groupId groupId
      * @return consumer client.
      */
-    public KafkaConsumer<String, String> getDebeziumConsumer(String groupId) {
+    public KafkaConsumer<String, String> getDebeziumConsumer() {
         Properties props = new Properties();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
             String.join(ExtConstants.DELIMITER, properties.getBootstrapServers()));
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, debeziumGroupId);
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, properties.getConsumer().getAutoOffsetReset());
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        return new KafkaConsumer<>(props);
+        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+        consumer.subscribe(List.of(debeziumTopic));
+        return consumer;
     }
 
     private KafkaConsumer<String, String> buildKafkaConsumer() {
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractController.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractController.java
index 7c27e725743317f0fa7d7b678d15eebc625c1753..f934ae3f018450ea157864b35e18fc4346a931cf 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractController.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractController.java
@@ -38,7 +38,6 @@
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
 import javax.validation.constraints.NotEmpty;
-import javax.validation.constraints.NotNull;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -223,21 +222,6 @@ public class ExtractController {
         return Result.success(dataExtractService.queryTableColumnValues(tableName, new ArrayList<>(compositeKeySet)));
     }
 
-    /**
-     * creating an incremental extraction task based on data change logs
-     *
-     * @param sourceDataLogList data change logs list
-     * @return interface invoking result
-     */
-    @Operation(summary = "creating an incremental extraction task based on data change logs")
-    @PostMapping("/extract/increment/logs/data")
-    Result<Void> notifyIncrementDataLogs(
-        @RequestBody @NotNull(message = "Data change log cannot be empty") List<SourceDataLog> sourceDataLogList) {
-        dataExtractService.buildExtractIncrementTaskByLogs(sourceDataLogList);
-        dataExtractService.execExtractIncrementTaskByLogs();
-        return Result.success();
-    }
-
     @Operation(summary = "query the metadata of the current table structure and perform hash calculation.")
     @PostMapping("/extract/query/table/metadata/hash")
     Result<TableMetadataHash> queryTableMetadataHash(@RequestParam(name = "tableName") String tableName) {
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DataConsolidationService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DataConsolidationService.java
index 4b1d301241647334b37b773b26bce67ed61f0b3e..dc6290c6b82ab82234e9a960ee3c713eb720b8f4 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DataConsolidationService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DataConsolidationService.java
@@ -16,7 +16,6 @@
 package org.opengauss.datachecker.extract.debe;
 
 import org.opengauss.datachecker.common.entry.check.IncrementCheckConfig;
-import org.opengauss.datachecker.common.entry.check.IncrementCheckTopic;
 import org.opengauss.datachecker.common.entry.extract.SourceDataLog;
 
 import java.util.List;
@@ -32,24 +31,17 @@ public interface DataConsolidationService {
     /**
      * Get the topic records of debezium, and analyze and merge the topic records
      *
-     * @param topicName topic name
+     * @param fetchOffset fetchOffset
      * @return topic records
      */
-    List<SourceDataLog> getDebeziumTopicRecords(String topicName);
-
-    /**
-     * Get the message record information of the topic corresponding to the debrizum listening table
-     *
-     * @return Return message consumption record
-     */
-    IncrementCheckTopic getDebeziumTopicRecordOffSet();
-
+    List<SourceDataLog> getDebeziumTopicRecords(int fetchOffset);
+
     /**
      * Get the debezium listening table and record the offset information of the message corresponding to the topic
      *
      * @return return offset
      */
-    long getDebeziumTopicRecordEndOffSet();
+    int getDebeziumTopicRecordEndOffSet();
 
     /**
      * Check whether the current extraction end is the source end
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DataConsolidationServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DataConsolidationServiceImpl.java
index a3be8b1225aeef6460b27a1e5187618f4b077ebf..2c43795910a363805183b5e9ea731a45d0437f1e 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DataConsolidationServiceImpl.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DataConsolidationServiceImpl.java
@@ -15,16 +15,12 @@
 package org.opengauss.datachecker.extract.debe;
 
-import com.alibaba.fastjson.JSONException;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
 import org.opengauss.datachecker.common.entry.check.IncrementCheckConfig;
-import org.opengauss.datachecker.common.entry.check.IncrementCheckTopic;
 import org.opengauss.datachecker.common.entry.enums.Endpoint;
 import org.opengauss.datachecker.common.entry.extract.SourceDataLog;
 import org.opengauss.datachecker.common.exception.DebeziumConfigException;
+import org.opengauss.datachecker.common.util.ThreadUtil;
 import org.opengauss.datachecker.extract.cache.MetaDataCache;
 import org.opengauss.datachecker.extract.config.ExtractProperties;
 import org.opengauss.datachecker.extract.config.KafkaConsumerConfig;
@@ -38,7 +34,6 @@
 import javax.annotation.PostConstruct;
 import javax.validation.constraints.NotEmpty;
 import javax.validation.constraints.NotNull;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
@@ -56,17 +51,13 @@
 @Service
 public class DataConsolidationServiceImpl implements DataConsolidationService {
     private static final IncrementCheckConfig INCREMENT_CHECK_CONIFG = new IncrementCheckConfig();
-
-    private final Object lock = new Object();
-    private final DebeziumDataHandler debeziumDataHandler = new DebeziumDataHandler();
-
-    private KafkaConsumer<String, String> debeziumTopicOffSetConsumer = null;
     @Autowired
-    private KafkaConsumerConfig consumerConfig;
+    private DebeziumConsumerListener debeziumListener;
     @Autowired
     private KafkaAdminService kafkaAdminService;
     @Autowired
+    private KafkaConsumerConfig kafkaConfig;
+    @Autowired
     private ExtractProperties extractProperties;
     @Autowired
     private MetaDataService metaDataService;
@@ -78,105 +69,29 @@ public class DataConsolidationServiceImpl implements DataConsolidationService {
     public void initIncrementConfig() {
         if (extractProperties.isDebeziumEnable()) {
             metaDataService.init();
-            INCREMENT_CHECK_CONIFG.setDebeziumTopic(extractProperties.getDebeziumTopic())
-                .setDebeziumTables(extractProperties.getDebeziumTables())
-                .setPartitions(extractProperties.getDebeziumTopicPartitions())
-                .setGroupId(extractProperties.getDebeziumGroupId());
-            getDebeziumTopicRecordOffSet();
+            ThreadUtil.newSingleThreadExecutor().submit(new DebeziumWorker(debeziumListener, kafkaConfig));
         }
     }
 
     /**
      * Get the topic records of debezium, and analyze and merge the topic records
      *
-     * @param topicName topic name
      * @return topic records
      */
     @Override
-    public List<SourceDataLog> getDebeziumTopicRecords(String topicName) {
+    public List<SourceDataLog> getDebeziumTopicRecords(int fetchOffset) {
         checkIncrementCheckEnvironment();
-        IncrementCheckTopic topic = getDebeziumTopic();
-        // if test service reset the group id is IdGenerator.nextId36()
-        topic.setTopic(topicName);
-        KafkaConsumer<String, String> kafkaConsumer = consumerConfig.getDebeziumConsumer(topic.getGroupId());
-        kafkaConsumer.subscribe(List.of(topicName));
-        log.info("kafka debezium topic consumer topic=[{}]", topicName);
-        // Consume a partition data of a topic
-        List<SourceDataLog> dataList = new ArrayList<>();
-        consumerAllRecords(kafkaConsumer, dataList);
-        log.info("kafka consumer topic=[{}] dataList=[{}]", topicName, dataList.size());
-        return dataList;
-    }
-
-    private void consumerAllRecords(KafkaConsumer<String, String> kafkaConsumer, List<SourceDataLog> dataList) {
+        int begin = 0;
         DebeziumDataLogs debeziumDataLogs = new DebeziumDataLogs();
-        int consumerRecords = getConsumerRecords(kafkaConsumer, debeziumDataLogs);
-        while (consumerRecords > 0) {
-            consumerRecords = getConsumerRecords(kafkaConsumer, debeziumDataLogs);
-        }
-        dataList.addAll(debeziumDataLogs.values());
-    }
-
-    /**
-     * Consume and process Kafka consumer client data
-     *
-     * @param kafkaConsumer    consumer
-     * @param debeziumDataLogs Processing results
-     * @return Number of consumer records
-     */
-    private int getConsumerRecords(KafkaConsumer<String, String> kafkaConsumer, DebeziumDataLogs debeziumDataLogs) {
-        ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(200));
-        consumerRecords.forEach(record -> {
-            try {
-                debeziumDataHandler.handler(record.value(), debeziumDataLogs);
-            } catch (DebeziumConfigException | JSONException ex) {
-                // Abnormal message structure, ignoring the current message
-                log.error("Abnormal message structure, ignoring the current message,{},{}", record.value(),
-                    ex.getMessage());
-            }
-        });
-        return consumerRecords.count();
-    }
-
-    /**
-     * Get the message record information of the topic corresponding to the debrizum listening table
-     *
-     * @return Return message consumption record
-     */
-    @Override
-    public IncrementCheckTopic getDebeziumTopicRecordOffSet() {
-        checkIncrementCheckEnvironment();
-        IncrementCheckTopic topic = getDebeziumTopic();
-        final TopicPartition topicPartition = new TopicPartition(topic.getTopic(), 0);
-        List<TopicPartition> partitionList = List.of(topicPartition);
-        debeziumTopicOffSetConsumer = getDebeziumTopicOffSetConsumer();
-
-        // View topic current message consumption starting position
-        debeziumTopicOffSetConsumer.seekToBeginning(partitionList);
-        topic.setBegin(debeziumTopicOffSetConsumer.position(topicPartition));
-
-        // View topic current message consumption deadline
-        debeziumTopicOffSetConsumer.seekToEnd(partitionList);
-        topic.setEnd(debeziumTopicOffSetConsumer.position(topicPartition));
-        return topic;
-    }
-
-    private KafkaConsumer<String, String> getDebeziumTopicOffSetConsumer() {
-        if (Objects.nonNull(debeziumTopicOffSetConsumer)) {
-            return debeziumTopicOffSetConsumer;
-        } else {
-            synchronized (lock) {
-                if (Objects.isNull(debeziumTopicOffSetConsumer)) {
-                    IncrementCheckTopic topic = getDebeziumTopic();
-                    final TopicPartition topicPartition = new TopicPartition(topic.getTopic(), 0);
-                    debeziumTopicOffSetConsumer = consumerConfig.getDebeziumConsumer(topic.getGroupId());
-                    List<TopicPartition> partitionList = List.of(topicPartition);
-                    // Set consumption mode as partition
-                    debeziumTopicOffSetConsumer.assign(partitionList);
-                }
+        while (begin <= fetchOffset) {
+            DebeziumDataBean debeziumDataBean = debeziumListener.poll();
+            if (Objects.isNull(debeziumDataBean)) {
+                break;
             }
+            debeziumDataLogs.addDebeziumDataKey(debeziumDataBean.getTable(), debeziumDataBean.getData());
+            begin++;
         }
-        return debeziumTopicOffSetConsumer;
+        return new ArrayList<>(debeziumDataLogs.values());
     }
 
     /**
@@ -185,16 +100,9 @@
      * @return offset
      */
     @Override
-    public long getDebeziumTopicRecordEndOffSet() {
-        final TopicPartition topicPartition = new TopicPartition(INCREMENT_CHECK_CONIFG.getDebeziumTopic(), 0);
+    public int getDebeziumTopicRecordEndOffSet() {
         // View topic current message consumption deadline
-        return debeziumTopicOffSetConsumer.position(topicPartition);
-    }
-
-    private IncrementCheckTopic getDebeziumTopic() {
-        return new IncrementCheckTopic().setTopic(INCREMENT_CHECK_CONIFG.getDebeziumTopic())
-            .setGroupId(INCREMENT_CHECK_CONIFG.getGroupId())
-            .setPartitions(INCREMENT_CHECK_CONIFG.getPartitions());
+        return debeziumListener.size();
     }
 
     @Override
@@ -214,9 +122,7 @@
      */
     private void checkIncrementCheckEnvironment() {
         final Set<String> allKeys = metaDataService.queryMetaDataOfSchema().keySet();
-        // Debezium environmental inspection
-        checkDebeziumEnvironment(INCREMENT_CHECK_CONIFG.getDebeziumTopic(), INCREMENT_CHECK_CONIFG.getDebeziumTables(),
-            allKeys);
+        checkDebeziumEnvironment(extractProperties.getDebeziumTopic(), extractProperties.getDebeziumTables(), allKeys);
     }
 
     /**
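The new pull model in a nutshell: a background worker parses Debezium change events into DebeziumDataBean objects and queues them; consolidation then drains up to a requested number and groups them per table into SourceDataLog entries. A minimal sketch assuming the listener contract introduced below (poll() returns null once the queue is empty); note the patched loop's begin <= fetchOffset bound admits up to fetchOffset + 1 records.

import java.util.ArrayList;
import java.util.List;

import org.opengauss.datachecker.common.entry.extract.SourceDataLog;

final class DebeziumDrainExample {
    static List<SourceDataLog> drain(DebeziumConsumerListener listener, int maxRecords) {
        DebeziumDataLogs logs = new DebeziumDataLogs();
        for (int i = 0; i < maxRecords; i++) {
            DebeziumDataBean bean = listener.poll();
            if (bean == null) {
                break; // queue exhausted
            }
            // group the changed row by its table name
            logs.addDebeziumDataKey(bean.getTable(), bean.getData());
        }
        return new ArrayList<>(logs.values());
    }
}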
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumConsumerListener.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumConsumerListener.java
new file mode 100644
index 0000000000000000000000000000000000000000..c709e83594cc02660e58fb650956ef516304f68e
--- /dev/null
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumConsumerListener.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd.
+ *
+ * openGauss is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ *
+ *          http://license.coscl.org.cn/MulanPSL2
+ *
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ */
+
+package org.opengauss.datachecker.extract.debe;
+
+import com.alibaba.fastjson.JSONException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.opengauss.datachecker.common.exception.DebeziumConfigException;
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * DebeziumConsumerListener
+ *
+ * @author :wangchao
+ * @date :Created in 2022/9/21
+ * @since :11
+ */
+@Slf4j
+@Service
+public class DebeziumConsumerListener {
+    private static final LinkedBlockingQueue<DebeziumDataBean> DATA_LOG_QUEUE = new LinkedBlockingQueue<>();
+
+    private final DebeziumDataHandler debeziumDataHandler = new DebeziumDataHandler();
+
+    public void listen(ConsumerRecord<String, String> record) {
+        try {
+            debeziumDataHandler.handler(record.value(), DATA_LOG_QUEUE);
+        } catch (DebeziumConfigException | JSONException | InterruptedException ex) {
+            // Abnormal message structure, ignoring the current message
+            log.error("Abnormal message structure, ignoring the current message,{},{}", record.value(),
+                ex.getMessage());
+        }
+    }
+
+    public int size() {
+        return DATA_LOG_QUEUE.size();
+    }
+
+    public DebeziumDataBean poll() {
+        return DATA_LOG_QUEUE.poll();
+    }
+}
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumDataBean.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumDataBean.java
new file mode 100644
index 0000000000000000000000000000000000000000..8107eb00edd015e929b5bcb8f0a269f0e395c2b4
--- /dev/null
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumDataBean.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd.
+ *
+ * openGauss is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ *
+ *          http://license.coscl.org.cn/MulanPSL2
+ *
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ */
+
+package org.opengauss.datachecker.extract.debe;
+
+import java.util.Map;
+
+/**
+ * DebeziumDataBean
+ *
+ * @author :wangchao
+ * @date :Created in 2022/7/1
+ * @since :11
+ */
+public class DebeziumDataBean {
+    private String table;
+    private Map<String, String> data;
+
+    public DebeziumDataBean(String table, Map<String, String> data) {
+        this.table = table;
+        this.data = data;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public Map<String, String> getData() {
+        return data;
+    }
+}
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumDataHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumDataHandler.java
index 17ac42e4a06ccef999d4d70b76d3a7b304c0cbec..0466900f073c1d25105ff4f529583bb2b37dc20c 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumDataHandler.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumDataHandler.java
@@ -23,6 +23,7 @@
 import org.opengauss.datachecker.common.entry.debezium.PayloadSource;
 
 import javax.validation.constraints.NotNull;
 import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * DebeziumDataHandler
@@ -36,21 +37,17 @@ public class DebeziumDataHandler {
     /**
      * Debezium message parsing and adding the parsing result to the {@code DebeziumDataLogs.class} result set
      *
-     * @param message          message
-     * @param debeziumDataLogs debeziumDataLogs
+     * @param message message
+     * @param queue   queue that receives the parsed records
      */
-    public void handler(String message, @NotNull DebeziumDataLogs debeziumDataLogs) {
+    public void handler(String message, @NotNull LinkedBlockingQueue<DebeziumDataBean> queue)
+        throws InterruptedException {
         final DebeziumData debeziumData = JSONObject.parseObject(message, DebeziumData.class);
         final DebeziumPayload payload = debeziumData.getPayload();
         final Map<String, String> before = payload.getBefore();
         final Map<String, String> after = payload.getAfter();
         final PayloadSource source = payload.getSource();
         // Extract the data and add it to the debezium incremental log object
-        if (!debeziumDataLogs.addDebeziumDataKey(source.getTable(), after != null ? after : before)) {
-            // The format of the debezium message is abnormal.
-            // The corresponding table [{}] of the current message does not exist or is illegal
-            log.error("The debezium message format is abnormal. The current message corresponding table [{}] "
-                + "does not exist or is illegal", source.getTable());
-        }
+        queue.put(new DebeziumDataBean(source.getTable(), after != null ? after : before));
     }
 }
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumWorker.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumWorker.java
new file mode 100644
index 0000000000000000000000000000000000000000..350f645046872110a6f81cb270fb090dd61967a8
--- /dev/null
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumWorker.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd.
+ *
+ * openGauss is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ *
+ *          http://license.coscl.org.cn/MulanPSL2
+ *
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ */
+
+package org.opengauss.datachecker.extract.debe;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.opengauss.datachecker.extract.config.KafkaConsumerConfig;
+
+import java.time.Duration;
+
+/**
+ * DebeziumWorker
+ *
+ * @author :wangchao
+ * @date :Created in 2022/9/21
+ * @since :11
+ */
+@Slf4j
+public class DebeziumWorker implements Runnable {
+    private static final String NAME = "DebeziumWorker";
+
+    private DebeziumConsumerListener debeziumConsumerListener;
+    private KafkaConsumerConfig kafkaConsumerConfig;
+
+    public DebeziumWorker(DebeziumConsumerListener debeziumConsumerListener, KafkaConsumerConfig kafkaConsumerConfig) {
+        this.debeziumConsumerListener = debeziumConsumerListener;
+        this.kafkaConsumerConfig = kafkaConsumerConfig;
+    }
+
+    @Override
+    public void run() {
+        Thread.currentThread().setName(NAME);
+        log.info("The Debezium message listener task has started");
+        final KafkaConsumer<String, String> consumer = kafkaConsumerConfig.getDebeziumConsumer();
+        while (true) {
+            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(50));
+            for (ConsumerRecord<String, String> record : records) {
+                try {
+                    debeziumConsumerListener.listen(record);
+                } catch (Exception ex) {
+                    log.error("Abnormal message structure, ignoring the current message,{},{}", record.value(),
+                        ex.getMessage());
+                }
+            }
+        }
+    }
+}
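The worker above polls in an endless loop, so the consumer is never closed. A hypothetical stoppable variant (not part of this patch) could expose a cooperative shutdown flag; a minimal sketch:

import java.time.Duration;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.opengauss.datachecker.extract.config.KafkaConsumerConfig;
import org.opengauss.datachecker.extract.debe.DebeziumConsumerListener;

public class StoppableDebeziumWorker implements Runnable {
    private final DebeziumConsumerListener listener;
    private final KafkaConsumerConfig config;
    private volatile boolean running = true;

    public StoppableDebeziumWorker(DebeziumConsumerListener listener, KafkaConsumerConfig config) {
        this.listener = listener;
        this.config = config;
    }

    public void shutdown() {
        running = false; // ends the poll loop on the next iteration
    }

    @Override
    public void run() {
        // try-with-resources releases the Kafka consumer once the loop ends
        try (KafkaConsumer<String, String> consumer = config.getDebeziumConsumer()) {
            while (running) {
                consumer.poll(Duration.ofMillis(50)).forEach(listener::listen);
            }
        }
    }
}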
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/IncrementDataAnalysisService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/IncrementDataAnalysisService.java
index 8afc3de15c9bde90c591d5a38b4635a12f478909..a2a61b43ef979061e471559f3827b374aec3dc6e 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/IncrementDataAnalysisService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/IncrementDataAnalysisService.java
@@ -19,7 +19,6 @@
 import feign.FeignException;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
-import org.opengauss.datachecker.common.entry.check.IncrementCheckTopic;
 import org.opengauss.datachecker.common.entry.extract.SourceDataLog;
 import org.opengauss.datachecker.common.exception.ExtractException;
 import org.opengauss.datachecker.common.util.ThreadUtil;
@@ -75,9 +74,7 @@
         if (extractProperties.isDebeziumEnable() && consolidationService.isSourceEndpoint()) {
             log.info("Start incremental verification analysis");
             verificationConfiguration();
-            IncrementCheckTopic topicRecordOffSet = consolidationService.getDebeziumTopicRecordOffSet();
             // Start the initialization load to verify the topic offset
-            lastOffSetAtomic.set(topicRecordOffSet.getBegin());
             setLastTimestampAtomicCurrentTime();
             dataAnalysis();
         }
@@ -135,10 +132,10 @@
         if ((time - lastTimestamp) >= debeziumTimePeriod * DEBEZIUM_TIME_PERIOD_UNIT) {
             // Set the start calculation time point of the next time execution cycle
             lastTimestamp = time;
-            log.info("Incremental log data , time latitude : {},{}", lastTimestamp, time);
-            final List<SourceDataLog> debeziumTopicRecords =
-                consolidationService.getDebeziumTopicRecords(extractProperties.getDebeziumTopic());
-            if (!CollectionUtils.isEmpty(debeziumTopicRecords)) {
+            final int defaultSize = extractProperties.getDebeziumNumDefaultPeriod();
+            final List<SourceDataLog> debeziumTopicRecords = consolidationService.getDebeziumTopicRecords(defaultSize);
+            if (CollectionUtils.isNotEmpty(debeziumTopicRecords)) {
+                log.info("Incremental log data, time dimension, debeziumTopicRecords={}", debeziumTopicRecords.size());
                 checkingFeignClient.notifySourceIncrementDataLogs(debeziumTopicRecords);
                 lastOffSetAtomic.addAndGet(debeziumTopicRecords.size());
             }
@@ -150,19 +147,18 @@
      * Incremental log data extraction, quantity and latitude management
      */
     public void dataNumAnalysis() {
-        final long offset = consolidationService.getDebeziumTopicRecordEndOffSet();
+        final int offset = consolidationService.getDebeziumTopicRecordEndOffSet();
         // Verify whether the data volume threshold dimension scenario trigger conditions are met
         if ((offset - lastOffSetAtomic.get()) >= extractProperties.getDebeziumNumPeriod()) {
-            log.info("Incremental log data, quantity latitude :{},{}", lastOffSetAtomic.get(), offset);
             // When the data volume threshold is reached,
             // the data is extracted and pushed to the verification service.
-            final List<SourceDataLog> debeziumTopicRecords =
-                consolidationService.getDebeziumTopicRecords(extractProperties.getDebeziumTopic());
+            final List<SourceDataLog> debeziumTopicRecords = consolidationService.getDebeziumTopicRecords(offset);
             if (CollectionUtils.isNotEmpty(debeziumTopicRecords)) {
+                log.info("Incremental log data, quantity dimension: start={},end={},count={}", lastOffSetAtomic.get(),
+                    offset, debeziumTopicRecords.size());
                 checkingFeignClient.notifySourceIncrementDataLogs(debeziumTopicRecords);
                 lastOffSetAtomic.addAndGet(debeziumTopicRecords.size());
             }
-            // Trigger data volume threshold dimension scenario - update time threshold
             setLastTimestampAtomicCurrentTime();
         }
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaAdminService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaAdminService.java
index 08853aaebfff0833e7af09a65a1e4798872a2f3b..ada183de8317e33f2a4aebc3706cb805b6c21a2c 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaAdminService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaAdminService.java
@@ -149,7 +149,6 @@ public class KafkaAdminService {
      */
     public boolean isTopicExists(String topicName) {
         try {
-            log.info("topic name :{}", topicName);
             return adminClient.listTopics().listings().get().stream().map(TopicListing::name)
                 .anyMatch(name -> name.equalsIgnoreCase(topicName));
         } catch (InterruptedException | ExecutionException e) {
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaCommonService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaCommonService.java
index 8ca557a3c13d2fa72b7ead13d5db942f1d32de7b..57f0af13b01a5feafbe36d09251782c2087fb069 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaCommonService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaCommonService.java
@@ -26,7 +26,6 @@
 import java.util.HashMap;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
@@ -51,14 +50,15 @@ public class KafkaCommonService {
      * Last splicing table name upper or lower code
      */
     private static final String TOPIC_TEMPLATE = "%s_%s_%s_%s";
+    private static final String UPPER_CODE = "1";
+    private static final String LOWER_CODE = "0";
 
     /**
      * Incremental verification topic prefix
      */
-    private static final String INCREMENT_TOPIC_PREFIX = "TOPIC_EXTRACT_INCREMENT_";
+    private static final String INCREMENT_TOPIC_PREFIX = "increment_";
 
     private static final Object LOCK = new Object();
     private static final Map<String, Topic> TABLE_TOPIC_CACHE = new HashMap<>();
-    private static final Map<String, Topic> DEBEZIUM_TOPIC_CACHE = new HashMap<>();
 
     @Autowired
     private final ExtractProperties extractProperties;
@@ -107,12 +107,16 @@
         StringBuilder builder = new StringBuilder();
         for (char aChar : chars) {
             if (aChar >= 'A' && aChar <= 'Z') {
-                builder.append("1");
+                builder.append(UPPER_CODE);
             } else if (aChar >= 'a' && aChar <= 'z') {
-                builder.append("0");
+                builder.append(LOWER_CODE);
             }
         }
-        return builder.toString();
+        final String encoding = builder.toString();
+        if (encoding.contains(UPPER_CODE) && encoding.contains(LOWER_CODE)) {
+            return encoding;
+        }
+        return "";
     }
 
     /**
@@ -145,23 +149,14 @@
      * @return Topic
      */
     public Topic getIncrementTopicInfo(String tableName) {
-        Topic topic = TABLE_TOPIC_CACHE.get(tableName);
-        if (Objects.isNull(topic)) {
-            synchronized (LOCK) {
-                topic = TABLE_TOPIC_CACHE.get(tableName);
-                if (Objects.isNull(topic)) {
-                    topic = new Topic().setTableName(tableName).setTopicName(getIncrementTopicName(tableName))
-                        .setPartitions(1);
-                    TABLE_TOPIC_CACHE.put(tableName, topic);
-                }
-            }
-        }
+        Topic topic = new Topic();
+        topic.setTableName(tableName).setTopicName(getIncrementTopicName(tableName)).setPartitions(1);
         log.debug("kafka topic info : [{}] ", topic.toString());
         return topic;
     }
 
     private String getIncrementTopicName(String tableName) {
         return INCREMENT_TOPIC_PREFIX.concat(Integer.toString(extractProperties.getEndpoint().getCode())).concat("_")
-            .concat(tableName.toUpperCase(Locale.ROOT));
+            .concat(tableName).toLowerCase();
     }
 }
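A worked example of the new naming scheme: prefix, then endpoint code, then the table name, lower-cased as a whole (the endpoint code 1 here is hypothetical).

public class IncrementTopicNameExample {
    public static void main(String[] args) {
        String topicName = "increment_".concat(Integer.toString(1)).concat("_")
            .concat("T_ORDER").toLowerCase();
        System.out.println(topicName); // prints: increment_1_t_order
    }
}

Since toLowerCase() now covers the whole concatenated name, the Locale import could be dropped; be aware that default-locale lowercasing can behave unexpectedly under locales such as Turkish.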
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractService.java
index 0890f06d6d43571526b89453313ac342c6ba1dde..0df48c2f25bf189e49fcb15e9c1be7263657f978 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractService.java
@@ -113,19 +113,7 @@ public interface DataExtractService {
      * @return Primary key corresponds to table data
      */
     List<Map<String, String>> queryTableColumnValues(String tableName, List<String> compositeKeySet);
-
-    /**
-     * Build an incremental extraction task according to the data change log
-     *
-     * @param sourceDataLogs source data logs
-     */
-    void buildExtractIncrementTaskByLogs(List<SourceDataLog> sourceDataLogs);
-
-    /**
-     * Perform incremental check data extraction
-     */
-    void execExtractIncrementTaskByLogs();
-
+
     /**
      * Query the metadata information of the current table structure and hash
      *
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java
index be70fc47697d8718c21c5756a6fba4ed1dd7931c..cc32347a7189ed9822bb4e82d08906624ed85923 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java
@@ -21,7 +21,6 @@ import org.opengauss.datachecker.common.constant.Constants;
 import org.opengauss.datachecker.common.entry.enums.DML;
 import org.opengauss.datachecker.common.entry.enums.Endpoint;
 import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData;
-import org.opengauss.datachecker.common.entry.extract.ExtractIncrementTask;
 import org.opengauss.datachecker.common.entry.extract.ExtractTask;
 import org.opengauss.datachecker.common.entry.extract.RowDataHash;
 import org.opengauss.datachecker.common.entry.extract.SourceDataLog;
@@ -43,8 +42,6 @@ import org.opengauss.datachecker.extract.task.DataManipulationService;
 import org.opengauss.datachecker.extract.task.ExtractTaskBuilder;
 import org.opengauss.datachecker.extract.task.ExtractTaskRunnable;
 import org.opengauss.datachecker.extract.task.ExtractThreadSupport;
-import org.opengauss.datachecker.extract.task.IncrementExtractTaskRunnable;
-import org.opengauss.datachecker.extract.task.IncrementExtractThreadSupport;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
@@ -94,7 +91,6 @@ public class DataExtractServiceImpl implements DataExtractService {
     private final AtomicReference<String> atomicProcessNo = new AtomicReference<>(PROCESS_NO_RESET);
     private final AtomicReference<List<ExtractTask>> taskReference = new AtomicReference<>();
-    private final AtomicReference<List<ExtractIncrementTask>> incrementTaskReference = new AtomicReference<>();
 
     @Autowired
     @Qualifier("extractThreadExecutor")
@@ -106,9 +102,6 @@ public class DataExtractServiceImpl implements DataExtractService {
     @Autowired
     private ExtractThreadSupport extractThreadSupport;
 
-    @Autowired
-    private IncrementExtractThreadSupport incrementExtractThreadSupport;
-
     @Autowired
     private CheckingFeignClient checkingFeignClient;
 
@@ -226,9 +219,6 @@ public class DataExtractServiceImpl implements DataExtractService {
         if (Objects.nonNull(taskReference.getAcquire())) {
             taskReference.getAcquire().clear();
         }
-        if (Objects.nonNull(incrementTaskReference.getAcquire())) {
-            incrementTaskReference.getAcquire().clear();
-        }
         TableExtractStatusCache.removeAll();
         atomicProcessNo.set(PROCESS_NO_RESET);
         log.info("clear the current build task cache!");
@@ -371,58 +361,6 @@ public class DataExtractServiceImpl implements DataExtractService {
         return dataManipulationService.queryColumnValues(tableName, new ArrayList<>(compositeKeys), metadata);
     }
 
-    /**
-     * Build an incremental extraction task according to the data change log
-     *
-     * @param sourceDataLogs data change log
-     */
-    @Override
-    public void buildExtractIncrementTaskByLogs(List<SourceDataLog> sourceDataLogs) {
-        final String schema = extractProperties.getSchema();
-        List<ExtractIncrementTask> taskList = extractTaskBuilder.buildIncrementTask(schema, sourceDataLogs);
-        log.info("Build incremental extraction task completed {}", taskList.size());
-        if (CollectionUtils.isEmpty(taskList)) {
-            return;
-        }
-        incrementTaskReference.set(taskList);
-
-        List<String> tableNameList =
-            sourceDataLogs.stream().map(SourceDataLog::getTableName).collect(Collectors.toList());
-        Map<String, Integer> taskCount = new HashMap<>(Constants.InitialCapacity.EMPTY);
-        createTaskCountMapping(tableNameList, taskCount);
-        TableExtractStatusCache.init(taskCount);
-    }
-
-    private void createTaskCountMapping(List<String> tableNameList, Map<String, Integer> taskCount) {
-        tableNameList.forEach(table -> taskCount.put(table, 1));
-    }
-
-    /**
-     * Perform incremental check data extraction
-     */
-    @Override
-    public void execExtractIncrementTaskByLogs() {
-        List<ExtractIncrementTask> taskList = incrementTaskReference.get();
-        if (CollectionUtils.isEmpty(taskList)) {
-            log.info("endpoint [{}] task is empty!", extractProperties.getEndpoint().getDescription());
-            return;
-        }
-        Map<String, Integer> tableCheckStatus = checkingFeignClient.queryTableCheckStatus();
-        taskList.forEach(task -> {
-            log.info("Perform data extraction increment tasks:{}", task.getTaskName());
-            final String tableName = task.getTableName();
-            if (!tableCheckStatus.containsKey(tableName) || tableCheckStatus.get(tableName) == -1) {
-                log.info("Abnormal table[{}] status, ignoring the current table increment data extraction", tableName);
-                return;
-            }
-            Topic topic = kafkaCommonService.getIncrementTopicInfo(tableName);
-            kafkaAdminService.createTopic(topic.getTopicName(), topic.getPartitions());
-            final IncrementExtractTaskRunnable extractRunnable =
-                new IncrementExtractTaskRunnable(task, topic, incrementExtractThreadSupport);
-            extractThreadExecutor.submit(extractRunnable);
-        });
-    }
-
     /**
      * Query the metadata information of the current table structure and perform hash calculation
     *
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java
index 182ee941bc94557cdd0daeb8e39b63d557673448..9eae00fd1762caa5c8f4fa7c5caeae8c4d85ce4d 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java
@@ -15,6 +15,7 @@
 package org.opengauss.datachecker.extract.task;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.opengauss.datachecker.common.constant.Constants.InitialCapacity;
 import org.opengauss.datachecker.common.entry.enums.DataBaseType;
@@ -39,7 +40,6 @@ import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
 import org.springframework.stereotype.Service;
 import org.springframework.util.Assert;
-import org.springframework.util.CollectionUtils;
 
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -360,14 +360,15 @@ public class DataManipulationService {
         final TableMetadataHash tableMetadataHash = new TableMetadataHash().setTableName(tableName);
         final List<ColumnsMetaData> columnsMetaData = metaDataService.queryTableColumnMetaDataOfSchema(tableName);
         StringBuffer buffer = new StringBuffer();
-        if (!CollectionUtils.isEmpty(columnsMetaData)) {
-            columnsMetaData.sort(Comparator.comparing(ColumnsMetaData::getColumnName));
+        if (CollectionUtils.isNotEmpty(columnsMetaData)) {
+            columnsMetaData.sort(Comparator.comparing(ColumnsMetaData::getOrdinalPosition));
             columnsMetaData.forEach(column -> {
-                buffer.append(column.getColumnName()).append(column.getColumnType()).append(column.getDataType())
-                      .append(column.getOrdinalPosition());
+                buffer.append(column.getColumnName()).append(column.getOrdinalPosition());
             });
+            tableMetadataHash.setTableHash(HASH_UTIL.hashBytes(buffer.toString()));
+        } else {
+            tableMetadataHash.setTableHash(-1L);
         }
-        tableMetadataHash.setTableHash(HASH_UTIL.hashBytes(buffer.toString()));
         return tableMetadataHash;
     }
 }
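The reworked table-metadata fingerprint now concatenates only column name and ordinal position, sorted by ordinal position, and reports -1L when no column metadata exists, presumably so a missing table is distinguishable from a normal hash. A minimal sketch of that fingerprint (the record type and String.hashCode stand in for ColumnsMetaData and the project's HASH_UTIL):

```java
import java.util.Comparator;
import java.util.List;

/** Illustrative column fingerprint; String.hashCode replaces HASH_UTIL here. */
final class TableFingerprint {
    record Column(String name, int ordinalPosition) {}

    static long fingerprint(List<Column> columns) {
        if (columns == null || columns.isEmpty()) {
            return -1L; // sentinel: the table (or its metadata) does not exist
        }
        StringBuilder buffer = new StringBuilder();
        columns.stream()
               .sorted(Comparator.comparingInt(Column::ordinalPosition))
               .forEach(c -> buffer.append(c.name()).append(c.ordinalPosition()));
        return buffer.toString().hashCode(); // stand-in hash function
    }

    public static void main(String[] args) {
        // stable even if the two endpoints report different type names for a column
        System.out.println(fingerprint(List.of(new Column("id", 1), new Column("name", 2))));
        System.out.println(fingerprint(List.of())); // -1
    }
}
```

Dropping column type and data type from the input means two endpoints that describe equivalent columns with different type names can still produce matching hashes, which appears to be the point of the change.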
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/IncrementExtractTaskRunnable.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/IncrementExtractTaskRunnable.java
index f65b74dceae4724bd74f78c5d37b0ffed7842886..d458f305d2393f14d235a5a87320e18ee6a8c027 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/IncrementExtractTaskRunnable.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/IncrementExtractTaskRunnable.java
@@ -155,6 +155,7 @@ public class IncrementExtractTaskRunnable extends KafkaProducerWapper implements
             dmlBuilder.schema(schema).columns(tableMetadata.getColumnsMetas()).tableName(tableMetadata.getTableName())
                       .conditionCompositePrimary(primaryMetas);
         }
+        dmlBuilder.dataBaseType(databaseType);
         return dmlBuilder;
     }
diff --git a/datachecker-extract/src/main/resources/application.yml b/datachecker-extract/src/main/resources/application.yml
index 556f776108bf0934ff31938b15d825bc5378e3f7..3875c1b1013301bfc6ffbf44c65d02a932f42e8f 100644
--- a/datachecker-extract/src/main/resources/application.yml
+++ b/datachecker-extract/src/main/resources/application.yml
@@ -32,11 +32,6 @@ spring:
       request-timeout: 60000
 
 feign:
-  client:
-    config:
-      default:
-        connectTimeout: 50000
-        readTimeout: 50000
   okhttp:
     enabled: true
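The single added line in IncrementExtractTaskRunnable ensures the DML builder knows which database dialect it is generating statements for. The toy builder below shows why that matters once identifiers are quoted per dialect; it is illustrative only, not the repo's DmlBuilder API (only the dataBaseType(...) setter is confirmed by the diff, and the quoting rule is my assumption). The application.yml hunk, for its part, simply drops the explicit Feign connect/read timeouts and falls back to the okhttp client defaults.

```java
/** Toy fluent builder showing why the database type must be set before building. */
final class SelectBuilder {
    enum DataBaseType { MYSQL, OPENGAUSS }

    private String schema;
    private String tableName;
    private DataBaseType dataBaseType = DataBaseType.MYSQL; // silent fallback when never set

    SelectBuilder schema(String schema) { this.schema = schema; return this; }
    SelectBuilder tableName(String tableName) { this.tableName = tableName; return this; }
    SelectBuilder dataBaseType(DataBaseType type) { this.dataBaseType = type; return this; }

    String build() {
        // MySQL quotes identifiers with backticks, openGauss/PostgreSQL with double quotes
        String q = dataBaseType == DataBaseType.MYSQL ? "`" : "\"";
        return "select * from " + q + schema + q + "." + q + tableName + q;
    }

    public static void main(String[] args) {
        System.out.println(new SelectBuilder().schema("test").tableName("t_user")
                .dataBaseType(DataBaseType.OPENGAUSS).build()); // select * from "test"."t_user"
    }
}
```

Without the added dataBaseType(...) call, a builder like this would silently emit statements in the wrong dialect for one of the two endpoints, which is consistent with the one-line fix above.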