diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java
index f820788cf1a301aa4be8a29d329a7ffb238db3c6..903d9a2b184a1dbde4c0e54cd58ee0e1791e2e17 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java
@@ -415,6 +415,6 @@ public class DataCheckRunnable implements Runnable {
     }
 
     private void resetThreadName(String tableName, int partitions) {
-        Thread.currentThread().setName(THREAD_NAME_PRIFEX + tableName.toUpperCase(Locale.ROOT) + "_" + partitions);
+        Thread.currentThread().setName(THREAD_NAME_PRIFEX + tableName + "_" + partitions);
     }
 }
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/ExportCheckResult.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/ExportCheckResult.java
index 6c73654127faafc7ba5793eed2c1d02cfb27b72b..782607cb2859406eb7026020a0d6e21e525854fe 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/ExportCheckResult.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/ExportCheckResult.java
@@ -39,7 +39,7 @@ public class ExportCheckResult {
     private static final String CHECK_RESULT_PATH = File.separator + "result" + File.separator;
 
     public static void export(String path, CheckDiffResult result) {
-        String fileName = getCheckResultFileName(path, result.getTable(), result.getPartitions());
+        String fileName = getCheckResultFileName(path, result.getTopic(), result.getPartitions());
         FileUtils.writeAppendFile(fileName, JsonObjectUtil.format(result));
     }
 
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java
index f53642a0e344a13531034aa9e10304f106f5d433..4facaa41bf753c5400f5185352f83e61b46e57ab 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java
@@ -17,23 +17,18 @@ package org.opengauss.datachecker.check.modules.check;
 
 import com.alibaba.fastjson.JSON;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.opengauss.datachecker.common.entry.extract.RowDataHash;
 import org.opengauss.datachecker.common.entry.extract.Topic;
-import org.opengauss.datachecker.common.util.ThreadUtil;
 
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * KafkaConsumerHandler
@@ -44,11 +39,9 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 @Slf4j
 public class KafkaConsumerHandler {
-    private static final int RETRY_FETCH_RECORD_INTERVAL = 1000;
     private static final int KAFKA_CONSUMER_POLL_DURATION = 20;
     private final KafkaConsumer<String, String> kafkaConsumer;
-    private final int retryFetchRecordTimes;
 
     /**
      * Constructor
     *
@@ -58,7 +51,6 @@ public class KafkaConsumerHandler {
      */
    public KafkaConsumerHandler(KafkaConsumer<String, String> consumer, int retryTimes) {
        kafkaConsumer = consumer;
-        retryFetchRecordTimes = retryTimes;
    }
 
    /**
@@ -81,27 +73,30 @@
     * @return kafka partitions data
     */
    public List<RowDataHash> queryRowData(Topic topic, int partitions, boolean shouldChangeConsumerGroup) {
-        List<RowDataHash> data = Collections.synchronizedList(new LinkedList<>());
+        List<RowDataHash> data = new LinkedList<>();
        final TopicPartition topicPartition = new TopicPartition(topic.getTopicName(), partitions);
        kafkaConsumer.assign(List.of(topicPartition));
+        long endOfOffset = getEndOfOffset(topicPartition);
+        long beginOfOffset = beginningOffsets(topicPartition);
        if (shouldChangeConsumerGroup) {
            resetOffsetToBeginning(kafkaConsumer, topicPartition);
        }
-        consumerTopicRecords(data, kafkaConsumer);
-        AtomicInteger retryTimes = new AtomicInteger(0);
-        final String groupId = kafkaConsumer.groupMetadata().groupId();
-        while (CollectionUtils.isEmpty(data) && retryTimes.incrementAndGet() <= retryFetchRecordTimes) {
-            ThreadUtil.sleep(RETRY_FETCH_RECORD_INTERVAL);
-            resetOffsetToBeginning(kafkaConsumer, topicPartition);
-            log.debug("consumer group={} topic=[{}] partitions=[{}] empty ,retryTimes=[{}]", groupId,
-                topic.getTopicName(), partitions, retryTimes.get());
-            consumerTopicRecords(data, kafkaConsumer);
-        }
-        log.debug("consumer group={} topic=[{}] partitions=[{}] dataList=[{}]", groupId, topic.getTopicName(),
-            partitions, data.size());
+        consumerTopicRecords(data, kafkaConsumer, endOfOffset);
+        log.debug("consumer topic=[{}] partitions=[{}] dataList=[{}] ,beginOfOffset={},endOfOffset={}",
+            topic.getTopicName(), partitions, data.size(), beginOfOffset, endOfOffset);
        return data;
    }
 
+    private long getEndOfOffset(TopicPartition topicPartition) {
+        final Map<TopicPartition, Long> topicPartitionLongMap = kafkaConsumer.endOffsets(List.of(topicPartition));
+        return topicPartitionLongMap.get(topicPartition);
+    }
+
+    private long beginningOffsets(TopicPartition topicPartition) {
+        final Map<TopicPartition, Long> topicPartitionLongMap = kafkaConsumer.beginningOffsets(List.of(topicPartition));
+        return topicPartitionLongMap.get(topicPartition);
+    }
+
    private void resetOffsetToBeginning(KafkaConsumer<String, String> consumer, TopicPartition topicPartition) {
        Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
        consumer.seekToBeginning(List.of(topicPartition));
@@ -110,21 +105,21 @@
        consumer.commitSync(offset);
    }
 
-    private void consumerTopicRecords(List<RowDataHash> data, KafkaConsumer<String, String> kafkaConsumer) {
-        List<RowDataHash> result = getTopicRecords(kafkaConsumer);
-        while (CollectionUtils.isNotEmpty(result)) {
-            data.addAll(result);
-            result = getTopicRecords(kafkaConsumer);
+    private void consumerTopicRecords(List<RowDataHash> data, KafkaConsumer<String, String> kafkaConsumer,
+        long endOfOffset) {
+        if (endOfOffset == 0) {
+            return;
+        }
+        while (endOfOffset > data.size()) {
+            getTopicRecords(data, kafkaConsumer);
        }
    }
 
-    private List<RowDataHash> getTopicRecords(KafkaConsumer<String, String> kafkaConsumer) {
-        List<RowDataHash> dataList = new ArrayList<>();
+    private void getTopicRecords(List<RowDataHash> dataList, KafkaConsumer<String, String> kafkaConsumer) {
        ConsumerRecords<String, String> consumerRecords =
            kafkaConsumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_DURATION));
        consumerRecords.forEach(record -> {
            dataList.add(JSON.parseObject(record.value(), RowDataHash.class));
        });
-        return dataList;
    }
 }
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/merkle/MerkleTree.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/merkle/MerkleTree.java
index 219ce090699245a386b9850aeb8e7bebe8cafbac..f5bebbd6c3605b98448e10d54d350d98429b1d65 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/merkle/MerkleTree.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/merkle/MerkleTree.java
@@ -87,9 +87,9 @@ public class MerkleTree {
     * @param bucketList bucketList
     */
    public MerkleTree(List<Bucket> bucketList) {
-        log.info("MerkleTree init bucket size={}", bucketList.size());
+        log.debug("MerkleTree init bucket size={}", bucketList.size());
        constructTree(bucketList);
-        log.info("MerkleTree root node depth={},nnodes={}", depth, nnodes);
+        log.debug("MerkleTree root node depth={},nnodes={}", depth, nnodes);
    }
 
    /**
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/KafkaManagerController.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/KafkaManagerController.java
index 4d074dd2f19eff50d9a5e4f5e5a1b3f8429aadbb..44185b254968c8b5e7b44a59a050610899f153ba 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/KafkaManagerController.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/KafkaManagerController.java
@@ -93,17 +93,6 @@ public class KafkaManagerController {
        return Result.success(kafkaManagerService.createTopic(process, tableName, partitions));
    }
 
-    /**
-     * Query the list of all topic names
-     *
-     * @return Topic name list
-     */
-    @Operation(summary = "Query the list of all topic names")
-    @GetMapping("/extract/query/topic")
-    public Result<List<String>> queryTopicData() {
-        return Result.success(kafkaManagerService.getAllTopic());
-    }
-
    /**
     * Query topic information of the specified table name
     *
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dao/DataBaseMetaDataDAOImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dao/DataBaseMetaDataDAOImpl.java
index 34c36d8ec79ae3c073e39c921609d90c6edcde9d..c392dd3381751f2bbcc9eee9544a1a7e279d5499 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dao/DataBaseMetaDataDAOImpl.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dao/DataBaseMetaDataDAOImpl.java
@@ -21,6 +21,7 @@ import org.opengauss.datachecker.common.constant.Constants;
 import org.opengauss.datachecker.common.entry.enums.CheckBlackWhiteMode;
 import org.opengauss.datachecker.common.entry.enums.ColumnKey;
 import org.opengauss.datachecker.common.entry.enums.DataBaseMeta;
+import org.opengauss.datachecker.common.entry.enums.DataBaseType;
 import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData;
 import org.opengauss.datachecker.common.entry.extract.TableMetadata;
 import org.opengauss.datachecker.common.util.EnumUtil;
@@ -162,14 +163,25 @@ public class DataBaseMetaDataDAOImpl implements MetaDataDAO {
        final List<TableMetadata> tableMetadata = new ArrayList<>();
        String sqlQueryTableRowCount = MetaSqlMapper.getTableCount();
        final String schema = getSchema();
+        final Boolean isConvertTableName = isOpenGauss();
+
        tableNameList.forEach(tableName -> {
-            final Long rowCount =
-                JdbcTemplateOne.queryForObject(String.format(sqlQueryTableRowCount, schema, tableName), Long.class);
+            final Long rowCount = JdbcTemplateOne.queryForObject(
+                String.format(sqlQueryTableRowCount, schema, isConvertTableName ? convert(tableName) : tableName),
+                Long.class);
            tableMetadata.add(new TableMetadata().setTableName(tableName).setTableRows(rowCount));
        });
        return tableMetadata;
    }
 
+    private Boolean isOpenGauss() {
+        return Objects.equals(extractProperties.getDatabaseType(), DataBaseType.OG);
+    }
+
+    private String convert(String tableName) {
+        return "\"" + tableName + "\"";
+    }
+
    @Override
    public List<ColumnsMetaData> queryColumnMetadata(String tableName) {
        return queryColumnMetadata(List.of(tableName));
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/UpdateDmlBuilder.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/UpdateDmlBuilder.java
new file mode 100644
index 0000000000000000000000000000000000000000..1e889a50c0ea130859e8fdec0e1e719f1305a997
--- /dev/null
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/UpdateDmlBuilder.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd.
+ *
+ * openGauss is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ *
+ *          http://license.coscl.org.cn/MulanPSL2
+ *
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ */
+
+package org.opengauss.datachecker.extract.dml;
+
+import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData;
+
+import javax.validation.constraints.NotNull;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * UpdateDmlBuilder
+ *
+ * @author :wangchao
+ * @date :Created in 2022/6/14
+ * @since :11
+ */
+public class UpdateDmlBuilder extends DmlBuilder {
+
+    /**
+     * build Schema
+     *
+     * @param schema Schema
+     * @return InsertDMLBuilder
+     */
+    public UpdateDmlBuilder schema(@NotNull String schema) {
+        super.buildSchema(schema);
+        return this;
+    }
+
+    /**
+     * build tableName
+     *
+     * @param tableName tableName
+     * @return InsertDMLBuilder
+     */
+    public UpdateDmlBuilder tableName(@NotNull String tableName) {
+        super.buildTableName(tableName);
+        return this;
+    }
+
+    /**
+     * build SQL column statement fragment
+     *
+     * @param columnsMetas Field Metadata
+     * @return InsertDMLBuilder
+     */
+    public UpdateDmlBuilder columns(@NotNull List<ColumnsMetaData> columnsMetas) {
+        columns = columnsMetas.stream().map(ColumnsMetaData::getColumnName).collect(Collectors.joining(DELIMITER));
+        return this;
+    }
+
+    /**
+     * build SQL column value statement fragment
+     *
+     * @param columnsMetaList Field Metadata
+     * @return InsertDMLBuilder
+     */
+    public UpdateDmlBuilder columnsValue(@NotNull Map<String, String> columnsValue,
+        @NotNull List<ColumnsMetaData> columnsMetaList) {
+        List<String> valueList = new ArrayList<>(columnsValueList(columnsValue, columnsMetaList));
+        this.columnsValue = String.join(DELIMITER, valueList);
+        return this;
+    }
+
+    public String build() {
+        return Fragment.DML_REPLACE.replace(Fragment.SCHEMA, schema).replace(Fragment.TABLE_NAME, tableName)
+            .replace(Fragment.COLUMNS, columns).replace(Fragment.VALUE, columnsValue);
+    }
+}
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaCommonService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaCommonService.java
index 73331b607fe20caee0a4c66cc6c61158d2978f06..8ca557a3c13d2fa72b7ead13d5db942f1d32de7b 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaCommonService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaCommonService.java
@@ -20,13 +20,16 @@ import lombok.extern.slf4j.Slf4j;
 import org.opengauss.datachecker.common.entry.enums.Endpoint;
 import org.opengauss.datachecker.common.entry.extract.Topic;
 import org.opengauss.datachecker.extract.config.ExtractProperties;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.lang.NonNull;
 import org.springframework.stereotype.Service;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * KafkaCommonService
@@ -40,20 +43,14 @@ import java.util.Objects;
 @RequiredArgsConstructor
 public class KafkaCommonService {
    /**
-     * Full calibration extraction topic name template TOPIC_EXTRACT_%s_%s_
-     * The first % is the endpoint {@link Endpoint}
+     * Rules for generating topic names for full verification
+     * process_endpoint_tableName_code
     * The second % is the process verification process number
-     * Last splicing table name
-     */
-    private static final String TOPIC_PROCESS_PREFIX = "TOPIC_EXTRACT_%s_%s_";
-
-    /**
-     * Full calibration extraction topic name template TOPIC_EXTRACT_%s_
     * The first % is the endpoint {@link Endpoint}
-     * Used to batch query all topic names created by the verification process in Kafka
+     * table name
+     * Last splicing table name upper or lower code
     */
-    private static final String TOPIC_PREFIX_PR = "TOPIC_EXTRACT_%s_";
-    private static final String TOPIC_PREFIX = "TOPIC_EXTRACT_";
+    private static final String TOPIC_TEMPLATE = "%s_%s_%s_%s";
 
    /**
     * Incremental verification topic prefix
@@ -63,36 +60,9 @@
    private static final Map<String, Topic> TABLE_TOPIC_CACHE = new HashMap<>();
    private static final Map<String, Topic> DEBEZIUM_TOPIC_CACHE = new HashMap<>();
 
+    @Autowired
    private final ExtractProperties extractProperties;
 
-    /**
-     * Get data verification Kafka topic prefix
-     *
-     * @param process Verification process No
-     * @return topic prefix
-     */
-    public String getTopicPrefixProcess(String process) {
-        return String.format(TOPIC_PROCESS_PREFIX, extractProperties.getEndpoint().getCode(), process);
-    }
-
-    /**
-     * Get data verification Kafka topic prefix
-     *
-     * @return Data verification Kafka topic prefix
-     */
-    public String getTopicPrefixEndpoint() {
-        return String.format(TOPIC_PREFIX_PR, extractProperties.getEndpoint().getCode());
-    }
-
-    /**
-     * Get data verification Kafka topic prefix
-     *
-     * @return topic prefix
-     */
-    public String getTopicPrefix() {
-        return TOPIC_PREFIX;
-    }
-
    /**
     * Get the corresponding topic according to the table name
     *
@@ -117,8 +87,7 @@
        synchronized (LOCK) {
            topic = TABLE_TOPIC_CACHE.get(tableName);
            if (Objects.isNull(topic)) {
-                topic = new Topic().setTableName(tableName).setTopicName(
-                    getTopicPrefixProcess(process).concat(tableName.toUpperCase(Locale.ROOT)))
+                topic = new Topic().setTableName(tableName).setTopicName(createTopicName(process, tableName))
                    .setPartitions(calcPartitions(divisions));
                TABLE_TOPIC_CACHE.put(tableName, topic);
            }
@@ -128,6 +97,24 @@
        return topic;
    }
 
+    private String createTopicName(String process, String tableName) {
+        final Endpoint endpoint = extractProperties.getEndpoint();
+        return String.format(TOPIC_TEMPLATE, process, endpoint.getCode(), tableName, letterCaseEncoding(tableName));
+    }
+
+    private String letterCaseEncoding(String tableName) {
+        final char[] chars = tableName.toCharArray();
+        StringBuilder builder = new StringBuilder();
+        for (char aChar : chars) {
+            if (aChar >= 'A' && aChar <= 'Z') {
+                builder.append("1");
+            } else if (aChar >= 'a' && aChar <= 'z') {
+                builder.append("0");
+            }
+        }
+        return builder.toString();
+    }
+
    /**
     * Calculate the Kafka partition according to the total number of task slices.
     * The total number of Kafka partitions shall not exceed 10
@@ -142,6 +129,10 @@
    /**
     * Clean up table name and topic information
     */
+    public List<String> getAllTopicName() {
+        return TABLE_TOPIC_CACHE.values().stream().map(Topic::getTopicName).collect(Collectors.toList());
+    }
+
    public void cleanTopicMapping() {
        TABLE_TOPIC_CACHE.clear();
        log.info("clear table topic cache information");
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaManagerService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaManagerService.java
index f99268659770850124238d0de3e512a0208d4bd8..5d5a944a18dd5b9c55cc8591ae9956d9e0fbd79d 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaManagerService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaManagerService.java
@@ -40,15 +40,6 @@ public class KafkaManagerService {
    private final KafkaConsumerConfig kafkaConsumerConfig;
    private final KafkaProducerConfig kafkaProducerConfig;
 
-    /**
-     * get kafka topic list
-     *
-     * @return topic list
-     */
-    public List<String> getAllTopic() {
-        return kafkaAdminService.getAllTopic(kafkaCommonService.getTopicPrefixEndpoint());
-    }
-
    /**
     * Create a topic according to the table name
     *
@@ -74,7 +65,7 @@
        log.info("Extract service to clean up Kafka consumer information");
        kafkaProducerConfig.cleanKafkaProducer();
        log.info("Extract service cleanup Kafka producer mapping information");
-        List<String> topics = kafkaAdminService.getAllTopic(kafkaCommonService.getTopicPrefixProcess(processNo));
+        List<String> topics = kafkaAdminService.getAllTopic(processNo);
        kafkaAdminService.deleteTopic(topics);
        log.info("Extract service cleanup current process ({}) Kafka topics {}", processNo, topics);
        kafkaAdminService.deleteTopic(topics);
@@ -85,11 +76,11 @@
     * Clear Kafka information
     */
    public void cleanKafka() {
-        kafkaCommonService.cleanTopicMapping();
+        final List<String> topics = kafkaCommonService.getAllTopicName();
        kafkaConsumerConfig.cleanKafkaConsumer();
        kafkaProducerConfig.cleanKafkaProducer();
-        List<String> topics = kafkaAdminService.getAllTopic(kafkaCommonService.getTopicPrefix());
        kafkaAdminService.deleteTopic(topics);
+        kafkaCommonService.cleanTopicMapping();
    }
 
    /**
@@ -98,7 +89,7 @@
     * @param processNo process
     */
    public void deleteTopic(String processNo) {
-        List<String> topics = kafkaAdminService.getAllTopic(kafkaCommonService.getTopicPrefixProcess(processNo));
+        List<String> topics = kafkaAdminService.getAllTopic(processNo);
        kafkaAdminService.deleteTopic(topics);
    }
 
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java
index 9c19d26e4b4fd2dce1c2ceea9f072ae3be02054d..3f504e61fee7846e497edc4024e93c72eab538f9 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java
@@ -408,7 +408,6 @@ public class DataExtractServiceImpl implements DataExtractService {
            log.info("Abnormal table[{}] status, ignoring the current table increment data extraction", tableName);
            return;
        }
-        ThreadUtil.sleep(100);
        Topic topic = kafkaCommonService.getIncrementTopicInfo(tableName);
        kafkaAdminService.createTopic(topic.getTopicName(), topic.getPartitions());
        final IncrementExtractTaskRunnable extractRunnable =
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java
index 7ed38ca346382a2dcf718350897128283fa96cfa..9d6796a9c8a5263aa625657707cef4fdca63690f 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java
@@ -30,6 +30,7 @@
 import org.opengauss.datachecker.extract.client.CheckingFeignClient;
 import org.opengauss.datachecker.extract.kafka.KafkaProducerWapper;
 import org.opengauss.datachecker.extract.task.sql.SelectSqlBuilder;
 import org.opengauss.datachecker.extract.util.MetaDataUtil;
+import org.springframework.dao.DataAccessException;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
@@ -148,7 +149,7 @@ public class ExtractTaskRunnable extends KafkaProducerWapper implements Runnable {
                return;
            }
            executeTask();
-        } catch (ExtractException exp) {
+        } catch (DataAccessException | ExtractException exp) {
            log.error("Data extraction task {} has some exception,{}", task.getTaskName(), exp.getMessage());
            TableExtractStatusCache.addErrorList(tableName);
        }
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussResultSetHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussResultSetHandler.java
index 4e74da268d4ee5323b636a98aabb7ec475e3e177..93316651b7daae403f9468bca21db1ab58d3bd95 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussResultSetHandler.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussResultSetHandler.java
@@ -39,6 +39,7 @@ public class OpenGaussResultSetHandler extends ResultSetHandler {
        typeHandlers.put(OpenGaussType.BLOB, blobToString);
 
        // The openGauss jdbc driver obtains the character,character varying type as varchar
+        typeHandlers.put(OpenGaussType.VARCHAR, this::trim);
        typeHandlers.put(OpenGaussType.BPCHAR, this::trim);
 
        // date time timestamp
@@ -59,6 +60,7 @@
    interface OpenGaussType {
        String BYTEA = "bytea";
        String BLOB = "blob";
+        String VARCHAR = "varchar";
        String BPCHAR = "bpchar";
        String DATE = "date";
        String TIME = "time";
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/SelectSqlBuilder.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/SelectSqlBuilder.java
index 8e0d3c18a75fc58a049d48e7a9be8e30335a478f..ffde8325981490b4b1b2483e8519a67e98e9c9d3 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/SelectSqlBuilder.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/SelectSqlBuilder.java
@@ -34,7 +34,6 @@ import static org.opengauss.datachecker.extract.task.sql.QuerySqlTemplate.DELIMITER;
 import static org.opengauss.datachecker.extract.task.sql.QuerySqlTemplate.OFFSET;
 import static org.opengauss.datachecker.extract.task.sql.QuerySqlTemplate.SCHEMA;
 import static org.opengauss.datachecker.extract.task.sql.QuerySqlTemplate.START;
-import static org.opengauss.datachecker.extract.task.sql.QuerySqlTemplate.TABLE_ALIAS;
 import static org.opengauss.datachecker.extract.task.sql.QuerySqlTemplate.TABLE_NAME;
 
 /**
@@ -148,24 +147,44 @@
     * @param offset Fragment query start position fragment query displacement
     * @return Return the constructed select statement
     */
-    public String buildSelectSqlOffset(TableMetadata tableMetadata, long start, long offset) {
+    private String buildSelectSqlOffset(TableMetadata tableMetadata, long start, long offset,
+        boolean isConvertTableName) {
        List<ColumnsMetaData> columnsMetas = tableMetadata.getColumnsMetas();
        String tableName = tableMetadata.getTableName();
        String columnNames = getColumnNameList(columnsMetas);
-        SqlGenerateMeta sqlGenerateMeta = new SqlGenerateMeta(schema, tableName, columnNames, start, offset);
+        SqlGenerateMeta sqlGenerateMeta;
+        if (isConvertTableName) {
+            sqlGenerateMeta = new SqlGenerateMeta(schema, convert(tableName), columnNames, start, offset);
+        } else {
+            sqlGenerateMeta = new SqlGenerateMeta(schema, tableName, columnNames, start, offset);
+        }
        return getSqlGenerate(dataBaseType).replace(sqlGenerateMeta);
    }
 
-    /**
-     * Construct query statements based on metadata information SELECT * FROM test.test1
-     *
-     * @param columnsMetas Column metadata information
-     * @param tableName tableName
-     * @return sql
-     */
+    private String convert(String tableName) {
+        return "\"" + tableName + "\"";
+    }
+
+    public String buildSelectSqlOffset(TableMetadata tableMetadata, long start, long offset) {
+        if (Objects.equals(dataBaseType, DataBaseType.OG)) {
+            return buildSelectSqlOffset(tableMetadata, start, offset, true);
+        }
+        return buildSelectSqlOffset(tableMetadata, start, offset, false);
+    }
+
    private String buildSelectSqlOffsetZero(List<ColumnsMetaData> columnsMetas, String tableName) {
+        if (Objects.equals(dataBaseType, DataBaseType.OG)) {
+            return buildSelectSqlOffsetZero(columnsMetas, tableName, true);
+        } else {
+            return buildSelectSqlOffsetZero(columnsMetas, tableName, false);
+        }
+    }
+
+    private String buildSelectSqlOffsetZero(List<ColumnsMetaData> columnsMetas, String tableName,
+        boolean isConvertTableName) {
        String columnNames = getColumnNameList(columnsMetas);
-        SqlGenerateMeta sqlGenerateMeta = new SqlGenerateMeta(schema, tableName, columnNames, 0, 0);
+        SqlGenerateMeta sqlGenerateMeta =
+            new SqlGenerateMeta(schema, isConvertTableName ? convert(tableName) : tableName, columnNames, 0, 0);
        return NO_OFFSET_GENERATE.replace(sqlGenerateMeta);
    }
 
@@ -173,17 +192,6 @@
        return columnsMetas.stream().map(ColumnsMetaData::getColumnName).collect(Collectors.joining(DELIMITER));
    }
 
-    /**
-     * get column names with alias
-     *
-     * @param columnsMetas columnsMetas
-     * @return column names
-     */
-    private String getColumnNameListWithAlias(@NonNull List<ColumnsMetaData> columnsMetas) {
-        return columnsMetas.stream().map(ColumnsMetaData::getColumnName).map(TABLE_ALIAS::concat)
-            .collect(Collectors.joining(DELIMITER));
-    }
-
    private SqlGenerate getSqlGenerate(DataBaseType dataBaseType) {
        return SQL_GENERATE.get(dataBaseType);
    }