diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/cache/TableStatusRegister.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/cache/TableStatusRegister.java index e21f37143c72d914a818a7bfda852d47aaa3a13b..2956735d8d782d45c2d6643737ff3813e97d0278 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/cache/TableStatusRegister.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/cache/TableStatusRegister.java @@ -54,7 +54,10 @@ public class TableStatusRegister implements Cache { * Task status: if both the source and destination tasks have completed data verification, the setting status is 7 */ public static final int TASK_STATUS_CONSUMER_VALUE = 7; - + /** + * Task status cache. The data extraction of the table has an error. + */ + public static final int TASK_STATUS_ERROR = -1; /** * Task status cache. The initial default value of status is 0 */ @@ -241,7 +244,7 @@ public class TableStatusRegister implements Cache { * @return Updated cache value */ @Override - public Integer update(String key, Integer value) { + public synchronized Integer update(String key, Integer value) { if (!TABLE_STATUS_CACHE.containsKey(key)) { log.error("current key={} does not exist", key); return 0; } @@ -379,10 +382,7 @@ public class TableStatusRegister implements Cache { if (keys.size() <= 0) { return; } - final Pair extractProgress = extractProgress(); - final Pair checkProgress = checkProgress(); - log.info("There are [{}] tables in total, [{}] tables are extracted and [{}] table is verified", - checkProgress.getSink(), extractProgress.getSource(), checkProgress.getSource()); + List extractErrorList = new ArrayList<>(); List notExtractCompleteList = new ArrayList<>(); List notCheckCompleteList = new ArrayList<>(); List checkCompleteList = new ArrayList<>(); @@ -395,10 +395,11 @@ } else if (status == TASK_STATUS_CONSUMER_VALUE) { checkCompleteList.add(tableName); } else { + extractErrorList.add(tableName); log.error("table={} status={} error ", tableName, status); } }); - log.debug("progress information: {} is being extracted, {} is being verified, and {} is completed", - notExtractCompleteList, notCheckCompleteList, checkCompleteList); + log.debug("progress information: {} is being extracted, {} is being verified, {} is completed, and {} has an error", + notExtractCompleteList, notCheckCompleteList, checkCompleteList, extractErrorList); } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/KafkaConsumerConfig.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/KafkaConsumerConfig.java index aa9316f5e15c7abb0f4c0f1f43927fb318abeb03..6fe4a88480cc37de49fb72b9e18d79bac034bb84 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/KafkaConsumerConfig.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/KafkaConsumerConfig.java @@ -51,6 +51,10 @@ public class KafkaConsumerConfig { private String autoOffsetReset; @Value("${spring.kafka.consumer.max-poll-records}") private int maxPollRecordsConfig; + @Value("${spring.kafka.consumer.fetch-max-bytes}") + private int fetchMaxBytes; + @Value("${spring.kafka.consumer.request-timeout-ms}") + private int requestTimeoutMs; /** * consumerConfigs @@ -66,6 +70,8 @@ propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig); + propsMap.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes); + propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs); return propsMap; } diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/check/IncrementCheckConifg.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/CheckHealthController.java similarity index 30% rename from datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/check/IncrementCheckConifg.java rename to datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/CheckHealthController.java index 713271f1eeca7d3c3ec60a0f4537f04439be2720..d940c7da926f3b3c0cd4977943bf7b747eff602b 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/check/IncrementCheckConifg.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/CheckHealthController.java @@ -13,46 +13,28 @@ * See the Mulan PSL v2 for more details. */ -package org.opengauss.datachecker.common.entry.check; +package org.opengauss.datachecker.check.controller; -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.Data; -import lombok.experimental.Accessors; - -import javax.validation.constraints.NotEmpty; -import javax.validation.constraints.NotNull; -import java.util.List; +import org.opengauss.datachecker.common.web.Result; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; /** - * Debezium incremental migration verification initialization configuration + * health check of the data check service * * @author :wangchao - * @date :Created in 2022/6/24 + * @date :Created in 2022/6/23 * @since :11 */ -@Schema(name = "Debezium incremental migration verification initialization configuration") -@Data -@Accessors(chain = true) -public class IncrementCheckConifg { - /** - * Debezium incremental migration topic, debezium monitors table incremental data, - * and uses a single topic for incremental data management - */ - @Schema(name = "debeziumTopic", required = true) - @NotNull(message = "Debezium incremental migration topic cannot be empty") - private String debeziumTopic; - - @Schema(name = "groupId", description = "Topic grouping") - @NotNull(message = "Debezium incremental migration topic groupid cannot be empty") - private String groupId; - - @Schema(name = "partitions", description = "Topic partition", defaultValue = "1") - private int partitions = 1; - +@RestController +public class CheckHealthController { /** - * Incremental migration table name list + * health check of the data check service + * + * @return void */ - @Schema(name = "debeziumTables", required = true, description = "Incremental migration table name list") - @NotEmpty(message = "Incremental migration table name list cannot be empty") - private List debeziumTables; + @GetMapping("/check/health") + public Result health() { + return Result.success(); + } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/TaskStatusController.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/TaskStatusController.java index 026199386e7b29043c4e4db8eeede57006b16733..95ac1e480920d72b24f5924ea7b584d103d62688 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/TaskStatusController.java +++ 
b/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/TaskStatusController.java @@ -25,7 +25,6 @@ import org.springframework.lang.NonNull; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import javax.validation.constraints.NotEmpty; @@ -49,14 +48,12 @@ public class TaskStatusController { * * @param tableName tableName * @param endpoint endpoint {@link org.opengauss.datachecker.common.entry.enums.Endpoint} + * @param status status */ @Operation(summary = "Refresh the execution status of the data extraction table of the specified task") @PostMapping("/table/extract/status") - public void refushTableExtractStatus( - @Parameter(description = "tableName") @RequestParam(value = "tableName") @NotEmpty String tableName, - @Parameter(description = Endpoint.API_DESCRIPTION) @RequestParam(value = "endpoint") @NonNull - Endpoint endpoint) { - taskManagerService.refreshTableExtractStatus(tableName, endpoint); + public void refreshTableExtractStatus(@NotEmpty String tableName, @NonNull Endpoint endpoint, int status) { + taskManagerService.refreshTableExtractStatus(tableName, endpoint, status); } /** diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/CheckDiffResult.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/CheckDiffResult.java index 8e2e421540430d17792124491c89ea76e9fca15e..52e8537693693da4854a205ce9f39a06be1387b5 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/CheckDiffResult.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/CheckDiffResult.java @@ -17,6 +17,7 @@ package org.opengauss.datachecker.check.modules.check; import com.alibaba.fastjson.annotation.JSONType; import lombok.Data; +import org.springframework.util.CollectionUtils; import java.time.LocalDateTime; import java.util.List; @@ -30,14 +31,16 @@ import java.util.Set; * @since :11 */ @Data -@JSONType(orders = {"schema", "table", "topic", "partitions", "createTime", "keyInsertSet", "keyUpdateSet", - "keyDeleteSet", "repairInsert", "repairUpdate", "repairDelete"}) +@JSONType(orders = {"schema", "table", "topic", "partitions", "result", "message", "createTime", "keyInsertSet", + "keyUpdateSet", "keyDeleteSet", "repairInsert", "repairUpdate", "repairDelete"}) public class CheckDiffResult { private String schema; private String table; private String topic; private int partitions; private LocalDateTime createTime; + private String result; + private String message; private Set keyInsertSet; private Set keyUpdateSet; @@ -47,6 +50,17 @@ public class CheckDiffResult { private List repairUpdate; private List repairDelete; + /** + * constructor + */ + public CheckDiffResult() { + } + + /** + * constructor + * + * @param builder builder + */ public CheckDiffResult(final AbstractCheckDiffResultBuilder builder) { table = builder.getTable(); partitions = builder.getPartitions(); @@ -59,5 +73,21 @@ public class CheckDiffResult { repairUpdate = builder.getRepairUpdate(); repairInsert = builder.getRepairInsert(); repairDelete = builder.getRepairDelete(); + resultAnalysis(); + } + + private void resultAnalysis() { + if (CollectionUtils.isEmpty(keyInsertSet) && CollectionUtils.isEmpty(keyUpdateSet) && CollectionUtils + 
.isEmpty(keyDeleteSet)) { + result = "success"; + message = schema.concat(".").concat(table).concat("_[").concat(String.valueOf(partitions)) + .concat("] check success"); + } else { + result = "failed"; + message = + schema.concat(".").concat(table).concat("_[").concat(String.valueOf(partitions)).concat("] check : ") + .concat(" insert=" + keyInsertSet.size()).concat(" update=" + keyUpdateSet.size()) + .concat(" delete=" + keyDeleteSet.size()); + } } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java index 3b634c4bce3e923dbaf7051e9dbb650b35a3d9a0..fb6e129094f06ef713d3ea9315dea09ca4e32a80 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java @@ -72,7 +72,7 @@ public class DataCheckRunnable implements Runnable { private final StatisticalService statisticalService; private final TableStatusRegister tableStatusRegister; private final DataCheckParam checkParam; - private final KafkaConsumerService kafkaConsumerService; + private final KafkaConsumerHandler kafkaConsumerHandler; private String sinkSchema; private Topic topic; @@ -93,7 +93,13 @@ public class DataCheckRunnable implements Runnable { feignClient = support.getFeignClientService(); statisticalService = support.getStatisticalService(); tableStatusRegister = support.getTableStatusRegister(); - kafkaConsumerService = support.getKafkaConsumerService(); + kafkaConsumerHandler = buildKafkaHandler(support); + } + + private KafkaConsumerHandler buildKafkaHandler(DataCheckRunnableSupport support) { + KafkaConsumerService kafkaConsumerService = support.getKafkaConsumerService(); + return new KafkaConsumerHandler(kafkaConsumerService.buildKafkaConsumer(false), + kafkaConsumerService.getRetryFetchRecordTimes()); } /** @@ -345,7 +351,8 @@ public class DataCheckRunnable implements Runnable { * @return Specify table Kafka partition data */ private List getTopicPartitionsData(Endpoint endpoint, int partitions) { - return kafkaConsumerService.queryCheckRowData(topic, partitions); + Topic endpointTopic = feignClient.queryTopicInfo(endpoint, tableName); + return kafkaConsumerHandler.queryCheckRowData(endpointTopic, partitions); } private boolean shouldCheckMerkleTree(int sourceBucketCount, int sinkBucketCount) { diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..f53642a0e344a13531034aa9e10304f106f5d433 --- /dev/null +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. 
+ */ + +package org.opengauss.datachecker.check.modules.check; + +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.opengauss.datachecker.common.entry.extract.RowDataHash; +import org.opengauss.datachecker.common.entry.extract.Topic; +import org.opengauss.datachecker.common.util.ThreadUtil; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * KafkaConsumerHandler + * + * @author :wangchao + * @date :Created in 2022/8/31 + * @since :11 + */ +@Slf4j +public class KafkaConsumerHandler { + private static final int RETRY_FETCH_RECORD_INTERVAL = 1000; + private static final int KAFKA_CONSUMER_POLL_DURATION = 20; + + private final KafkaConsumer kafkaConsumer; + private final int retryFetchRecordTimes; + + /** + * Constructor + * + * @param consumer consumer + * @param retryTimes retryTimes + */ + public KafkaConsumerHandler(KafkaConsumer consumer, int retryTimes) { + kafkaConsumer = consumer; + retryFetchRecordTimes = retryTimes; + } + + /** + * Query the Kafka partition data corresponding to the specified table + * + * @param topic Kafka topic + * @param partitions Kafka partitions + * @return kafka partitions data + */ + public List queryCheckRowData(Topic topic, int partitions) { + return queryRowData(topic, partitions, false); + } + + /** + * Query the Kafka partition data corresponding to the specified table + * + * @param topic Kafka topic + * @param partitions Kafka partitions + * @param shouldChangeConsumerGroup if true change consumer Group random + * @return kafka partitions data + */ + public List queryRowData(Topic topic, int partitions, boolean shouldChangeConsumerGroup) { + List data = Collections.synchronizedList(new LinkedList<>()); + final TopicPartition topicPartition = new TopicPartition(topic.getTopicName(), partitions); + kafkaConsumer.assign(List.of(topicPartition)); + if (shouldChangeConsumerGroup) { + resetOffsetToBeginning(kafkaConsumer, topicPartition); + } + consumerTopicRecords(data, kafkaConsumer); + AtomicInteger retryTimes = new AtomicInteger(0); + final String groupId = kafkaConsumer.groupMetadata().groupId(); + while (CollectionUtils.isEmpty(data) && retryTimes.incrementAndGet() <= retryFetchRecordTimes) { + ThreadUtil.sleep(RETRY_FETCH_RECORD_INTERVAL); + resetOffsetToBeginning(kafkaConsumer, topicPartition); + log.debug("consumer group={} topic=[{}] partitions=[{}] empty ,retryTimes=[{}]", groupId, + topic.getTopicName(), partitions, retryTimes.get()); + consumerTopicRecords(data, kafkaConsumer); + } + log.debug("consumer group={} topic=[{}] partitions=[{}] dataList=[{}]", groupId, topic.getTopicName(), + partitions, data.size()); + return data; + } + + private void resetOffsetToBeginning(KafkaConsumer consumer, TopicPartition topicPartition) { + Map offset = new HashMap<>(); + consumer.seekToBeginning(List.of(topicPartition)); + long position = consumer.position(topicPartition); + offset.put(topicPartition, new OffsetAndMetadata(position)); + consumer.commitSync(offset); + } + + private void consumerTopicRecords(List data, KafkaConsumer kafkaConsumer) { + 
List result = getTopicRecords(kafkaConsumer); + while (CollectionUtils.isNotEmpty(result)) { + data.addAll(result); + result = getTopicRecords(kafkaConsumer); + } + } + + private List getTopicRecords(KafkaConsumer kafkaConsumer) { + List dataList = new ArrayList<>(); + ConsumerRecords consumerRecords = + kafkaConsumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_DURATION)); + consumerRecords.forEach(record -> { + dataList.add(JSON.parseObject(record.value(), RowDataHash.class)); + }); + return dataList; + } +} diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerService.java index f9ec57c817dbdfa72bb038fe8b867e3fd2e06d3c..2d238dd9b22ab044d1e66a41140ab2d55b965628 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerService.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerService.java @@ -15,32 +15,15 @@ package org.opengauss.datachecker.check.modules.check; -import com.alibaba.fastjson.JSON; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.CollectionUtils; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.TopicPartition; import org.opengauss.datachecker.check.config.KafkaConsumerConfig; -import org.opengauss.datachecker.common.constant.Constants.InitialCapacity; -import org.opengauss.datachecker.common.entry.extract.RowDataHash; -import org.opengauss.datachecker.common.entry.extract.Topic; import org.opengauss.datachecker.common.util.IdGenerator; -import org.opengauss.datachecker.common.util.ThreadUtil; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - /** * KafkaConsumerService * @@ -52,8 +35,6 @@ import java.util.concurrent.atomic.AtomicInteger; @Service @RequiredArgsConstructor public class KafkaConsumerService { - private static final int RETRY_FETCH_RECORD_INTERVAL = 1000; - private static final int KAFKA_CONSUMER_POLL_DURATION = 20; private static final String CLIENT_ID_SUFFIX = "Random"; private final KafkaConsumerConfig kafkaConsumerConfig; @@ -62,70 +43,21 @@ public class KafkaConsumerService { private int retryFetchRecordTimes = 5; /** - * Query the Kafka partition data corresponding to the specified table + * consumer retry times * - * @param topic Kafka topic - * @param partitions Kafka partitions - * @return kafka partitions data + * @return consumer retry times */ - public List queryCheckRowData(Topic topic, int partitions) { - return queryRowData(topic, partitions, false); + public int getRetryFetchRecordTimes() { + return retryFetchRecordTimes; } /** - * Query the Kafka partition data corresponding to the specified table + * consumer * - * @param topic Kafka topic - * @param partitions Kafka partitions - * @param shouldChangeConsumerGroup if true change consumer Group random - * @return kafka partitions data + * @param isNewGroup isNewGroup + * @return consumer */ - public List 
queryRowData(Topic topic, int partitions, boolean shouldChangeConsumerGroup) { - List data = Collections.synchronizedList(new ArrayList<>()); - KafkaConsumer kafkaConsumer = buildKafkaConsumer(shouldChangeConsumerGroup); - final TopicPartition topicPartition = new TopicPartition(topic.getTopicName(), partitions); - kafkaConsumer.assign(List.of(topicPartition)); - if (shouldChangeConsumerGroup) { - resetOffsetToBeginning(kafkaConsumer, topicPartition); - } - consumerTopicRecords(data, kafkaConsumer); - AtomicInteger retryTimes = new AtomicInteger(0); - while (CollectionUtils.isEmpty(data) && retryTimes.incrementAndGet() <= retryFetchRecordTimes) { - ThreadUtil.sleep(RETRY_FETCH_RECORD_INTERVAL); - consumerTopicRecords(data, kafkaConsumer); - } - log.debug("consumer group={} topic=[{}] partitions=[{}] dataList=[{}]", kafkaConsumer.groupMetadata().groupId(), - topic.getTopicName(), partitions, data.size()); - return data; - } - - private void resetOffsetToBeginning(KafkaConsumer consumer, TopicPartition topicPartition) { - Map offset = new HashMap<>(InitialCapacity.CAPACITY_1); - consumer.seekToBeginning(List.of(topicPartition)); - long position = consumer.position(topicPartition); - offset.put(topicPartition, new OffsetAndMetadata(position)); - consumer.commitSync(offset); - } - - private void consumerTopicRecords(List data, KafkaConsumer kafkaConsumer) { - List result = getTopicRecords(kafkaConsumer); - while (CollectionUtils.isNotEmpty(result)) { - data.addAll(result); - result = getTopicRecords(kafkaConsumer); - } - } - - private List getTopicRecords(KafkaConsumer kafkaConsumer) { - List dataList = new ArrayList<>(); - ConsumerRecords consumerRecords = - kafkaConsumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_DURATION)); - consumerRecords.forEach(record -> { - dataList.add(JSON.parseObject(record.value(), RowDataHash.class)); - }); - return dataList; - } - - private KafkaConsumer buildKafkaConsumer(boolean isNewGroup) { + public KafkaConsumer buildKafkaConsumer(boolean isNewGroup) { Consumer consumer; if (isNewGroup) { consumer = kafkaConsumerConfig.consumerFactory().createConsumer(IdGenerator.nextId36(), CLIENT_ID_SUFFIX); diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/task/TaskManagerService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/task/TaskManagerService.java index cfbb8eaa1bcc2536b7594f3fb3d15c7182e01a88..16b66b6e48a4aa16300a20202db75b02d3e44379 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/task/TaskManagerService.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/task/TaskManagerService.java @@ -20,6 +20,8 @@ import org.opengauss.datachecker.common.entry.enums.Endpoint; import java.util.List; /** + * TaskManagerService + * * @author :wangchao * @date :Created in 2022/5/25 * @since :11 @@ -30,8 +32,9 @@ public interface TaskManagerService { * * @param tableName tableName * @param endpoint endpoint {@link org.opengauss.datachecker.common.entry.enums.Endpoint} + * @param status status */ - void refreshTableExtractStatus(String tableName, Endpoint endpoint); + void refreshTableExtractStatus(String tableName, Endpoint endpoint, int status); /** * Initialize task status @@ -44,4 +47,12 @@ public interface TaskManagerService { * Clean up task status information */ void cleanTaskStatus(); + + /** + * query check status of current table + * + * @param tableName tableName + * @return status + */ + int queryTableCheckStatus(String tableName); } diff 
--git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/task/TaskManagerServiceImpl.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/task/TaskManagerServiceImpl.java index 47a5cad1f3489e94dc08c78ac4448c87e469ac3d..49f392b9446e52447f2a76121e22067e0c474746 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/task/TaskManagerServiceImpl.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/task/TaskManagerServiceImpl.java @@ -27,6 +27,8 @@ import java.util.HashSet; import java.util.List; /** + * TaskManagerServiceImpl + * * @author :wangchao * @date :Created in 2022/5/25 * @since :11 @@ -34,7 +36,6 @@ import java.util.List; @Slf4j @Service public class TaskManagerServiceImpl implements TaskManagerService { - @Autowired private TableStatusRegister tableStatusRegister; @@ -43,12 +44,13 @@ public class TaskManagerServiceImpl implements TaskManagerService { * * @param tableName tableName * @param endpoint endpoint {@link org.opengauss.datachecker.common.entry.enums.Endpoint} + * @param status status */ @Override - public void refreshTableExtractStatus(String tableName, Endpoint endpoint) { + public void refreshTableExtractStatus(String tableName, Endpoint endpoint, int status) { log.info("check server refresh endpoint=[{}] extract tableName=[{}] status=[{}] ", endpoint.getDescription(), - tableName, endpoint.getCode()); - tableStatusRegister.update(tableName, endpoint.getCode()); + tableName, status); + tableStatusRegister.update(tableName, status); } /** @@ -68,7 +70,6 @@ public class TaskManagerServiceImpl implements TaskManagerService { throw new CheckingException("The last verification process is being executed," + " and the table verification status data cannot be reinitialized!"); } - } /** @@ -78,4 +79,15 @@ public class TaskManagerServiceImpl implements TaskManagerService { public void cleanTaskStatus() { tableStatusRegister.removeAll(); } + + /** + * query check status of current table + * + * @param tableName tableName + * @return status + */ + @Override + public int queryTableCheckStatus(String tableName) { + return tableStatusRegister.get(tableName); + } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java index 5dc56791e99e44167c22f530a08315b24a93bdc4..b701f5275dc8dcce33923473c51870719fb63038 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java @@ -15,23 +15,40 @@ package org.opengauss.datachecker.check.service; +import com.alibaba.fastjson.JSONException; +import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; import org.opengauss.datachecker.check.client.FeignClientService; +import org.opengauss.datachecker.check.modules.check.CheckDiffResult; import org.opengauss.datachecker.common.entry.enums.Endpoint; import org.opengauss.datachecker.common.entry.extract.SourceDataLog; +import org.opengauss.datachecker.common.exception.CheckingException; +import org.opengauss.datachecker.common.util.FileUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import java.io.File; +import java.nio.file.Path; import 
java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; /** + * IncrementManagerService + * * @author :wangchao * @date :Created in 2022/6/14 * @since :11 */ +@Slf4j @Service public class IncrementManagerService { - + @Value("${data.check.data-path}") + private String path; @Autowired private FeignClientService feignClientService; @@ -43,7 +60,6 @@ public class IncrementManagerService { public void notifySourceIncrementDataLogs(List dataLogList) { // Collect the last verification results and build an incremental verification log dataLogList.addAll(collectLastResults()); - feignClientService.notifyIncrementDataLogs(Endpoint.SOURCE, dataLogList); feignClientService.notifyIncrementDataLogs(Endpoint.SINK, dataLogList); } @@ -55,7 +71,45 @@ public class IncrementManagerService { */ private List collectLastResults() { List dataLogList = new ArrayList<>(); - + final List checkResultFileList = FileUtils.loadDirectory(getResultPath()); + List historyResultList = new ArrayList<>(); + checkResultFileList.forEach(checkResultFile -> { + try { + String content = FileUtils.readFileContents(checkResultFile); + historyResultList.add(JSONObject.parseObject(content, CheckDiffResult.class)); + } catch (CheckingException | JSONException ex) { + log.error("load check result {} has error", checkResultFile.getFileName()); + } + }); + parseCheckResult(historyResultList, dataLogList); return dataLogList; } + + private String getResultPath() { + String rootPath = path.endsWith(File.separator) ? path : path + File.separator; + return rootPath + "result" + File.separator; + } + + private void parseCheckResult(List historyDataList, List dataLogList) { + Map dataLogMap = new HashMap<>(); + historyDataList.forEach(dataLog -> { + final String tableName = dataLog.getTable(); + if (dataLogMap.containsKey(tableName)) { + dataLogMap.get(tableName).getCompositePrimaryValues().addAll(getDiffKeyValues(dataLog)); + } else { + SourceDataLog sourceDataLog = new SourceDataLog(); + sourceDataLog.setTableName(tableName).setCompositePrimaryValues(getDiffKeyValues(dataLog)); + dataLogMap.put(tableName, sourceDataLog); + } + }); + dataLogList.addAll(dataLogMap.values()); + } + + private List getDiffKeyValues(CheckDiffResult dataLog) { + Set keyValues = new HashSet<>(); + keyValues.addAll(dataLog.getKeyInsertSet()); + keyValues.addAll(dataLog.getKeyUpdateSet()); + keyValues.addAll(dataLog.getKeyDeleteSet()); + return new ArrayList<>(keyValues); + } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TableKafkaService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TableKafkaService.java index 9d191c64d7702b7559b6c3be0a7380ca273624ba..c7e96439cae4ce3e6bd7100eafd99bf745af48e5 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TableKafkaService.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TableKafkaService.java @@ -17,13 +17,14 @@ package org.opengauss.datachecker.check.service; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.opengauss.datachecker.check.modules.check.KafkaConsumerHandler; import org.opengauss.datachecker.check.modules.check.KafkaConsumerService; import org.opengauss.datachecker.common.entry.check.TopicRecordInfo; import org.opengauss.datachecker.common.entry.extract.RowDataHash; import org.opengauss.datachecker.common.entry.extract.Topic; import 
org.springframework.stereotype.Service; -import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.stream.IntStream; @@ -48,10 +49,14 @@ public class TableKafkaService { * @return table kafka info */ public List getTableKafkaConsumerInfo(String topicName, int partitionTotal) { - List list = new ArrayList<>(); + List list = new LinkedList<>(); Topic topic = new Topic().setTopicName(topicName).setPartitions(partitionTotal); + IntStream.range(0, topic.getPartitions()).forEach(partitions -> { - final List rowDataHashes = kafkaConsumerService.queryRowData(topic, partitions, true); + final KafkaConsumerHandler consumerHandler = + new KafkaConsumerHandler(kafkaConsumerService.buildKafkaConsumer(false), + kafkaConsumerService.getRetryFetchRecordTimes()); + final List rowDataHashes = consumerHandler.queryRowData(topic, partitions, true); log.info("topic={},partitions={} record-size={}", topic.getTopicName(), partitions, rowDataHashes.size()); final TopicRecordInfo recordInfo = new TopicRecordInfo().setTopic(topic.getTopicName()).setPartitions(partitions) diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/impl/CheckServiceImpl.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/impl/CheckServiceImpl.java index 3cc844a1150e5194518458a7fd7b4bf32e8ae6f6..1ee339fd74e58d44913c0d10eb89dc035f032c7a 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/impl/CheckServiceImpl.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/impl/CheckServiceImpl.java @@ -47,8 +47,6 @@ import javax.annotation.Resource; import java.time.LocalDateTime; import java.util.List; import java.util.Objects; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -91,27 +89,20 @@ public class CheckServiceImpl implements CheckService { * Verify polling thread name */ private static final String SELF_CHECK_POLL_THREAD_NAME = "check-polling-thread"; - private static final String START_MESSAGE = "the execution time of %s process is %s"; @Autowired private FeignClientService feignClientService; - @Autowired private TableStatusRegister tableStatusRegister; - @Resource private DataCheckService dataCheckService; - @Autowired private DataCheckProperties properties; - @Autowired private EndpointMetaDataManager endpointMetaDataManager; - @Value("${data.check.auto-clean-environment}") private boolean isAutoCleanEnvironment = true; - @Value("${data.check.check-with-sync-extracting}") private boolean isCheckWithSyncExtracting = true; @@ -279,18 +270,12 @@ public class CheckServiceImpl implements CheckService { private void startCheckTableThread(String tableName) { Topic topic = feignClientService.queryTopicInfo(Endpoint.SOURCE, tableName); - if (Objects.nonNull(topic)) { tableStatusRegister.initPartitionsStatus(tableName, topic.getPartitions()); IntStream.range(0, topic.getPartitions()).forEach(idxPartition -> { log.info("kafka consumer topic=[{}] partitions=[{}]", topic.toString(), idxPartition); // Verify the data according to the table name and Kafka partition - try { - final Future future = dataCheckService.checkTableData(topic, idxPartition); - future.get(); - } catch (InterruptedException | ExecutionException e) { - log.info("data check topic=[{}] partitions=[{}] error:", topic.toString(), idxPartition, e); - 
} + dataCheckService.checkTableData(topic, idxPartition); }); } } diff --git a/datachecker-check/src/main/resources/application.yml b/datachecker-check/src/main/resources/application.yml index 1281d9e4b32d9a0328e9013cdbe74fbdc6563efe..751674f3cec47beb8c959e677421da667f42effb 100644 --- a/datachecker-check/src/main/resources/application.yml +++ b/datachecker-check/src/main/resources/application.yml @@ -2,7 +2,7 @@ server: port: 9000 shutdown: graceful -debug: false +debug: true spring: application: @@ -18,7 +18,9 @@ spring: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer fetch-min-size: 1 - max-poll-records: 10000 + max-poll-records: 20000 + request-timeout-ms: 300000 + fetch-max-bytes: 536870912 # 512M datasource: druid: diff --git a/datachecker-check/src/test/java/org/opengauss/datachecker/check/controller/TaskStatusControllerTest.java b/datachecker-check/src/test/java/org/opengauss/datachecker/check/controller/TaskStatusControllerTest.java index 6c3c0e3eda41788a913dbd050ed9773474afdd38..c909032befa77a647f3cca6fa93a3958f175140d 100644 --- a/datachecker-check/src/test/java/org/opengauss/datachecker/check/controller/TaskStatusControllerTest.java +++ b/datachecker-check/src/test/java/org/opengauss/datachecker/check/controller/TaskStatusControllerTest.java @@ -59,6 +59,6 @@ class TaskStatusControllerTest { // Verify the results assertThat(response.getStatus()).isEqualTo(HttpStatus.OK.value()); assertThat(response.getContentAsString()).isEqualTo(""); - verify(taskManagerService).refreshTableExtractStatus("tableName", Endpoint.SOURCE); + verify(taskManagerService).refreshTableExtractStatus("tableName", Endpoint.SOURCE, 1); } } diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/debezium/DebeziumData.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/debezium/DebeziumData.java index a832bb7fb1864149cfde0184356f68cd68303604..9454804af9942f2e44345c11f4d3623e8ea574fd 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/debezium/DebeziumData.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/debezium/DebeziumData.java @@ -28,11 +28,12 @@ import java.util.List; */ @Data public class DebeziumData { - private DebePayload payload; + private DebeziumSchema schema; + private DebeziumPayload payload; } @Data -class DebeSchema { +class DebeziumSchema { private String name; private String type; private List fields; diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/debezium/DebePayload.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/debezium/DebeziumPayload.java similarity index 42% rename from datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/debezium/DebePayload.java rename to datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/debezium/DebeziumPayload.java index 6806ce4d4cf56171ce1a3dceb3b882979d80852c..80c15b22e617c62bb4bbe699ab2005c463b0deab 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/debezium/DebePayload.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/debezium/DebeziumPayload.java @@ -17,61 +17,21 @@ package org.opengauss.datachecker.common.entry.debezium; import lombok.Data; -import java.util.List; import java.util.Map; /** - * DebePayload + * DebeziumPayload * * @author :wangchao * @date :Created 
in 2022/6/30 * @since :11 */ @Data -public class DebePayload { +public class DebeziumPayload { private PayloadSource source; private Map before; private Map after; - private String databaseName; - private String schemaName; - private String ddl; - private List tableChanges; -} - -@Data -class PayloadTableChange { - private String id; - private String type; - private PayloadTable table; -} - -@Data -class PayloadTable { - private String defaultCharsetName; - private List primaryKeyColumnNames; - private List primaryKeyColumnChanges; - private List foreignKeyColumns; - private List uniqueColumns; - private List checkColumns; - private List columns; - private String comment; -} - -@Data -class PayloadTableColumns { - private String name; - private int jdbcType; - private String nativeType; - private String typeName; - private String typeExpression; - private String charsetName; - private int length; - private int scale; - private int position; - private boolean optional; - private String defaultValueExpression; - private boolean autoIncremented; - private boolean generated; - private String comment; - private List modifyKeys; + private String op; + private String ts_ms; + private String transaction; } \ No newline at end of file diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/FileUtils.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/FileUtils.java index 6c08131fe3ac6edbe668aca3e4e13a9be3cfa9a0..c5e00db1fb52b87eb5d63ec1ccdcd5165634c27b 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/FileUtils.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/FileUtils.java @@ -16,15 +16,20 @@ package org.opengauss.datachecker.common.util; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.NotDirectoryException; +import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; +import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; /** * FileUtils @@ -95,6 +100,26 @@ public class FileUtils { } } + /** + * Load files under the specified path + * + * @param fileDirectory fileDirectory + * @return file paths + */ + public static List loadDirectory(String fileDirectory) { + try { + final Path pathDirectory = Path.of(fileDirectory); + if (Files.isDirectory(pathDirectory)) { + return Files.list(pathDirectory).collect(Collectors.toList()); + } else { + throw new NotDirectoryException(fileDirectory); + } + } catch (IOException e) { + log.error("file write error:", e); + } + return new ArrayList<>(0); + } + /** * Deletes a file if it exists. 
* @@ -107,4 +132,20 @@ log.error("file write error:", e); } } + + /** + * Read the contents of the specified file + * + * @param filePath filePath + * @return file content + */ + public static String readFileContents(Path filePath) { + try { + final byte[] bytes = Files.readAllBytes(filePath); + return new String(bytes, StandardCharsets.UTF_8); + } catch (IOException e) { + log.error("file read error:", e); + } + return StringUtils.EMPTY; + } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/cache/TableExtractStatusCache.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/cache/TableExtractStatusCache.java index 1bd464a7608117e133fd2cf2360fc5c8e4d82e1c..22b73e125f387899777c9ecc1a0c29eb6154041d 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/cache/TableExtractStatusCache.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/cache/TableExtractStatusCache.java @@ -23,6 +23,7 @@ import org.springframework.util.Assert; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.Vector; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.IntStream; @@ -60,6 +61,8 @@ */ private static final Map> TABLE_EXTRACT_STATUS_MAP = new ConcurrentHashMap<>(); + private static final Vector ERROR_LIST = new Vector<>(); + /** * Table data extraction task status initialization. {code map} is a set of table decomposition tasks. * @@ -97,13 +100,32 @@ // update status tableStatus.put(ordinal, STATUS_COMPLATE); - log.info("update tableName : {}, ordinal : {} check completed-status {}", tableName, ordinal, + log.info("update tableName : {}, ordinal : {} extract completed-status {}", tableName, ordinal, STATUS_COMPLATE); } catch (Exception exception) { log.error(Message.UPDATE_STATUS_EXCEPTION, exception); } } + /** + * Mark current table data extraction exception + * + * @param tableName tableName + */ + public static void addErrorList(String tableName) { + ERROR_LIST.add(tableName); + } + + /** + * Check whether the current table is marked as a data extraction exception table + * + * @param tableName tableName + * @return If true is returned, a task of the current table has encountered an exception + */ + public static boolean hasErrorOccurred(String tableName) { + return ERROR_LIST.contains(tableName); + } + /** * data extraction status cache message management */ diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/client/CheckingFeignClient.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/client/CheckingFeignClient.java index 8a229ee537872cfd362ace4ca81b03da0eca05fa..96935aa7046d50077988beeed00349965dc97c0d 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/client/CheckingFeignClient.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/client/CheckingFeignClient.java @@ -19,6 +19,7 @@ import org.opengauss.datachecker.common.entry.enums.Endpoint; import org.opengauss.datachecker.common.entry.extract.SourceDataLog; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.lang.NonNull; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import
org.springframework.web.bind.annotation.RequestParam; @@ -48,10 +49,11 @@ public interface CheckingFeignClient { * * @param tableName table name * @param endpoint endpoint enum type {@link org.opengauss.datachecker.common.entry.enums.Endpoint} + * @param status status */ @PostMapping("/table/extract/status") void refreshTableExtractStatus(@RequestParam(value = "tableName") @NotEmpty String tableName, - @RequestParam(value = "endpoint") @NonNull Endpoint endpoint); + @RequestParam(value = "endpoint") @NonNull Endpoint endpoint, @RequestParam(value = "status") int status); /** * Initializing task status @@ -68,4 +70,10 @@ public interface CheckingFeignClient { */ @PostMapping("/notify/source/increment/data/logs") void notifySourceIncrementDataLogs(@RequestBody @NotEmpty List dataLogList); + + /** + * health check + */ + @GetMapping("/check/health") + void health(); } \ No newline at end of file diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/DruidDataSourceConfig.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/DruidDataSourceConfig.java index ed309ff44007a995d711962807d2022d64501c67..be7394107dcb60a991576d2bd82e4ed3fd37729e 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/DruidDataSourceConfig.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/DruidDataSourceConfig.java @@ -60,7 +60,9 @@ public class DruidDataSourceConfig { */ @Bean("jdbcTemplateOne") public JdbcTemplate jdbcTemplateOne(@Qualifier("dataSourceOne") DataSource dataSourceOne) { - return new JdbcTemplate(dataSourceOne); + JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSourceOne); + jdbcTemplate.setFetchSize(50000); + return jdbcTemplate; } /** diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/ExtractProperties.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/ExtractProperties.java index c2a6c56cdc8de7d96c5210bcabe61fa1ba5032e9..7352f02e75931b9a069aaabdbd2737c8b9a19826 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/ExtractProperties.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/ExtractProperties.java @@ -59,7 +59,7 @@ public class ExtractProperties { * By default,this function is disabled. the default value is false. */ @NotNull(message = "whether to enable debezium configuration, which cannot be empty") - private Boolean debeziumEnable; + private boolean isDebeziumEnable = false; /** * Debezium incremental migration verification topic. * Debezium listens to incremental data in tables and uses a single topic for incremental data management. @@ -83,10 +83,11 @@ public class ExtractProperties { /** * debezium incremental migration verification period: 24 x 60 (unit:minute) */ - private int debeziumTimePeriod; + private int debeziumTimePeriod = 1; /** * debezium incremental migration verification statistics incremental change record count threshold. * the threshold must be greater than 100. 
*/ private int debeziumNumPeriod = 1000; + private int debeziumNumDefaultPeriod = 1000; } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/KafkaConsumerConfig.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/KafkaConsumerConfig.java index 7319cb4d2365be19fe20af8786b9bfe163a4602b..502dbac7831a2dda554d832f9ccbc3db0d1dc3bf 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/KafkaConsumerConfig.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/KafkaConsumerConfig.java @@ -19,7 +19,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; -import org.opengauss.datachecker.common.entry.check.IncrementCheckTopic; import org.opengauss.datachecker.extract.constants.ExtConstants; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; @@ -42,7 +41,6 @@ import java.util.concurrent.ConcurrentHashMap; @Component @EnableConfigurationProperties(KafkaProperties.class) public class KafkaConsumerConfig { - private static final Object LOCK = new Object(); private static final Map> CONSUMER_MAP = new ConcurrentHashMap<>(); @@ -71,11 +69,17 @@ public class KafkaConsumerConfig { return consumer; } - public KafkaConsumer getDebeziumConsumer(IncrementCheckTopic topic) { + /** + * Obtaining a specified consumer client based on topic. + * + * @param groupId groupId + * @return consumer client. + */ + public KafkaConsumer getDebeziumConsumer(String groupId) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(ExtConstants.DELIMITER, properties.getBootstrapServers())); - props.put(ConsumerConfig.GROUP_ID_CONFIG, topic.getGroupId()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, properties.getConsumer().getAutoOffsetReset()); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DataConsolidationServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DataConsolidationServiceImpl.java index 550e089f34e0ddbc848951baab9f6066c94b1199..83fadbd040e47b9e080c797ecc6ed7be21c9bc58 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DataConsolidationServiceImpl.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DataConsolidationServiceImpl.java @@ -15,6 +15,7 @@ package org.opengauss.datachecker.extract.debe; +import com.alibaba.fastjson.JSONException; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -24,11 +25,11 @@ import org.opengauss.datachecker.common.entry.check.IncrementCheckTopic; import org.opengauss.datachecker.common.entry.enums.Endpoint; import org.opengauss.datachecker.common.entry.extract.SourceDataLog; import org.opengauss.datachecker.common.exception.DebeziumConfigException; -import org.opengauss.datachecker.common.util.IdGenerator; import org.opengauss.datachecker.extract.cache.MetaDataCache; import 
org.opengauss.datachecker.extract.config.ExtractProperties; import org.opengauss.datachecker.extract.config.KafkaConsumerConfig; import org.opengauss.datachecker.extract.kafka.KafkaAdminService; +import org.opengauss.datachecker.extract.service.MetaDataService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.Assert; @@ -57,23 +58,26 @@ public class DataConsolidationServiceImpl implements DataConsolidationService { private static final IncrementCheckConfig INCREMENT_CHECK_CONIFG = new IncrementCheckConfig(); private final Object lock = new Object(); + private final DebeziumDataHandler debeziumDataHandler = new DebeziumDataHandler(); private KafkaConsumer debeziumTopicOffSetConsumer = null; - @Autowired - private DebeziumDataHandler debeziumDataHandler; + @Autowired private KafkaConsumerConfig consumerConfig; @Autowired private KafkaAdminService kafkaAdminService; @Autowired private ExtractProperties extractProperties; + @Autowired + private MetaDataService metaDataService; /** * initIncrementConfig */ @PostConstruct public void initIncrementConfig() { - if (extractProperties.getDebeziumEnable()) { + if (extractProperties.isDebeziumEnable()) { + metaDataService.init(); INCREMENT_CHECK_CONIFG.setDebeziumTopic(extractProperties.getDebeziumTopic()) .setDebeziumTables(extractProperties.getDebeziumTables()) .setPartitions(extractProperties.getDebeziumTopicPartitions()) @@ -92,17 +96,19 @@ public class DataConsolidationServiceImpl implements DataConsolidationService { public List getDebeziumTopicRecords(String topicName) { checkIncrementCheckEnvironment(); IncrementCheckTopic topic = getDebeziumTopic(); - topic.setTopic(topicName).setGroupId(IdGenerator.nextId36()); - KafkaConsumer kafkaConsumer = consumerConfig.getDebeziumConsumer(topic); + // if test service reset the group id is IdGenerator.nextId36() + topic.setTopic(topicName); + KafkaConsumer kafkaConsumer = consumerConfig.getDebeziumConsumer(topic.getGroupId()); + kafkaConsumer.subscribe(List.of(topicName)); log.info("kafka debezium topic consumer topic=[{}]", topicName); // Consume a partition data of a topic List dataList = new ArrayList<>(); - comsumerAllRecords(kafkaConsumer, dataList); + consumerAllRecords(kafkaConsumer, dataList); log.info("kafka consumer topic=[{}] dataList=[{}]", topicName, dataList.size()); return dataList; } - private void comsumerAllRecords(KafkaConsumer kafkaConsumer, List dataList) { + private void consumerAllRecords(KafkaConsumer kafkaConsumer, List dataList) { log.debug("kafka Consumer poll"); DebeziumDataLogs debeziumDataLogs = new DebeziumDataLogs(); int consumerRecords = getConsumerRecords(kafkaConsumer, debeziumDataLogs); @@ -110,7 +116,7 @@ public class DataConsolidationServiceImpl implements DataConsolidationService { consumerRecords = getConsumerRecords(kafkaConsumer, debeziumDataLogs); } dataList.addAll(debeziumDataLogs.values()); - log.debug("Consumer data debezium DataHandler"); + log.debug("Consumer data debezium data handler"); } /** @@ -125,7 +131,7 @@ public class DataConsolidationServiceImpl implements DataConsolidationService { consumerRecords.forEach(record -> { try { debeziumDataHandler.handler(record.value(), debeziumDataLogs); - } catch (DebeziumConfigException ex) { + } catch (DebeziumConfigException | JSONException ex) { // Abnormal message structure, ignoring the current message log.error("Abnormal message structure, ignoring the current message,{},{}", record.value(), ex.getMessage()); @@ 
-160,16 +166,15 @@ public class DataConsolidationServiceImpl implements DataConsolidationService { private KafkaConsumer getDebeziumTopicOffSetConsumer() { if (Objects.nonNull(debeziumTopicOffSetConsumer)) { return debeziumTopicOffSetConsumer; - } - if (Objects.isNull(debeziumTopicOffSetConsumer)) { + } else { synchronized (lock) { if (Objects.isNull(debeziumTopicOffSetConsumer)) { IncrementCheckTopic topic = getDebeziumTopic(); final TopicPartition topicPartition = new TopicPartition(topic.getTopic(), 0); - KafkaConsumer kafkaConsumer = consumerConfig.getDebeziumConsumer(topic); + debeziumTopicOffSetConsumer = consumerConfig.getDebeziumConsumer(topic.getGroupId()); List partitionList = List.of(topicPartition); // Set consumption mode as partition - kafkaConsumer.assign(partitionList); + debeziumTopicOffSetConsumer.assign(partitionList); } } } @@ -210,7 +215,7 @@ public class DataConsolidationServiceImpl implements DataConsolidationService { * Check the configuration of the debezium environment for incremental verification */ private void checkIncrementCheckEnvironment() { - final Set allKeys = MetaDataCache.getAllKeys(); + final Set allKeys = metaDataService.queryMetaDataOfSchema().keySet(); // Debezium environmental inspection checkDebeziumEnvironment(INCREMENT_CHECK_CONIFG.getDebeziumTopic(), INCREMENT_CHECK_CONIFG.getDebeziumTables(), allKeys); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumDataHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumDataHandler.java index 4353f3f49aae6d9c55474925c7babb009c022bf5..17ac42e4a06ccef999d4d70b76d3a7b304c0cbec 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumDataHandler.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumDataHandler.java @@ -17,10 +17,9 @@ package org.opengauss.datachecker.extract.debe; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; -import org.opengauss.datachecker.common.entry.debezium.DebePayload; import org.opengauss.datachecker.common.entry.debezium.DebeziumData; +import org.opengauss.datachecker.common.entry.debezium.DebeziumPayload; import org.opengauss.datachecker.common.entry.debezium.PayloadSource; -import org.springframework.stereotype.Service; import javax.validation.constraints.NotNull; import java.util.Map; @@ -33,7 +32,6 @@ import java.util.Map; * @since :11 */ @Slf4j -@Service public class DebeziumDataHandler { /** * Debezium message parsing and adding the parsing result to the {@code DebeziumDataLogs.class} result set @@ -43,7 +41,7 @@ public class DebeziumDataHandler { */ public void handler(String message, @NotNull DebeziumDataLogs debeziumDataLogs) { final DebeziumData debeziumData = JSONObject.parseObject(message, DebeziumData.class); - final DebePayload payload = debeziumData.getPayload(); + final DebeziumPayload payload = debeziumData.getPayload(); final Map before = payload.getBefore(); final Map after = payload.getAfter(); final PayloadSource source = payload.getSource(); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/IncrementDataAnalysisService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/IncrementDataAnalysisService.java index 64472ab7a97da95684009db0db4f5c21c62a2d94..449c6926599ea2aa9fcb23eb22a4e6514ec26333 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/IncrementDataAnalysisService.java +++ 
b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/IncrementDataAnalysisService.java @@ -15,15 +15,18 @@ package org.opengauss.datachecker.extract.debe; +import feign.FeignException; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; import org.opengauss.datachecker.common.entry.check.IncrementCheckTopic; import org.opengauss.datachecker.common.entry.extract.SourceDataLog; +import org.opengauss.datachecker.common.exception.ExtractException; import org.opengauss.datachecker.common.util.ThreadUtil; import org.opengauss.datachecker.extract.client.CheckingFeignClient; import org.opengauss.datachecker.extract.config.ExtractProperties; import org.springframework.stereotype.Service; import org.springframework.util.Assert; -import org.springframework.util.CollectionUtils; import javax.annotation.PostConstruct; import java.util.List; @@ -38,6 +41,7 @@ import java.util.concurrent.atomic.AtomicLong; * @date :Created in 2022/7/4 * @since :11 */ +@Slf4j @RequiredArgsConstructor @Service public class IncrementDataAnalysisService { @@ -68,6 +72,7 @@ public class IncrementDataAnalysisService { @PostConstruct public void startIncrDataAnalysis() { if (extractProperties.getDebeziumEnable() && consolidationService.isSourceEndpoint()) { + log.info("Start incremental verification analysis"); verificationConfiguration(); IncrementCheckTopic topicRecordOffSet = consolidationService.getDebeziumTopicRecordOffSet(); // Start the initialization load to verify the topic offset @@ -78,18 +83,21 @@ public class IncrementDataAnalysisService { } private void verificationConfiguration() { + log.info("Incremental verification configuration parameter check"); final int debeziumTimePeriod = extractProperties.getDebeziumTimePeriod(); final int debeziumNumPeriod = extractProperties.getDebeziumNumPeriod(); + final int defaultPeriod = extractProperties.getDebeziumNumDefaultPeriod(); Assert.isTrue(debeziumTimePeriod > 0, "Debezium incremental migration verification, the time period should be greater than 0"); - Assert.isTrue(debeziumNumPeriod > 100, "Debezium incremental migration verification statistics:" - + "the threshold value of the number of incremental change records should be greater than 100"); + Assert.isTrue(debeziumNumPeriod >= defaultPeriod, "Debezium incremental migration verification statistics:" + + "the value of the number of incremental change records should be greater than " + defaultPeriod); } /** * Incremental log data record extraction scheduling task */ public void dataAnalysis() { + log.info("Start the incremental verification data analysis thread"); SCHEDULED_EXECUTOR .scheduleWithFixedDelay(peekDebeziumTopicRecordOffset(), DataNumAnalysisThreadConstant.INITIAL_DELAY, DataNumAnalysisThreadConstant.DELAY, TimeUnit.SECONDS); @@ -103,8 +111,15 @@ public class IncrementDataAnalysisService { private Runnable peekDebeziumTopicRecordOffset() { return () -> { Thread.currentThread().setName(DataNumAnalysisThreadConstant.NAME); - dataNumAnalysis(); - dataTimeAnalysis(); + try { + checkingFeignClient.health(); + dataNumAnalysis(); + dataTimeAnalysis(); + } catch (FeignException ex) { + log.error("check service has an error occurred. 
{}", ex.getMessage()); + } catch (ExtractException ex) { + log.error("peek debezium topic record offset has an error occurred,", ex); + } }; } @@ -112,6 +127,7 @@ public class IncrementDataAnalysisService { * Incremental log data extraction and time latitude management */ public void dataTimeAnalysis() { + log.info("Incremental log data extraction and time latitude management"); long time = System.currentTimeMillis(); if ((time - lastTimestampAtomic.get()) >= extractProperties.getDebeziumTimePeriod()) { final List debeziumTopicRecords = @@ -129,6 +145,7 @@ public class IncrementDataAnalysisService { * Incremental log data extraction, quantity and latitude management */ public void dataNumAnalysis() { + log.info("Incremental log data extraction, quantity and latitude management"); final long offset = consolidationService.getDebeziumTopicRecordEndOffSet(); // Verify whether the data volume threshold dimension scenario trigger conditions are met if ((offset - lastOffSetAtomic.get()) >= extractProperties.getDebeziumNumPeriod()) { @@ -136,8 +153,11 @@ public class IncrementDataAnalysisService { // the data is extracted and pushed to the verification service. final List debeziumTopicRecords = consolidationService.getDebeziumTopicRecords(extractProperties.getDebeziumTopic()); - checkingFeignClient.notifySourceIncrementDataLogs(debeziumTopicRecords); - lastOffSetAtomic.addAndGet(debeziumTopicRecords.size()); + if (CollectionUtils.isNotEmpty(debeziumTopicRecords)) { + checkingFeignClient.notifySourceIncrementDataLogs(debeziumTopicRecords); + lastOffSetAtomic.addAndGet(debeziumTopicRecords.size()); + } + // Trigger data volume threshold dimension scenario - update time threshold setLastTimestampAtomicCurrentTime(); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaProducerWapper.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaProducerWapper.java index 4ad148b4c4b5e09a3df64bdee633d9580b361d1f..ed5251b8e62c463040df9444061760bfdf9a1b9a 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaProducerWapper.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaProducerWapper.java @@ -25,7 +25,6 @@ import org.opengauss.datachecker.extract.config.KafkaProducerConfig; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; /** * KafkaProducerWapper @@ -68,20 +67,18 @@ public class KafkaProducerWapper { private void sendRecordToSinglePartitionTopic(List recordHashList, String topicName) { final KafkaProducer kafkaProducer = kafkaProducerConfig.getKafkaProducer(topicName); - AtomicInteger cnt = new AtomicInteger(0); recordHashList.forEach(record -> { record.setPartition(DEFAULT_PARTITION); final ProducerRecord producerRecord = new ProducerRecord<>(topicName, DEFAULT_PARTITION, record.getPrimaryKey(), JSON.toJSONString(record)); - sendMessage(kafkaProducer, producerRecord, cnt); + sendMessage(kafkaProducer, producerRecord); }); kafkaProducer.flush(); - log.info("send topic={}, record size :{},cnt:{}", topicName, recordHashList.size(), cnt.get()); + log.info("send topic={}, record size :{}", topicName, recordHashList.size()); } private void sendMultiPartitionTopic(List recordHashList, String topicName, int partitions) { final KafkaProducer kafkaProducer = kafkaProducerConfig.getKafkaProducer(topicName); - AtomicInteger cnt = new AtomicInteger(0); List kafkaRecordList = new ArrayList<>(); 
recordHashList.forEach(record -> { int partition = calcSimplePartition(record.getPrimaryKeyHash(), partitions); @@ -89,7 +86,7 @@ public class KafkaProducerWapper { ProducerRecord producerRecord = new ProducerRecord<>(topicName, partition, record.getPrimaryKey(), JSON.toJSONString(record)); kafkaRecordList.add(producerRecord); - sendMessage(kafkaProducer, producerRecord, cnt); + sendMessage(kafkaProducer, producerRecord); }); kafkaProducer.flush(); } @@ -98,16 +95,12 @@ public class KafkaProducerWapper { return (int) Math.abs(value % mod); } - private void sendMessage(KafkaProducer kafkaProducer, ProducerRecord producerRecord, - AtomicInteger cnt) { - kafkaProducer.send(producerRecord, (metadata, exception) -> { + private void sendMessage(KafkaProducer kafkaProducer, ProducerRecord record) { + kafkaProducer.send(record, (metadata, exception) -> { if (exception != null) { - log.error("send failed,topic={},key:{} ,partition:{},offset:{}", metadata.topic(), producerRecord.key(), + log.error("send failed,topic={},key:{} ,partition:{},offset:{}", metadata.topic(), record.key(), metadata.partition(), metadata.offset(), exception); } }); - if (cnt.incrementAndGet() % FLUSH_KAFKA_PARALLEL_THRESHOLD == 0) { - kafkaProducer.flush(); - } } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java index ccc2ea7ffe8cd0501ec3c25ade18e1a765d36579..508bef1ec8b83482d7861180bb59982b19a38b11 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java @@ -43,7 +43,6 @@ import org.opengauss.datachecker.extract.task.ExtractTaskRunnable; import org.opengauss.datachecker.extract.task.ExtractThreadSupport; import org.opengauss.datachecker.extract.task.IncrementExtractTaskRunnable; import org.opengauss.datachecker.extract.task.IncrementExtractThreadSupport; -import org.opengauss.datachecker.extract.task.RowDataHashHandler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; @@ -323,23 +322,6 @@ public class DataExtractServiceImpl implements DataExtractService { } } - static class DataExtractThreadExceptionHandler implements Thread.UncaughtExceptionHandler { - - /** - * Method invoked when the given thread terminates due to the - * given uncaught exception. - *

Any exception thrown by this method will be ignored by the - * Java Virtual Machine. - * - * @param thread the thread - * @param throwable the exception - */ - @Override - public void uncaughtException(Thread thread, Throwable throwable) { - log.error(thread.getName() + " exception: " + throwable); - } - } - /** * DML statement generating repair report * @@ -456,15 +438,11 @@ public class DataExtractServiceImpl implements DataExtractService { public List querySecondaryCheckRowData(SourceDataLog dataLog) { final String tableName = dataLog.getTableName(); final List compositeKeys = dataLog.getCompositePrimaryValues(); - final TableMetadata metadata = MetaDataCache.get(tableName); if (Objects.isNull(metadata)) { throw new TableNotExistException(tableName); } - List> dataRowList = - dataManipulationService.queryColumnValues(tableName, compositeKeys, metadata); - RowDataHashHandler handler = new RowDataHashHandler(); - return handler.handlerQueryResult(metadata, dataRowList); + return dataManipulationService.queryColumnHashValues(tableName, compositeKeys, metadata); } @Override diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java index 202223c24ecca4a94a86d5eefe8f0f9862137f4f..831baf5564afbcd0c1029d443877f8750f376dfe 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java @@ -18,6 +18,7 @@ package org.opengauss.datachecker.extract.task; import org.apache.commons.lang3.StringUtils; import org.opengauss.datachecker.common.constant.Constants.InitialCapacity; import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData; +import org.opengauss.datachecker.common.entry.extract.RowDataHash; import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.opengauss.datachecker.common.entry.extract.TableMetadataHash; import org.opengauss.datachecker.common.util.LongHashFunctionWrapper; @@ -30,6 +31,7 @@ import org.opengauss.datachecker.extract.dml.InsertDmlBuilder; import org.opengauss.datachecker.extract.dml.ReplaceDmlBuilder; import org.opengauss.datachecker.extract.dml.SelectDmlBuilder; import org.opengauss.datachecker.extract.service.MetaDataService; +import org.opengauss.datachecker.extract.util.MetaDataUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; @@ -37,7 +39,6 @@ import org.springframework.stereotype.Service; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; -import java.sql.ResultSetMetaData; import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; @@ -58,6 +59,8 @@ import java.util.stream.Collectors; public class DataManipulationService { private static final LongHashFunctionWrapper HASH_UTIL = new LongHashFunctionWrapper(); + private final ResultSetHashHandler resultSetHashHandler = new ResultSetHashHandler(); + private final ResultSetHandler resultSetHandler = new ResultSetHandler(); @Autowired private JdbcTemplate jdbcTemplateOne; @Autowired @@ -70,7 +73,40 @@ public class DataManipulationService { * * @param tableName tableName * @param compositeKeys compositeKeys - * @param metadata metadata + * @param tableMetadata 
tableMetadata + * @return query result + */ + public List queryColumnHashValues(String tableName, List compositeKeys, + TableMetadata tableMetadata) { + Assert.isTrue(Objects.nonNull(tableMetadata), "Abnormal table metadata , failed to build select SQL"); + final List primaryMetas = tableMetadata.getPrimaryMetas(); + + Assert.isTrue(!CollectionUtils.isEmpty(primaryMetas), + "The metadata information of the table primary key is abnormal, and the construction of select SQL failed"); + + // Single primary key table data query + if (primaryMetas.size() == 1) { + final ColumnsMetaData primaryData = primaryMetas.get(0); + String querySql = + new SelectDmlBuilder().schema(extractProperties.getSchema()).columns(tableMetadata.getColumnsMetas()) + .tableName(tableName).conditionPrimary(primaryData).build(); + return queryColumnValuesSinglePrimaryKey(querySql, compositeKeys, tableMetadata); + } else { + // Compound primary key table data query + final SelectDmlBuilder dmlBuilder = new SelectDmlBuilder(); + String querySql = dmlBuilder.schema(extractProperties.getSchema()).columns(tableMetadata.getColumnsMetas()) + .tableName(tableName).conditionCompositePrimary(primaryMetas).build(); + List batchParam = dmlBuilder.conditionCompositePrimaryValue(primaryMetas, compositeKeys); + return queryColumnValuesByCompositePrimary(querySql, batchParam, tableMetadata); + } + } + + /** + * queryColumnValues + * + * @param tableName tableName + * @param compositeKeys compositeKeys + * @param metadata tableMetadata * @return query result */ public List> queryColumnValues(String tableName, List compositeKeys, @@ -87,7 +123,7 @@ public class DataManipulationService { String querySql = new SelectDmlBuilder().schema(extractProperties.getSchema()).columns(metadata.getColumnsMetas()) .tableName(tableName).conditionPrimary(primaryData).build(); - return queryColumnValues(querySql, compositeKeys); + return queryColumnValuesSinglePrimaryKey(querySql, compositeKeys); } else { // Compound primary key table data query final SelectDmlBuilder dmlBuilder = new SelectDmlBuilder(); @@ -101,13 +137,22 @@ public class DataManipulationService { /** * Compound primary key table data query * - * @param selectDml Query SQL - * @param batchParam Compound PK query parameters + * @param selectDml Query SQL + * @param batchParam Compound PK query parameters + * @param tableMetadata tableMetadata * @return Query data results */ + private List queryColumnValuesByCompositePrimary(String selectDml, List batchParam, + TableMetadata tableMetadata) { + // Query the current task data and organize the data + HashMap paramMap = new HashMap<>(InitialCapacity.CAPACITY_1); + paramMap.put(DmlBuilder.PRIMARY_KEYS, batchParam); + return queryColumnValues(selectDml, paramMap, tableMetadata); + } + private List> queryColumnValuesByCompositePrimary(String selectDml, List batchParam) { // Query the current task data and organize the data - HashMap paramMap = new HashMap<>(InitialCapacity.CAPACITY_16); + HashMap paramMap = new HashMap<>(InitialCapacity.CAPACITY_1); paramMap.put(DmlBuilder.PRIMARY_KEYS, batchParam); return queryColumnValues(selectDml, paramMap); } @@ -115,13 +160,22 @@ public class DataManipulationService { /** * Single primary key table data query * - * @param selectDml Query SQL - * @param primaryKeys Query primary key collection + * @param selectDml Query SQL + * @param primaryKeys Query primary key collection + * @param tableMetadata tableMetadata * @return Query data results */ - private List> queryColumnValues(String selectDml, List 
primaryKeys) { + private List queryColumnValuesSinglePrimaryKey(String selectDml, List primaryKeys, + TableMetadata tableMetadata) { // Query the current task data and organize the data - HashMap paramMap = new HashMap<>(InitialCapacity.CAPACITY_16); + HashMap paramMap = new HashMap<>(InitialCapacity.CAPACITY_1); + paramMap.put(DmlBuilder.PRIMARY_KEYS, primaryKeys); + return queryColumnValues(selectDml, paramMap, tableMetadata); + } + + private List> queryColumnValuesSinglePrimaryKey(String selectDml, List primaryKeys) { + // Query the current task data and organize the data + HashMap paramMap = new HashMap<>(InitialCapacity.CAPACITY_1); paramMap.put(DmlBuilder.PRIMARY_KEYS, primaryKeys); return queryColumnValues(selectDml, paramMap); } @@ -133,17 +187,20 @@ public class DataManipulationService { * @param paramMap query parameters * @return query result */ - private List> queryColumnValues(String selectDml, Map paramMap) { + private List queryColumnValues(String selectDml, Map paramMap, + TableMetadata tableMetadata) { + List columns = MetaDataUtil.getTableColumns(tableMetadata); + List primary = MetaDataUtil.getTablePrimaryColumns(tableMetadata); // Use JDBC to query the current task to extract data NamedParameterJdbcTemplate jdbc = new NamedParameterJdbcTemplate(jdbcTemplateOne); - return jdbc.query(selectDml, paramMap, (rs, rowNum) -> { - // Get the metadata information corresponding to the current result set - ResultSetMetaData metaData = rs.getMetaData(); - // Result set processor - ResultSetHandler handler = new ResultSetHandler(); - // Data conversion of query result set according to metadata information - return handler.putOneResultSetToMap(rs, metaData); - }); + return jdbc.query(selectDml, paramMap, + (rs, rowNum) -> resultSetHashHandler.handler(primary, columns, resultSetHandler.putOneResultSetToMap(rs))); + } + + private List> queryColumnValues(String selectDml, Map paramMap) { + ResultSetHandler handler = new ResultSetHandler(); + NamedParameterJdbcTemplate jdbc = new NamedParameterJdbcTemplate(jdbcTemplateOne); + return jdbc.query(selectDml, paramMap, (rs, rowNum) -> handler.putOneResultSetToMap(rs)); } /** diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java index 481c4d595ddf08a43085466cec875b7f7048a89e..b06898c028569da1615dc3d4344b791e5982f5af 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java @@ -17,22 +17,24 @@ package org.opengauss.datachecker.extract.task; import lombok.extern.slf4j.Slf4j; import org.opengauss.datachecker.common.constant.Constants.InitialCapacity; +import org.opengauss.datachecker.common.entry.enums.DataBaseType; import org.opengauss.datachecker.common.entry.enums.Endpoint; import org.opengauss.datachecker.common.entry.extract.ExtractTask; import org.opengauss.datachecker.common.entry.extract.RowDataHash; import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.opengauss.datachecker.common.entry.extract.Topic; +import org.opengauss.datachecker.common.exception.ExtractException; import org.opengauss.datachecker.common.util.ThreadUtil; import org.opengauss.datachecker.extract.cache.TableExtractStatusCache; import org.opengauss.datachecker.extract.client.CheckingFeignClient; import 
org.opengauss.datachecker.extract.kafka.KafkaProducerWapper; +import org.opengauss.datachecker.extract.task.sql.SelectSqlBuilder; +import org.opengauss.datachecker.extract.util.MetaDataUtil; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; -import java.sql.ResultSetMetaData; import java.util.HashMap; import java.util.List; -import java.util.Map; /** * Data extraction thread class @@ -44,10 +46,12 @@ import java.util.Map; @Slf4j public class ExtractTaskRunnable extends KafkaProducerWapper implements Runnable { private static final String EXTRACT_THREAD_NAME_PREFIX = "EXTRACT_"; + private static final String EXTRACT_STATUS_THREAD_NAME_PREFIX = "EXTRACT_STATUS_"; private final Topic topic; private final ExtractTask task; private final Endpoint endpoint; + private final DataBaseType databaseType; private final String schema; private final JdbcTemplate jdbcTemplate; private final CheckingFeignClient checkingFeignClient; @@ -63,66 +67,87 @@ public class ExtractTaskRunnable extends KafkaProducerWapper implements Runnable super(support.getKafkaProducerConfig()); this.task = task; this.topic = topic; + databaseType = support.getExtractProperties().getDatabaseType(); schema = support.getExtractProperties().getSchema(); endpoint = support.getExtractProperties().getEndpoint(); jdbcTemplate = new JdbcTemplate(support.getDataSourceOne()); checkingFeignClient = support.getCheckingFeignClient(); } - @Override - public void run() { - Thread.currentThread().setName(EXTRACT_THREAD_NAME_PREFIX.concat(task.getTaskName())); - log.info("Data extraction task {} is starting", task.getTaskName()); + /** + * Core logic of data extraction task execution + */ + public void executeTask() { TableMetadata tableMetadata = task.getTableMetadata(); // Construct query SQL according to the metadata information of the table in the current task - String sql = new SelectSqlBulder(tableMetadata, schema, task.getStart(), task.getOffset()).builder(); + final SelectSqlBuilder sqlBuilder = new SelectSqlBuilder(tableMetadata, schema); + String sql = sqlBuilder.dataBaseType(databaseType).offset(task.getStart(), task.getOffset()).builder(); log.debug("selectSql {}", sql); - // Query data through JDBC SQL - List> dataRowList = queryColumnValues(sql); - - log.info("Data extraction task {} completes basic data query through JDBC", task.getTaskName()); - // Hash the queried data results - RowDataHashHandler handler = new RowDataHashHandler(); - List recordHashList = handler.handlerQueryResult(tableMetadata, dataRowList); + // Query data through JDBC SQL and Hash the queried data results , + // then package data into RowDataHash type Objects + log.info("Data extraction task {} start, data query through JDBC", task.getTaskName()); + List recordHashList = queryAndConvertColumnValues(sql, tableMetadata); + log.info("Data extraction task {} completes, data query through JDBC", task.getTaskName()); // Push the data to Kafka according to the fragmentation order syncSend(topic, recordHashList); - String tableName = task.getTableName(); - - // When the push is completed, the extraction status of the current task will be updated - TableExtractStatusCache.update(tableName, task.getDivisionsOrdinal()); - log.info("update extract task={} status completed", task.getTaskName()); - if (!task.isDivisions()) { - // Notify the verification service that the task data extraction corresponding to - // the current table has been completed - 
checkingFeignClient.refreshTableExtractStatus(tableName, endpoint); - log.info("refresh table extract status tableName={} status completed", task.getTaskName()); - } - // If the current task is a sharding task, check the sharding status of the current task before sharding and - // whether the execution is completed. - // If the previous sharding task is not completed, wait 1000 milliseconds, - // check again and try until all the previous sharding tasks are completed, - // and then refresh the current sharding status. - if (task.isDivisions() && task.getDivisionsOrdinal() == task.getDivisionsTotalNumber()) { - // The data extraction task of the current table is completed (all subtasks are completed) - // Notify the verification service that the task data extraction corresponding to - // the current table has been completed - while (!TableExtractStatusCache.checkCompleted(tableName, task.getDivisionsOrdinal())) { - ThreadUtil.sleep(1000); + ThreadUtil.newSingleThreadExecutor().submit(() -> { + Thread.currentThread().setName(EXTRACT_STATUS_THREAD_NAME_PREFIX.concat(task.getTaskName())); + String tableName = task.getTableName(); + // When the push is completed, the extraction status of the current task will be updated + TableExtractStatusCache.update(tableName, task.getDivisionsOrdinal()); + if (!task.isDivisions()) { + // Notify the verification service that the task data extraction corresponding to + // the current table has been completed + checkingFeignClient.refreshTableExtractStatus(tableName, endpoint, endpoint.getCode()); + log.info("refresh table extract status tableName={} status completed", task.getTaskName()); } - checkingFeignClient.refreshTableExtractStatus(tableName, endpoint); - log.info("refresh table=[{}] extract status completed,task=[{}]", tableName, task.getTaskName()); - } + if (task.isDivisions() && task.getDivisionsOrdinal() == task.getDivisionsTotalNumber()) { + // The data extraction task of the current table is completed (all subtasks are completed) + // Notify the verification service that the task data extraction corresponding to + // the current table has been completed + while (!TableExtractStatusCache.checkCompleted(tableName, task.getDivisionsOrdinal())) { + ThreadUtil.sleep(1000); + if (TableExtractStatusCache.hasErrorOccurred(tableName)) { + break; + } + } + if (TableExtractStatusCache.hasErrorOccurred(tableName)) { + checkingFeignClient.refreshTableExtractStatus(tableName, endpoint, -1); + log.error("refresh table=[{}] extract status error,task=[{}]", tableName, task.getTaskName()); + } else { + checkingFeignClient.refreshTableExtractStatus(tableName, endpoint, endpoint.getCode()); + log.info("refresh table=[{}] extract status completed,task=[{}]", tableName, task.getTaskName()); + } + } + }); } - private List> queryColumnValues(String sql) { - Map map = new HashMap<>(InitialCapacity.CAPACITY_16); + private List queryAndConvertColumnValues(String sql, TableMetadata tableMetadata) { + List columns = MetaDataUtil.getTableColumns(tableMetadata); + List primary = MetaDataUtil.getTablePrimaryColumns(tableMetadata); NamedParameterJdbcTemplate jdbc = new NamedParameterJdbcTemplate(jdbcTemplate); - return jdbc.query(sql, map, (rs, rowNum) -> { - ResultSetMetaData metaData = rs.getMetaData(); - ResultSetHandler handler = new ResultSetHandler(); - return handler.putOneResultSetToMap(rs, metaData); - }); + ResultSetHashHandler resultSetHashHandler = new ResultSetHashHandler(); + ResultSetHandler resultSetHandler = new ResultSetHandler(); + return jdbc.query(sql, new 
HashMap<>(InitialCapacity.EMPTY), + (rs, rowNum) -> resultSetHashHandler.handler(primary, columns, resultSetHandler.putOneResultSetToMap(rs))); + } + + @Override + public void run() { + Thread.currentThread().setName(EXTRACT_THREAD_NAME_PREFIX.concat(task.getTaskName())); + final String tableName = task.getTableName(); + log.info("Data extraction task {} is starting", tableName); + try { + if (TableExtractStatusCache.hasErrorOccurred(tableName)) { + log.error("table:[{}] has some error,current task=[{}] is canceled", tableName, task.getTaskName()); + return; + } + executeTask(); + } catch (ExtractException exp) { + log.error("Data extraction task {} has some exception,{}", task.getTaskName(), exp.getMessage()); + TableExtractStatusCache.addErrorList(tableName); + } } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/IncrementExtractTaskRunnable.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/IncrementExtractTaskRunnable.java index e9f9c2198933ce2aea10051977317b1d28f11e82..03924d0e837ad76ab536ec91711fd05b54f6dfd2 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/IncrementExtractTaskRunnable.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/IncrementExtractTaskRunnable.java @@ -31,11 +31,11 @@ import org.opengauss.datachecker.extract.dml.DmlBuilder; import org.opengauss.datachecker.extract.dml.SelectDmlBuilder; import org.opengauss.datachecker.extract.kafka.KafkaProducerWapper; import org.opengauss.datachecker.extract.service.MetaDataService; +import org.opengauss.datachecker.extract.util.MetaDataUtil; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.util.CollectionUtils; -import java.sql.ResultSetMetaData; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -96,22 +96,18 @@ public class IncrementExtractTaskRunnable extends KafkaProducerWapper implements paramMap.put(DmlBuilder.PRIMARY_KEYS, getSqlParam(sqlBuilder, tableMetadata.getPrimaryMetas(), compositePrimaryValues)); - // Query the current task data and organize the data - List> dataRowList = queryColumnValues(sqlBuilder.build(), paramMap); + // Query the current task data and organize the data && Hash the queried data results + List dataRowList = queryColumnValues(sqlBuilder.build(), paramMap, tableMetadata); log.info("query extract task={} completed row count=[{}]", taskName, dataRowList.size()); - // Hash the queried data results - RowDataHashHandler handler = new RowDataHashHandler(); - List recordHashList = handler.handlerQueryResult(tableMetadata, dataRowList); - log.info("hash extract task={} completed", taskName); // Push the local cache to push the data to Kafka according to the fragmentation order - syncSend(topic, recordHashList); + syncSend(topic, dataRowList); log.info("send kafka extract task={} completed", taskName); // When the push is completed, the extraction status of the current task will be updated TableExtractStatusCache.update(tableName, 1); log.info("update extract task={} status completed", tableName); // Notify the verification service that the task data extraction corresponding to // the current table has been completed - checkingFeignClient.refreshTableExtractStatus(tableName, endpoint); + checkingFeignClient.refreshTableExtractStatus(tableName, endpoint, endpoint.getCode()); log.info("refush table extract status tableName={} status completed", 
tableName); } @@ -177,17 +173,20 @@ public class IncrementExtractTaskRunnable extends KafkaProducerWapper implements /** * Primary key table data query * - * @param selectDml Query SQL - * @param paramMap Query Parameter + * @param selectDml Query SQL + * @param paramMap Query Parameter + * @param tableMetadata tableMetadata * @return query results */ - private List> queryColumnValues(String selectDml, Map paramMap) { + private List queryColumnValues(String selectDml, Map paramMap, + TableMetadata tableMetadata) { NamedParameterJdbcTemplate jdbc = new NamedParameterJdbcTemplate(jdbcTemplate); - return jdbc.query(selectDml, paramMap, (rs, rowNum) -> { - ResultSetMetaData metaData = rs.getMetaData(); - ResultSetHandler handler = new ResultSetHandler(); - return handler.putOneResultSetToMap(rs, metaData); - }); + List columns = MetaDataUtil.getTableColumns(tableMetadata); + List primary = MetaDataUtil.getTablePrimaryColumns(tableMetadata); + ResultSetHashHandler resultSetHashHandler = new ResultSetHashHandler(); + ResultSetHandler resultSetHandler = new ResultSetHandler(); + return jdbc.query(selectDml, paramMap, + (rs, rowNum) -> resultSetHashHandler.handler(primary, columns, resultSetHandler.putOneResultSetToMap(rs))); } private TableMetadata getTableMetadata() { diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java index b29b14d81ba41d2ce88941ad7858b77f732d4ffd..b052b110df5d31ba30a6f0c37ea89283827c9280 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java @@ -62,20 +62,20 @@ public class ResultSetHandler { * Convert the current query result set into map according to the metadata information of the result set * * @param resultSet JDBC Data query result set - * @param rsmd JDBC ResultSet Metadata * @return JDBC Data encapsulation results * @throws SQLException Return SQL exception */ - public Map putOneResultSetToMap(ResultSet resultSet, ResultSetMetaData rsmd) throws SQLException { - Map values = new HashMap<>(InitialCapacity.CAPACITY_16); - IntStream.range(0, rsmd.getColumnCount()).forEach(idx -> { + public Map putOneResultSetToMap(ResultSet resultSet) throws SQLException { + Map values = new HashMap<>(InitialCapacity.CAPACITY_64); + final ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); + IntStream.range(0, resultSetMetaData.getColumnCount()).forEach(idx -> { try { int columnIdx = idx + 1; // Get the column and its corresponding column name - String columnLabel = rsmd.getColumnLabel(columnIdx); + String columnLabel = resultSetMetaData.getColumnLabel(columnIdx); // Get the corresponding value from the resultset result set according to the column name Object columnValue; - final int columnType = rsmd.getColumnType(columnIdx); + final int columnType = resultSetMetaData.getColumnType(columnIdx); if (SQL_TIME_TYPES.contains(columnType)) { columnValue = timeHandler(resultSet, columnIdx, columnType); } else { @@ -83,8 +83,7 @@ public class ResultSetHandler { } values.put(columnLabel, MAPPER.convertValue(columnValue, String.class)); } catch (SQLException ex) { - log.error("putOneResultSetToMap Convert data according to result set metadata information." 
- + " Result set exception {}", ex.getMessage()); + log.error("putOneResultSetToMap Convert data according to result set metadata information.", ex); } }); return values; diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/RowDataHashHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHashHandler.java similarity index 37% rename from datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/RowDataHashHandler.java rename to datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHashHandler.java index d0b14ba3f5da4b3c3e0808cbce7f2881018e9c69..a1788309a67a427b64a8c01f787efe75ff4d1fe2 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/RowDataHashHandler.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHashHandler.java @@ -16,42 +16,40 @@ package org.opengauss.datachecker.extract.task; import org.opengauss.datachecker.common.entry.extract.RowDataHash; -import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.opengauss.datachecker.extract.util.HashHandler; -import org.opengauss.datachecker.extract.util.MetaDataUtil; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; /** + * ResultSetHashHandler + * * @author :wangchao - * @date :Created in 2022/6/17 + * @date :Created in 2022/9/5 * @since :11 */ -public class RowDataHashHandler { +public class ResultSetHashHandler { + private final HashHandler hashHandler = new HashHandler(); + /** - * According to the column order in the table metadata information {@code tableMetadata}, - * the queried data results are spliced, and the hash calculation of the spliced result rows is performed + *
+     * <pre>
+     * Obtain the primary key values from the result-set row according to the table's primary key column names.
+     * Obtain all field values from the result-set row according to the table's column name list.
+     * Hash the primary key value and the whole record value.
+     * Encapsulate the calculation result as a RowDataHash object.
+     * </pre>
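For orientation, a hedged usage sketch of the handler documented above; the table, row values and column names are illustrative assumptions, not taken from this patch:

import java.util.List;
import java.util.Map;
import org.opengauss.datachecker.common.entry.extract.RowDataHash;
import org.opengauss.datachecker.extract.task.ResultSetHashHandler;

class ResultSetHashHandlerSketch {
    public static void main(String[] args) {
        // One result-set row, already converted to a column/value map by ResultSetHandler.putOneResultSetToMap.
        Map<String, String> row = Map.of("id", "1001", "name", "alice");
        // Primary-key column names and the ordered column list of the (hypothetical) table.
        List<String> primary = List.of("id");
        List<String> columns = List.of("id", "name");
        RowDataHash hash = new ResultSetHashHandler().handler(primary, columns, row);
        // hash now carries the primary-key value, the hash of the key columns,
        // and the hash of all column values, ready to be sent to Kafka.
        System.out.println(hash);
    }
}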
* - * @param tableMetadata Table metadata information - * @param dataRowList Query data set + * @param primary primary list + * @param columns column list + * @param rowData Query data set * @return Returns the hash calculation result of extracted data */ - public List handlerQueryResult(TableMetadata tableMetadata, List> dataRowList) { - List recordHashList = Collections.synchronizedList(new ArrayList<>()); - HashHandler hashHandler = new HashHandler(); - List columns = MetaDataUtil.getTableColumns(tableMetadata); - List primarys = MetaDataUtil.getTablePrimaryColumns(tableMetadata); - dataRowList.forEach(rowColumnsValueMap -> { - long rowHash = hashHandler.xx3Hash(rowColumnsValueMap, columns); - String primaryValue = hashHandler.value(rowColumnsValueMap, primarys); - long primaryHash = hashHandler.xx3Hash(rowColumnsValueMap, primarys); - RowDataHash hashData = new RowDataHash(); - hashData.setPrimaryKey(primaryValue).setPrimaryKeyHash(primaryHash).setRowHash(rowHash); - recordHashList.add(hashData); - }); - return recordHashList; + public RowDataHash handler(List primary, List columns, Map rowData) { + long rowHash = hashHandler.xx3Hash(rowData, columns); + String primaryValue = hashHandler.value(rowData, primary); + long primaryHash = hashHandler.xx3Hash(rowData, primary); + RowDataHash hashData = new RowDataHash(); + hashData.setPrimaryKey(primaryValue).setPrimaryKeyHash(primaryHash).setRowHash(rowHash); + return hashData; } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/SelectSqlBulder.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/SelectSqlBulder.java deleted file mode 100644 index ac95ddb18526aee48feaab1a2f0991e7c34a50d4..0000000000000000000000000000000000000000 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/SelectSqlBulder.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. - * - * openGauss is licensed under Mulan PSL v2. - * You can use this software according to the terms and conditions of the Mulan PSL v2. - * You may obtain a copy of Mulan PSL v2 at: - * - * http://license.coscl.org.cn/MulanPSL2 - * - * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, - * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, - * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. - * See the Mulan PSL v2 for more details. 
- */ - -package org.opengauss.datachecker.extract.task; - -import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData; -import org.opengauss.datachecker.common.entry.extract.TableMetadata; -import org.springframework.util.Assert; - -import java.util.List; -import java.util.Objects; -import java.util.stream.Collectors; - -import static org.opengauss.datachecker.extract.task.SelectSqlBulder.QuerySqlMapper.AND_CONDITION; -import static org.opengauss.datachecker.extract.task.SelectSqlBulder.QuerySqlMapper.COLUMN; -import static org.opengauss.datachecker.extract.task.SelectSqlBulder.QuerySqlMapper.DELIMITER; -import static org.opengauss.datachecker.extract.task.SelectSqlBulder.QuerySqlMapper.EQUAL_CONDITION; -import static org.opengauss.datachecker.extract.task.SelectSqlBulder.QuerySqlMapper.JOIN_ON; -import static org.opengauss.datachecker.extract.task.SelectSqlBulder.QuerySqlMapper.OFFSET; -import static org.opengauss.datachecker.extract.task.SelectSqlBulder.QuerySqlMapper.PRIMARY_KEY; -import static org.opengauss.datachecker.extract.task.SelectSqlBulder.QuerySqlMapper.QUERY_MULTIPLE_PRIMARY_KEY_OFF_SET; -import static org.opengauss.datachecker.extract.task.SelectSqlBulder.QuerySqlMapper.QUERY_OFF_SET; -import static org.opengauss.datachecker.extract.task.SelectSqlBulder.QuerySqlMapper.QUERY_OFF_SET_ZERO; -import static org.opengauss.datachecker.extract.task.SelectSqlBulder.QuerySqlMapper.SCHEMA; -import static org.opengauss.datachecker.extract.task.SelectSqlBulder.QuerySqlMapper.START; -import static org.opengauss.datachecker.extract.task.SelectSqlBulder.QuerySqlMapper.SUB_TABLE_ALIAS; -import static org.opengauss.datachecker.extract.task.SelectSqlBulder.QuerySqlMapper.TABLE_ALIAS; -import static org.opengauss.datachecker.extract.task.SelectSqlBulder.QuerySqlMapper.TABLE_NAME; - -/** - * Data extraction SQL builder - * - * @author wang chao - * @date 2022/5/12 19:17 - * @since 11 - **/ -public class SelectSqlBulder { - private static final long OFF_SET_ZERO = 0L; - /** - * Start position of task execution - */ - private final long start; - /** - * Task execution offset - */ - private final long offset; - /** - * Query data schema - */ - private final String schema; - /** - * Table metadata information - */ - private final TableMetadata tableMetadata; - - /** - * Table fragment query SQL Statement Builder - * - * @param tableMetadata tableMetadata - * @param schema schema - * @param start start - * @param offset offset - */ - public SelectSqlBulder(TableMetadata tableMetadata, String schema, long start, long offset) { - this.tableMetadata = tableMetadata; - this.start = start; - this.offset = offset; - this.schema = schema; - } - - /** - * Table fragment query SQL Statement Builder - * - * @return build sql - */ - public String builder() { - Assert.isTrue(Objects.nonNull(tableMetadata), "Abnormal table metadata information, failed to build SQL"); - List columnsMetas = tableMetadata.getColumnsMetas(); - if (offset == OFF_SET_ZERO) { - return buildSelectSqlOffsetZero(columnsMetas, tableMetadata.getTableName()); - } else { - return buildSelectSqlOffset(tableMetadata, start, offset); - } - } - - /** - * Construct query statements based on metadata information SELECT * FROM test.test1 - * - * @param columnsMetas Column metadata information - * @param tableName tableName - * @return - */ - private String buildSelectSqlOffsetZero(List columnsMetas, String tableName) { - String columnNames = - columnsMetas.stream().map(ColumnsMetaData::getColumnName).collect(Collectors.joining(DELIMITER)); 
- return QUERY_OFF_SET_ZERO.replace(COLUMN, columnNames).replace(SCHEMA, schema).replace(TABLE_NAME, tableName); - } - - /** - *
-     * Construct query statements based on metadata and fragment information
-     * SELECT * FROM test.test1 WHERE b_number IN
-     * (SELECT t.b_number FROM (SELECT b_number FROM test.test1 LIMIT 0,20) t);
-     * </pre>
- * - * @param tableMetadata Table metadata information - * @param start Start position of fragment query - * @param offset Fragment query start position fragment query displacement - * @return Return the constructed select statement - */ - private String buildSelectSqlOffset(TableMetadata tableMetadata, long start, long offset) { - List columnsMetas = tableMetadata.getColumnsMetas(); - List primaryMetas = tableMetadata.getPrimaryMetas(); - String columnNames; - String primaryKey; - String tableName = tableMetadata.getTableName(); - if (primaryMetas.size() == 1) { - columnNames = - columnsMetas.stream().map(ColumnsMetaData::getColumnName).collect(Collectors.joining(DELIMITER)); - primaryKey = primaryMetas.stream().map(ColumnsMetaData::getColumnName).collect(Collectors.joining()); - return QUERY_OFF_SET.replace(COLUMN, columnNames).replace(SCHEMA, schema).replace(TABLE_NAME, tableName) - .replace(PRIMARY_KEY, primaryKey).replace(START, String.valueOf(start)) - .replace(OFFSET, String.valueOf(offset)); - } else { - columnNames = - columnsMetas.stream().map(ColumnsMetaData::getColumnName).map(counm -> TABLE_ALIAS.concat(counm)) - .collect(Collectors.joining(DELIMITER)); - primaryKey = - primaryMetas.stream().map(ColumnsMetaData::getColumnName).collect(Collectors.joining(DELIMITER)); - String joinOn = primaryMetas.stream().map(ColumnsMetaData::getColumnName).map( - column -> TABLE_ALIAS.concat(column).concat(EQUAL_CONDITION).concat(SUB_TABLE_ALIAS).concat(column)) - .collect(Collectors.joining(AND_CONDITION)); - return QUERY_MULTIPLE_PRIMARY_KEY_OFF_SET.replace(COLUMN, columnNames).replace(SCHEMA, schema) - .replace(TABLE_NAME, tableName).replace(PRIMARY_KEY, primaryKey) - .replace(JOIN_ON, joinOn).replace(START, String.valueOf(start)) - .replace(OFFSET, String.valueOf(offset)); - } - } - - /** - * Query SQL build template - */ - interface QuerySqlMapper { - /** - * Query SQL statement columnsList fragment - */ - String COLUMN = ":columnsList"; - /** - * Query SQL statement tableName fragment - */ - String TABLE_NAME = ":tableName"; - /** - * Query SQL statement primaryKey fragment - */ - String PRIMARY_KEY = ":primaryKey"; - /** - * Query SQL statement schema fragment - */ - String SCHEMA = ":schema"; - /** - * Query SQL statement start fragment: Start position of fragment query - */ - String START = ":start"; - /** - * Query SQL statement offset fragment: Fragment query offset - */ - String OFFSET = ":offset"; - /** - * Query SQL statement joinOn fragment: Query SQL statement joinOn fragment - */ - String JOIN_ON = ":joinOn"; - /** - * Query SQL statement fragment: Query SQL statements in the scenario without offset - */ - String QUERY_OFF_SET_ZERO = "SELECT :columnsList FROM :schema.:tableName"; - /** - * Query SQL statement fragment: SQL statement for fragment query using offset in single primary key scenario - */ - String QUERY_OFF_SET = "SELECT :columnsList FROM :schema.:tableName WHERE :primaryKey IN " - + "(SELECT t.:primaryKey FROM (SELECT :primaryKey FROM :schema.:tableName order by :primaryKey " - + " LIMIT :start,:offset) t)"; - /** - * Query SQL statement fragment: SQL statement for fragment query using offset in multiple primary key scenario - */ - String QUERY_MULTIPLE_PRIMARY_KEY_OFF_SET = "SELECT :columnsList FROM :schema.:tableName a RIGHT JOIN " - + " (SELECT :primaryKey FROM :schema.:tableName order by :primaryKey LIMIT :start,:offset) b ON :joinOn"; - /** - * Query SQL statement fragment: SQL statement field spacing symbol - */ - String DELIMITER = ","; - /** - * Query SQL 
statement fragment: SQL statement equality condition symbol - */ - String EQUAL_CONDITION = "="; - /** - * Query SQL statement and fragment - */ - String AND_CONDITION = " and "; - /** - * Query SQL statement table alias fragment: table alias - */ - String TABLE_ALIAS = "a."; - /** - * Query SQL statement sub table alias fragment: Sub query result alias - */ - String SUB_TABLE_ALIAS = "b."; - } -} diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/QuerySqlTemplate.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/QuerySqlTemplate.java new file mode 100644 index 0000000000000000000000000000000000000000..6862e2dcaee9cba3c92e793ffe12a1d731ce9ba3 --- /dev/null +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/QuerySqlTemplate.java @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.opengauss.datachecker.extract.task.sql; + +/** + * Query SQL build template + * + * @author :wangchao + * @date :Created in 2022/9/2 + * @since :11 + */ +public interface QuerySqlTemplate { + /** + * Query SQL statement columnsList fragment + */ + String COLUMN = ":columnsList"; + + /** + * Query SQL statement tableName fragment + */ + String TABLE_NAME = ":tableName"; + + /** + * Query SQL statement primaryKey fragment + */ + String PRIMARY_KEY = ":primaryKey"; + + /** + * Query SQL statement schema fragment + */ + String SCHEMA = ":schema"; + + /** + * Query SQL statement start fragment: Start position of fragment query + */ + String START = ":start"; + + /** + * Query SQL statement offset fragment: Fragment query offset + */ + String OFFSET = ":offset"; + + /** + * Query SQL statement joinOn fragment: Query SQL statement joinOn fragment + */ + String JOIN_ON = ":joinOn"; + + /** + * Query SQL statement fragment: Query SQL statements in the scenario without offset + */ + String QUERY_OFF_SET_ZERO = "SELECT :columnsList FROM :schema.:tableName"; + + /** + * Query SQL statement fragment: SQL statement for fragment query using offset in single primary key scenario + */ + String QUERY_OFF_SET = "SELECT :columnsList FROM :schema.:tableName LIMIT :start,:offset"; + + /** + * Query SQL statement fragment: SQL statement for fragment query using offset in single primary key scenario + */ + String QUERY_NO_OFF_SET = "SELECT :columnsList FROM :schema.:tableName"; + + /** + * Query SQL statement fragment: SQL statement for fragment query using offset in multiple primary key scenario + */ + String QUERY_MULTIPLE_PRIMARY_KEY_OFF_SET = "SELECT :columnsList FROM :schema.:tableName a RIGHT JOIN " + + " (SELECT :primaryKey FROM :schema.:tableName order by :primaryKey LIMIT :start,:offset) b ON :joinOn"; + + /** + * Query SQL statement fragment: SQL statement field spacing symbol + */ + String DELIMITER = ","; + + /** + * Query SQL statement fragment: SQL statement equality condition symbol + */ + String EQUAL_CONDITION = "="; + + /** + * Query SQL statement and fragment + */ + String AND_CONDITION = 
" and "; + + /** + * Query SQL statement table alias fragment: table alias + */ + String TABLE_ALIAS = "a."; + + /** + * Query SQL statement sub table alias fragment: Sub query result alias + */ + String SUB_TABLE_ALIAS = "b."; +} diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/SelectSqlBuilder.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/SelectSqlBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..de1bd492f85adb5ac7747bcd210d193445d863b5 --- /dev/null +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/SelectSqlBuilder.java @@ -0,0 +1,223 @@ +/* + * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.opengauss.datachecker.extract.task.sql; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.opengauss.datachecker.common.entry.enums.DataBaseType; +import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData; +import org.opengauss.datachecker.common.entry.extract.TableMetadata; +import org.springframework.lang.NonNull; +import org.springframework.util.Assert; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.opengauss.datachecker.extract.task.sql.QuerySqlTemplate.COLUMN; +import static org.opengauss.datachecker.extract.task.sql.QuerySqlTemplate.DELIMITER; +import static org.opengauss.datachecker.extract.task.sql.QuerySqlTemplate.OFFSET; +import static org.opengauss.datachecker.extract.task.sql.QuerySqlTemplate.SCHEMA; +import static org.opengauss.datachecker.extract.task.sql.QuerySqlTemplate.START; +import static org.opengauss.datachecker.extract.task.sql.QuerySqlTemplate.TABLE_ALIAS; +import static org.opengauss.datachecker.extract.task.sql.QuerySqlTemplate.TABLE_NAME; + +/** + * OpenGaussSelectSqlBuilder Data extraction SQL builder + * + * @author wang chao + * @date 2022/5/12 19:17 + * @since 11 + **/ +public class SelectSqlBuilder { + private static final Map SQL_GENERATE = new HashMap<>(); + private static final long OFF_SET_ZERO = 0L; + private static final SqlGenerateTemplate GENERATE_TEMPLATE = + (template, sqlGenerateMeta) -> template.replace(COLUMN, sqlGenerateMeta.getColumns()) + .replace(SCHEMA, sqlGenerateMeta.getSchema()) + .replace(TABLE_NAME, sqlGenerateMeta.getTableName()) + .replace(START, String.valueOf(sqlGenerateMeta.getStart())) + .replace(OFFSET, String.valueOf(sqlGenerateMeta.getOffset())); + private static final SqlGenerateTemplate NO_OFFSET_SQL_GENERATE_TEMPLATE = + (template, sqlGenerateMeta) -> template.replace(COLUMN, sqlGenerateMeta.getColumns()) + .replace(SCHEMA, sqlGenerateMeta.getSchema()) + .replace(TABLE_NAME, sqlGenerateMeta.getTableName()); + private static final SqlGenerate OFFSET_GENERATE = + (sqlGenerateMeta) -> GENERATE_TEMPLATE.replace(QuerySqlTemplate.QUERY_OFF_SET, sqlGenerateMeta); + private static final SqlGenerate NO_OFFSET_GENERATE = (sqlGenerateMeta) -> 
NO_OFFSET_SQL_GENERATE_TEMPLATE + .replace(QuerySqlTemplate.QUERY_NO_OFF_SET, sqlGenerateMeta); + + static { + SQL_GENERATE.put(DataBaseType.MS, OFFSET_GENERATE); + SQL_GENERATE.put(DataBaseType.OG, OFFSET_GENERATE); + SQL_GENERATE.put(DataBaseType.O, OFFSET_GENERATE); + } + + private String schema; + private TableMetadata tableMetadata; + private long start = 0L; + private long offset = 0L; + private DataBaseType dataBaseType; + + /** + * Table fragment query SQL Statement Builder + * + * @param tableMetadata tableMetadata + * @param schema schema + */ + public SelectSqlBuilder(TableMetadata tableMetadata, String schema) { + this.tableMetadata = tableMetadata; + this.schema = schema; + } + + /** + * Table fragment query SQL Statement Builder + * + * @param start start + * @param offset offset + * @return builder + */ + public SelectSqlBuilder offset(long start, long offset) { + this.start = start; + this.offset = offset; + return this; + } + + /** + * set param dataBaseType + * + * @param dataBaseType dataBaseType + * @return builder + */ + public SelectSqlBuilder dataBaseType(DataBaseType dataBaseType) { + this.dataBaseType = dataBaseType; + return this; + } + + /** + * Table fragment query SQL Statement Builder + * + * @return build sql + */ + public String builder() { + Assert.isTrue(Objects.nonNull(tableMetadata), Message.TABLE_METADATA_NULL_NOT_TO_BUILD_SQL); + List columnsMetas = tableMetadata.getColumnsMetas(); + Assert.notEmpty(columnsMetas, Message.COLUMN_METADATA_EMPTY_NOT_TO_BUILD_SQL); + if (offset == OFF_SET_ZERO) { + return buildSelectSqlOffsetZero(columnsMetas, tableMetadata.getTableName()); + } else { + return buildSelectSqlOffset(tableMetadata, start, offset); + } + } + + /** + *
+     * Construct query statements based on metadata and fragment information
+     * SELECT * FROM test.test1 LIMIT 0,20
+     * </pre>
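The LIMIT form shown in the example above is produced by plain placeholder substitution on the QUERY_OFF_SET template defined in QuerySqlTemplate; a small standalone sketch of that substitution (schema, table, columns and page values are illustrative):

class SqlTemplateSketch {
    public static void main(String[] args) {
        String template = "SELECT :columnsList FROM :schema.:tableName LIMIT :start,:offset";
        String sql = template.replace(":columnsList", "id,name")
                             .replace(":schema", "test")
                             .replace(":tableName", "test1")
                             .replace(":start", String.valueOf(0L))
                             .replace(":offset", String.valueOf(20L));
        // Prints: SELECT id,name FROM test.test1 LIMIT 0,20
        System.out.println(sql);
    }
}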
+ * + * @param tableMetadata Table metadata information + * @param start Start position of fragment query + * @param offset Fragment query start position fragment query displacement + * @return Return the constructed select statement + */ + public String buildSelectSqlOffset(TableMetadata tableMetadata, long start, long offset) { + List columnsMetas = tableMetadata.getColumnsMetas(); + String tableName = tableMetadata.getTableName(); + String columnNames = getColumnNameList(columnsMetas); + SqlGenerateMeta sqlGenerateMeta = new SqlGenerateMeta(schema, tableName, columnNames, start, offset); + return getSqlGenerate(dataBaseType).replace(sqlGenerateMeta); + } + + /** + * Construct query statements based on metadata information SELECT * FROM test.test1 + * + * @param columnsMetas Column metadata information + * @param tableName tableName + * @return sql + */ + private String buildSelectSqlOffsetZero(List columnsMetas, String tableName) { + String columnNames = getColumnNameList(columnsMetas); + SqlGenerateMeta sqlGenerateMeta = new SqlGenerateMeta(schema, tableName, columnNames, 0, 0); + return NO_OFFSET_GENERATE.replace(sqlGenerateMeta); + } + + private static String getColumnNameList(@NonNull List columnsMetas) { + return columnsMetas.stream().map(ColumnsMetaData::getColumnName).collect(Collectors.joining(DELIMITER)); + } + + /** + * get column names with alias + * + * @param columnsMetas columnsMetas + * @return column names + */ + private String getColumnNameListWithAlias(@NonNull List columnsMetas) { + return columnsMetas.stream().map(ColumnsMetaData::getColumnName).map(TABLE_ALIAS::concat) + .collect(Collectors.joining(DELIMITER)); + } + + private SqlGenerate getSqlGenerate(DataBaseType dataBaseType) { + return SQL_GENERATE.get(dataBaseType); + } + + @Getter + @AllArgsConstructor + static class SqlGenerateMeta { + private final String schema; + private final String tableName; + private final String columns; + private final long start; + private final long offset; + } + + @FunctionalInterface + interface SqlGenerate { + /** + * Generate SQL statement according to SQL generator metadata object + * + * @param sqlGenerateMeta SQL generator metadata + * @return Return fragment query SQL statement + */ + String replace(SqlGenerateMeta sqlGenerateMeta); + } + + @FunctionalInterface + interface SqlGenerateTemplate { + /** + * Generate SQL statement according to SQL generator metadata object + * + * @param template SQL template + * @param sqlGenerateMeta SQL generator metadata + * @return sql + */ + String replace(String template, SqlGenerateMeta sqlGenerateMeta); + } + + interface Message { + /** + * error message tips + */ + String TABLE_METADATA_NULL_NOT_TO_BUILD_SQL = "Abnormal table metadata information, failed to build SQL"; + + /** + * error message tips + */ + String COLUMN_METADATA_EMPTY_NOT_TO_BUILD_SQL = "Abnormal column metadata information, failed to build SQL"; + } +} diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/util/MetaDataUtil.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/util/MetaDataUtil.java index 1b044808ec294ac9ebfa2fcec7ff878e8777f4b3..20af546724dbdc341518ba15f03afce8866e2f28 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/util/MetaDataUtil.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/util/MetaDataUtil.java @@ -54,9 +54,7 @@ public class MetaDataUtil { if (Objects.isNull(columnsMetas)) { return emptyList(); } - return columnsMetas.stream() - 
-                           .sorted(Comparator.comparing(ColumnsMetaData::getOrdinalPosition))
-                           .map(ColumnsMetaData::getColumnName)
-                           .collect(Collectors.toList());
+        return columnsMetas.stream().sorted(Comparator.comparing(ColumnsMetaData::getOrdinalPosition))
+                           .map(ColumnsMetaData::getColumnName).collect(Collectors.toUnmodifiableList());
     }
 }
diff --git a/datachecker-extract/src/main/resources/application-sink.yml b/datachecker-extract/src/main/resources/application-sink.yml
index 7c5e5c2bd89ee1b2a464d68a6bf11e88c7e19e5e..f8db9f995eaebee849a47bbe7e7e8750b2e19087 100644
--- a/datachecker-extract/src/main/resources/application-sink.yml
+++ b/datachecker-extract/src/main/resources/application-sink.yml
@@ -16,9 +16,8 @@ spring:
     schema: jack
     databaseType: OG #OG opengauss
     endpoint: SINK
-    query-table-row-count: true
     debezium-enable: false
-    sync-extract: false
+    sync-extract: true
 
   datasource:
     druid:
diff --git a/datachecker-extract/src/main/resources/application.yml b/datachecker-extract/src/main/resources/application.yml
index e777369b2eb5860e35dfcd6f552905a734be1e15..ee6bb242727aea326d846eb4a7c882d5e31ed812 100644
--- a/datachecker-extract/src/main/resources/application.yml
+++ b/datachecker-extract/src/main/resources/application.yml
@@ -12,8 +12,8 @@ spring:
     producer:
       retries: 1
       acks: all
-      batch-size: 262144 # 256K
-      buffer-memory: 134217728 # 128M
+      batch-size: 1048576
+      buffer-memory: 536870912
       key-serializer: org.apache.kafka.common.serialization.StringSerializer
       value-serializer: org.apache.kafka.common.serialization.StringSerializer
@@ -25,7 +25,8 @@ spring:
       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
       fetch-min-size: 1
-      max-poll-records: 10000
+      max-poll-records: 20000
+      fetch-max-bytes: 536870912 # 512M
 
 feign:
   okhttp:
diff --git a/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/task/SelectSqlBulderTest.java b/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/task/SelectSqlBulderTest.java
deleted file mode 100644
index a8f5d1183a9c5b5fb3dce3e3beaffcba801f2708..0000000000000000000000000000000000000000
--- a/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/task/SelectSqlBulderTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd.
- *
- * openGauss is licensed under Mulan PSL v2.
- * You can use this software according to the terms and conditions of the Mulan PSL v2.
- * You may obtain a copy of Mulan PSL v2 at:
- *
- * http://license.coscl.org.cn/MulanPSL2
- *
- * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
- * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
- * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
- * See the Mulan PSL v2 for more details.
- */
-
-package org.opengauss.datachecker.extract.task;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.opengauss.datachecker.common.entry.enums.ColumnKey;
-import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData;
-import org.opengauss.datachecker.common.entry.extract.TableMetadata;
-
-import java.util.List;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.when;
-
-/**
- * SelectSqlBulderTest
- *
- * @author :wangchao
- * @date :Created in 2022/5/14
- * @since :11
- */
-@ExtendWith(MockitoExtension.class)
-class SelectSqlBulderTest {
-    @Mock
-    private TableMetadata mockTableMetadata;
-
-    private SelectSqlBulder selectSqlBulderUnderTest;
-
-    @BeforeEach
-    void setUp() {
-        selectSqlBulderUnderTest = new SelectSqlBulder(mockTableMetadata, "test", 0L, 0L);
-    }
-
-    @Test
-    void testBuilder() {
-        // Configure TableMetadata.getColumnsMetas(...).
-        final ColumnsMetaData columnsMeta1 = new ColumnsMetaData();
-        columnsMeta1.setTableName("tableName");
-        columnsMeta1.setColumnName("columnName1");
-        columnsMeta1.setColumnType("columnType");
-        columnsMeta1.setDataType("dataType");
-        columnsMeta1.setOrdinalPosition(2);
-        final List<ColumnsMetaData> columnsMetaData = List.of(columnsMeta1);
-        when(mockTableMetadata.getColumnsMetas()).thenReturn(columnsMetaData);
-        when(mockTableMetadata.getTableName()).thenReturn("tableName");
-        // Run the test
-        final String result = selectSqlBulderUnderTest.builder();
-        // Verify the results
-        assertThat(result).isEqualTo("SELECT columnName1 FROM test.tableName");
-    }
-
-    @Test
-    void testBuilderOffSet() {
-        selectSqlBulderUnderTest = new SelectSqlBulder(mockTableMetadata, "test", 0L, 1000L);
-        // Setup
-        // Configure TableMetadata.getPrimaryMetas(...).
-        final ColumnsMetaData columnsMetaPri = new ColumnsMetaData();
-        columnsMetaPri.setTableName("tableName");
-        columnsMetaPri.setColumnName("columnName1");
-        columnsMetaPri.setColumnType("columnType");
-        columnsMetaPri.setDataType("dataType");
-        columnsMetaPri.setOrdinalPosition(1);
-        columnsMetaPri.setColumnKey(ColumnKey.PRI);
-        final List<ColumnsMetaData> primaryList = List.of(columnsMetaPri);
-        when(mockTableMetadata.getPrimaryMetas()).thenReturn(primaryList);
-        // Configure TableMetadata.getColumnsMetas(...).
-        final ColumnsMetaData columnsMeta1 = new ColumnsMetaData();
-        columnsMeta1.setTableName("tableName");
-        columnsMeta1.setColumnName("columnName2");
-        columnsMeta1.setColumnType("columnType");
-        columnsMeta1.setDataType("dataType");
-        columnsMeta1.setOrdinalPosition(2);
-        final List<ColumnsMetaData> columnsMetaData = List.of(columnsMetaPri, columnsMeta1);
-        when(mockTableMetadata.getColumnsMetas()).thenReturn(columnsMetaData);
-        when(mockTableMetadata.getTableName()).thenReturn("tableName");
-        // Run the test
-        final String result = selectSqlBulderUnderTest.builder();
-        // Verify the results
-        assertThat(result).isEqualTo("SELECT columnName1,columnName2 FROM test.tableName WHERE columnName1 IN "
-            + "(SELECT t.columnName1 FROM (SELECT columnName1 FROM test.tableName order by columnName1"
-            + " LIMIT 0,1000) t)");
-    }
-
-    @Test
-    void testBuilderMuliPrimaryLeyOffSet() {
-        selectSqlBulderUnderTest = new SelectSqlBulder(mockTableMetadata, "test", 0L, 1000L);
-        // Setup
-        // Configure TableMetadata.getPrimaryMetas(...).
-        final ColumnsMetaData columnsMetaPri = new ColumnsMetaData();
-        columnsMetaPri.setTableName("tableName");
-        columnsMetaPri.setColumnName("columnName1");
-        columnsMetaPri.setColumnType("columnType");
-        columnsMetaPri.setDataType("dataType");
-        columnsMetaPri.setOrdinalPosition(1);
-        columnsMetaPri.setColumnKey(ColumnKey.PRI);
-
-        // Configure TableMetadata.getColumnsMetas(...).
-        final ColumnsMetaData columnsMeta1 = new ColumnsMetaData();
-        columnsMeta1.setTableName("tableName");
-        columnsMeta1.setColumnName("columnName2");
-        columnsMeta1.setColumnType("columnType");
-        columnsMeta1.setDataType("dataType");
-        columnsMeta1.setOrdinalPosition(2);
-        columnsMeta1.setColumnKey(ColumnKey.PRI);
-        final List<ColumnsMetaData> columnsMetaData = List.of(columnsMetaPri, columnsMeta1);
-        final List<ColumnsMetaData> primaryList = List.of(columnsMetaPri, columnsMeta1);
-        when(mockTableMetadata.getPrimaryMetas()).thenReturn(primaryList);
-        when(mockTableMetadata.getColumnsMetas()).thenReturn(columnsMetaData);
-        when(mockTableMetadata.getTableName()).thenReturn("tableName");
-        // Run the test
-        final String result = selectSqlBulderUnderTest.builder();
-        // Verify the results
-        assertThat(result).isEqualTo("SELECT a.columnName1,a.columnName2 FROM test.tableName a RIGHT JOIN "
-            + " (SELECT columnName1,columnName2 FROM test.tableName order by columnName1,columnName2 LIMIT 0,1000) b"
-            + " ON a.columnName1=b.columnName1 and a.columnName2=b.columnName2");
-    }
-}
diff --git a/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/task/sql/SelectSqlBuilderTest.java b/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/task/sql/SelectSqlBuilderTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..dcfcd0454dbb7d70ab286f01e9eefa0523fa50c4
--- /dev/null
+++ b/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/task/sql/SelectSqlBuilderTest.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd.
+ *
+ * openGauss is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ *
+ * http://license.coscl.org.cn/MulanPSL2
+ *
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ */
+
+package org.opengauss.datachecker.extract.task.sql;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opengauss.datachecker.common.entry.enums.ColumnKey;
+import org.opengauss.datachecker.common.entry.enums.DataBaseType;
+import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData;
+import org.opengauss.datachecker.common.entry.extract.TableMetadata;
+import org.opengauss.datachecker.extract.task.sql.SelectSqlBuilder.Message;
+
+import java.util.List;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+import static org.mockito.Mockito.when;
+
+/**
+ * SelectSqlBuilderTest
+ *
+ * @author :wangchao
+ * @date :Created in 2022/9/2
+ * @since :11
+ */
+@ExtendWith(MockitoExtension.class)
+class SelectSqlBuilderTest {
+    @Mock
+    private TableMetadata mockTableMetadata;
+    private SelectSqlBuilder selectSqlBuilder;
+
+    @BeforeEach
+    void setUp() {
+        selectSqlBuilder = new SelectSqlBuilder(mockTableMetadata, "schema");
+    }
+
+    /**
+     * testBuilder
+     */
+    @Test
+    void testBuilder() {
+        // Setup
+        // Configure TableMetadata.getColumnsMetas(...).
+        final ColumnsMetaData columnsMetaData1 = new ColumnsMetaData();
+        columnsMetaData1.setTableName("tableName");
+        columnsMetaData1.setColumnName("columnName");
+        columnsMetaData1.setColumnType("columnType");
+        columnsMetaData1.setDataType("dataType");
+        columnsMetaData1.setOrdinalPosition(0);
+        columnsMetaData1.setColumnKey(ColumnKey.PRI);
+        final List<ColumnsMetaData> columnsMetaData = List.of(columnsMetaData1);
+        when(mockTableMetadata.getColumnsMetas()).thenReturn(columnsMetaData);
+
+        when(mockTableMetadata.getTableName()).thenReturn("tableName");
+        // Run the test
+        final String result = selectSqlBuilder.dataBaseType(DataBaseType.MS).offset(0, 120).builder();
+
+        // Verify the results
+        assertThat(result).isEqualTo("SELECT columnName FROM schema.tableName LIMIT 0,120");
+    }
+
+    /**
+     * testBuilder_TableMetadataGetColumnsMetasReturnsNoItems
+     */
+    @Test
+    void testBuilder_TableMetadataGetColumnsMetasReturnsNoItems() {
+        // Run the test
+        assertThatThrownBy(() -> selectSqlBuilder.builder()).isInstanceOf(IllegalArgumentException.class)
+            .hasMessageContaining(Message.COLUMN_METADATA_EMPTY_NOT_TO_BUILD_SQL);
+    }
+
+    /**
+     * testBuildSelectSqlOffset
+     */
+    @Test
+    void testBuildSelectSqlOffset() {
+        // Setup
+        final TableMetadata tableMetadata = new TableMetadata();
+        tableMetadata.setTableName("tableName");
+        tableMetadata.setTableRows(0L);
+        final ColumnsMetaData columnsMetaData = new ColumnsMetaData();
+        columnsMetaData.setTableName("tableName");
+        columnsMetaData.setColumnName("pk_columnName1");
+        columnsMetaData.setColumnType("pk_columnType1");
+        columnsMetaData.setDataType("dataType");
+        columnsMetaData.setOrdinalPosition(0);
+        columnsMetaData.setColumnKey(ColumnKey.PRI);
+        tableMetadata.setPrimaryMetas(List.of(columnsMetaData));
+        final ColumnsMetaData columnsMetaData1 = new ColumnsMetaData();
+        columnsMetaData1.setTableName("tableName");
+        columnsMetaData1.setColumnName("columnName2");
+        columnsMetaData1.setColumnType("columnType2");
+        columnsMetaData1.setDataType("dataType");
+        columnsMetaData1.setOrdinalPosition(0);
+        columnsMetaData1.setColumnKey(ColumnKey.PRI);
+        tableMetadata.setColumnsMetas(List.of(columnsMetaData, columnsMetaData1));
+
+        // Run the test
+        final String result =
+            selectSqlBuilder.dataBaseType(DataBaseType.MS).offset(0, 120).buildSelectSqlOffset(tableMetadata, 0L, 100L);
+
+        // Verify the results
+        assertThat(result).isEqualTo("SELECT pk_columnName1,columnName2 FROM schema.tableName LIMIT 0,100");
+    }
+}