diff --git a/config/application.yml b/config/application.yml index 31129d029f44b44f172a97a6cadd6e9b371f4f49..7a862f4cf7e748b2aaf3b0dc0ba83143d9d9ce22 100644 --- a/config/application.yml +++ b/config/application.yml @@ -17,7 +17,7 @@ data: # 0 is not delete; 1 is delete when checked all completed ; 2 is delete when checked a table auto-delete-topic: 2 increment-max-diff-count: 1000 - max-retry-times: 20 + max-retry-times: 200 rules: # There are three types of filtering rules: table-level rules, row-level rules, and column-level rules. # Rules are configured in the form of List collection. diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/cache/TopicRegister.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/cache/TopicRegister.java index 40d289503eeb5d6d70beebdd77488b8fea20d3c3..5e54bd6517a0f19b1f2d255d7bd2b4d5162b4aa4 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/cache/TopicRegister.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/cache/TopicRegister.java @@ -27,6 +27,8 @@ import org.springframework.stereotype.Service; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * TopicRegister @@ -40,6 +42,8 @@ public class TopicRegister { private static final Logger log = LogUtils.getLogger(); private static volatile Map TOPIC_CACHE = new ConcurrentHashMap<>(); + private final Lock lock = new ReentrantLock(); + /** * add table's topic * @@ -49,7 +53,8 @@ public class TopicRegister { * @return topic */ public Topic register(String table, int ptnNum, Endpoint endpoint) { - synchronized (table) { + lock.lock(); + try { Topic topic = TOPIC_CACHE.get(table); if (Objects.isNull(topic)) { topic = new Topic(); @@ -59,8 +64,10 @@ public class TopicRegister { TOPIC_CACHE.put(table, topic); } log.debug("register topic {}-{}", endpoint, topic.toString()); + return TOPIC_CACHE.get(table); + } finally { + lock.unlock(); } - return TOPIC_CACHE.get(table); } private void setTopicName(String table, Topic topic) { diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/TableKafkaController.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/TableKafkaController.java index c585683ea088abbe21aeff729c9f26e8b7edb951..a3b42d7526f9a49f1a7df4257ebdaa240b4583e0 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/TableKafkaController.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/TableKafkaController.java @@ -22,7 +22,6 @@ import org.opengauss.datachecker.check.service.TableKafkaService; import org.opengauss.datachecker.common.entry.check.TopicRecordInfo; import org.opengauss.datachecker.common.entry.enums.Endpoint; import org.opengauss.datachecker.common.entry.extract.Topic; -import org.springframework.lang.NonNull; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestParam; @@ -62,14 +61,26 @@ public class TableKafkaController { /** * register topic * - * @param table tableName - * @param ptnNum ptnNum - * @param endpoint current endpoint + * @param table tableName + * @param ptnNum ptnNum * @return topic */ - @PostMapping("/register/topic") - public Topic registerTopic(@RequestParam(value = "tableName") @NotEmpty String table, - @RequestParam(value = "ptnNum") int ptnNum, @RequestParam(value = "endpoint") @NonNull Endpoint endpoint) { - return topicRegister.register(table, ptnNum, endpoint); + @PostMapping("/source/register/topic") + public Topic sourceRegisterTopic(@RequestParam(value = "tableName") @NotEmpty String table, + @RequestParam(value = "ptnNum") int ptnNum) { + return topicRegister.register(table, ptnNum, Endpoint.SOURCE); + } + + /** + * register topic + * + * @param table tableName + * @param ptnNum ptnNum + * @return topic + */ + @PostMapping("/sink/register/topic") + public Topic sinkRegisterTopic(@RequestParam(value = "tableName") @NotEmpty String table, + @RequestParam(value = "ptnNum") int ptnNum) { + return topicRegister.register(table, ptnNum, Endpoint.SINK); } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CsvProcessManagement.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CsvProcessManagement.java index 27f36fe74557b8c2c0826d27400a31df22a25820..237cb0fc354b1db91a4a7f68b3603ae7c74eb703 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CsvProcessManagement.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CsvProcessManagement.java @@ -49,11 +49,19 @@ public class CsvProcessManagement { private ScheduledExecutorService scheduledExecutor; private final BlockingQueue tableDispatcherQueue = new LinkedBlockingQueue<>(); + /** + * add task to Dispatcher queue + * + * @param completedTableList completedTableList + */ public void taskDispatcher(List completedTableList) { tableDispatcherQueue.addAll(completedTableList); log.info("add tables to task dispatcher queue [{}]", completedTableList); } + /** + * startTaskDispatcher + */ public void startTaskDispatcher() { scheduledExecutor = ThreadUtil.newSingleThreadScheduledExecutor("table-dispatcher"); int delay = ConfigCache.getIntValue(ConfigConstants.CSV_TASK_DISPATCHER_INTERVAL); @@ -71,8 +79,14 @@ public class CsvProcessManagement { log.info("create task dispatcher schedule period [{}] seconds", delay); } + /** + * closeTaskDispatcher + */ public void closeTaskDispatcher() { if (Objects.nonNull(scheduledExecutor)) { + while (!tableDispatcherQueue.isEmpty()) { + ThreadUtil.sleepHalfSecond(); + } scheduledExecutor.shutdownNow(); log.info("shutdown task dispatcher schedule"); } diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/ColumnsMetaData.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/ColumnsMetaData.java index 6145094025dc6163e31c29649f8b61c0bd60149e..8aeafd66fdd1b70a13cfb85b2fcd52afdfe08ee8 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/ColumnsMetaData.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/ColumnsMetaData.java @@ -99,12 +99,21 @@ public class ColumnsMetaData implements Comparable { .contains(AUTO_INCREMENT); } + /** + * get datatype + * + * @return datatype + */ + public String getDataType() { + return dataType.split("\\(")[0]; + } + /** * pase resultSet metadata to ColumnsMetaData * * @param rs rs * @return ColumnsMetaData - * @throws SQLException + * @throws SQLException SQLException */ public static ColumnsMetaData parse(ResultSet rs) throws SQLException { ColumnsMetaData columnsMetaData = new ColumnsMetaData(); diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/HexUtil.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/HexUtil.java index c1ca7a17024cf714985210e2710d8b66f83646b9..606a9827a91f9f83c18dea5126789d1fde03e39c 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/HexUtil.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/HexUtil.java @@ -26,13 +26,18 @@ public class HexUtil { private static final char[] CHARS = "0123456789ABCDEF".toCharArray(); public static final String HEX_ZERO_PREFIX = "0x"; public static final String HEX_PREFIX = "\\x"; + + /** + * hex openGauss prefix + */ + public static final String HEX_OG_PREFIX = "\\"; private static final String HEX_NO_PREFIX = ""; /** * Convert text string to hexadecimal string * * @param str str - * @return + * @return str */ public static String toHex(String str) { StringBuilder sb = new StringBuilder(""); @@ -52,7 +57,7 @@ public class HexUtil { * Convert byte array to hexadecimal string * * @param data data - * @return + * @return data */ public static String byteToHex(byte[] data) { StringBuilder result = new StringBuilder(); @@ -69,7 +74,7 @@ public class HexUtil { * 02AA -> 0x02AA * * @param data data - * @return + * @return data */ public static String byteToHexTrimBackslash(byte[] data) { return byteToHexTrim(data, HEX_PREFIX); @@ -80,7 +85,7 @@ public class HexUtil { * 02AA -> 02AA * * @param data data - * @return + * @return data */ public static String byteToHexTrim(byte[] data) { return byteToHexTrim(data, HEX_NO_PREFIX); @@ -91,7 +96,7 @@ public class HexUtil { * 02AA -> 0x02AA * * @param data data - * @return + * @return data */ public static String byteToHexTrimZero(byte[] data) { return byteToHexTrim(data, HEX_ZERO_PREFIX); @@ -163,11 +168,23 @@ public class HexUtil { private final String name; private final char hexChar; + /** + * BinaryHex + * + * @param name name + * @param hexChar hexChar + */ BinaryHex(String name, char hexChar) { this.name = name; this.hexChar = hexChar; } + /** + * hexOf + * + * @param name name + * @return char + */ public static char hexOf(String name) { return valueOf("B" + name).hexChar; } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/client/CheckingFeignClient.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/client/CheckingFeignClient.java index ed0f8a0c5f28b50ade76355c233d3a1cc4e1d460..f5f03ac8e55af9143106c45f4e3f366c011f2f0b 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/client/CheckingFeignClient.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/client/CheckingFeignClient.java @@ -70,14 +70,24 @@ public interface CheckingFeignClient { /** * register topic * - * @param table tableName - * @param ptnNum ptnNum - * @param endpoint current endpoint + * @param table tableName + * @param ptnNum ptnNum * @return topic */ - @PostMapping("/register/topic") - Topic registerTopic(@RequestParam(value = "tableName") @NotEmpty String table, - @RequestParam(value = "ptnNum") int ptnNum, @RequestParam(value = "endpoint") @NonNull Endpoint endpoint); + @PostMapping("/source/register/topic") + Topic sourceRegisterTopic(@RequestParam(value = "tableName") @NotEmpty String table, + @RequestParam(value = "ptnNum") int ptnNum); + + /** + * register topic + * + * @param table tableName + * @param ptnNum ptnNum + * @return topic + */ + @PostMapping("/sink/register/topic") + Topic sinkRegisterTopic(@RequestParam(value = "tableName") @NotEmpty String table, + @RequestParam(value = "ptnNum") int ptnNum); /** * health check diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/DmlBuilder.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/DmlBuilder.java index a1f3815f4be9541237453da6c0c6ab15f6215264..8fc6dc4fb4554bb06b74a240847f3bd7b673a217 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/DmlBuilder.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/DmlBuilder.java @@ -105,6 +105,11 @@ public class DmlBuilder { protected DataBaseType dataBaseType; protected boolean isOgCompatibilityB; + /** + * hex prefix + */ + protected String hexPrefix; + public DmlBuilder() { } @@ -112,6 +117,7 @@ public class DmlBuilder { this.dataBaseType = databaseType; this.isOgCompatibilityB = ogCompatibility; this.checkMode = ConfigCache.getCheckMode(); + this.hexPrefix = Objects.equals(CheckMode.CSV, checkMode) ? HexUtil.HEX_OG_PREFIX : HexUtil.HEX_PREFIX; } /** @@ -179,10 +185,8 @@ public class DmlBuilder { final String columnName = columnMeta.getColumnName(); if (MetaDataUtil.isDigitKey(columnMeta)) { valueList.add(columnsValue.get(columnName)); - } else if (BLOB_LIST.contains(columnMeta.getDataType())) { - valueList.add(SINGLE_QUOTES + columnsValue.get(columnName) + SINGLE_QUOTES); - } else if (BINARY.contains(columnMeta.getDataType())) { - valueList.add(SINGLE_QUOTES + HexUtil.HEX_PREFIX + columnsValue.get(columnName) + SINGLE_QUOTES); + } else if (BLOB_LIST.contains(columnMeta.getDataType()) || BINARY.contains(columnMeta.getDataType())) { + valueList.add(SINGLE_QUOTES + hexPrefix + columnsValue.get(columnName) + SINGLE_QUOTES); } else { String value = columnsValue.get(columnName); if (Objects.isNull(value)) { diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/UpdateDmlBuilder.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/UpdateDmlBuilder.java index db38c6f3b157516096492a8a302fbad9c1278eb7..737b175ecfb880c2f73e7d3ec7990ed78f07df65 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/UpdateDmlBuilder.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dml/UpdateDmlBuilder.java @@ -19,7 +19,6 @@ import org.opengauss.datachecker.common.entry.enums.ColumnKey; import org.opengauss.datachecker.common.entry.enums.DataBaseType; import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData; import org.opengauss.datachecker.common.entry.extract.TableMetadata; -import org.opengauss.datachecker.common.util.HexUtil; import org.opengauss.datachecker.extract.util.MetaDataUtil; import javax.validation.constraints.NotNull; @@ -98,7 +97,8 @@ public class UpdateDmlBuilder extends DmlBuilder { } public String build() { - return Fragment.DML_UPDATE.replace(Fragment.SCHEMA, schema).replace(Fragment.TABLE_NAME, tableName) + return Fragment.DML_UPDATE.replace(Fragment.SCHEMA, schema) + .replace(Fragment.TABLE_NAME, tableName) .replace(Fragment.COLUMNS, buildColumnsValue()) .replace(Fragment.CONDITION, buildConditionCompositePrimary()); } @@ -107,13 +107,12 @@ public class UpdateDmlBuilder extends DmlBuilder { StringBuilder builder = new StringBuilder(); final List primaryMetaDatas = metadata.getPrimaryMetas(); for (ColumnsMetaData primaryMeta : primaryMetaDatas) { - builder.append(primaryMeta.getColumnName()).append(Fragment.EQUAL); + builder.append(primaryMeta.getColumnName()) + .append(Fragment.EQUAL); if (MetaDataUtil.isDigitKey(primaryMeta)) { builder.append(columnsValues.get(primaryMeta.getColumnName())); - } else if (BLOB_LIST.contains(primaryMeta.getDataType())) { - builder.append(convertValue(HexUtil.toHex(columnsValues.get(primaryMeta.getColumnName())))); - } else if (BINARY.contains(primaryMeta.getDataType())) { - builder.append(convertValue(HexUtil.HEX_PREFIX + columnsValues.get(primaryMeta.getColumnName()))); + } else if (BLOB_LIST.contains(primaryMeta.getDataType()) || BINARY.contains(primaryMeta.getDataType())) { + builder.append(convertValue(hexPrefix + columnsValues.get(primaryMeta.getColumnName()))); } else { builder.append(convertValue(columnsValues.get(primaryMeta.getColumnName()))); } @@ -128,8 +127,6 @@ public class UpdateDmlBuilder extends DmlBuilder { return Fragment.SINGLE_QUOTES + fieldValue + Fragment.SINGLE_QUOTES; } - - private String buildColumnsValue() { StringBuilder builder = new StringBuilder(); final List columnMetaDatas = metadata.getColumnsMetas(); @@ -138,14 +135,13 @@ public class UpdateDmlBuilder extends DmlBuilder { continue; } final String columnName = columnMeta.getColumnName(); - builder.append(columnName).append(Fragment.EQUAL); + builder.append(columnName) + .append(Fragment.EQUAL); final String columnValue = columnsValues.get(columnName); if (MetaDataUtil.isDigitKey(columnMeta)) { builder.append(columnValue); - } else if (BLOB_LIST.contains(columnMeta.getDataType())) { - builder.append(convertValue(HexUtil.toHex(columnValue))); - } else if (BINARY.contains(columnMeta.getDataType())) { - builder.append(convertValue(HexUtil.HEX_PREFIX + columnValue)); + } else if (BLOB_LIST.contains(columnMeta.getDataType()) || BINARY.contains(columnMeta.getDataType())) { + builder.append(convertValue(hexPrefix + columnValue)); } else { builder.append(convertValue(columnValue)); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java index 97f4b32f5d5b5e24457df217b97bac36fb42f5c2..7523f27b3e14f8e5f8190ac7dfd17a04e92c9121 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java @@ -329,14 +329,12 @@ public class DataExtractServiceImpl implements DataExtractService { log.info("Perform data extraction tasks {}", task.getTaskName()); final String tableName = task.getTableName(); if (!tableCheckStatus.containsKey(tableName) || tableCheckStatus.get(tableName) == -1) { - log.warn("Abnormal table[{}] status, ignoring the current table data extraction task", - tableName); + log.warn("Abnormal table[{}] status, ignoring the current table data extraction task", tableName); return; } - ThreadUtil.requestConflictingSleeping(); registerTopic(task); while (!TopicCache.canCreateTopic(maximumTopicSize)) { - ThreadUtil.sleep(1000); + ThreadUtil.sleepHalfSecond(); } Topic topic = task.getTopic(); Endpoint endpoint = extractProperties.getEndpoint(); @@ -484,7 +482,12 @@ public class DataExtractServiceImpl implements DataExtractService { private void registerTopic(ExtractTask task) { int topicPartitions = TopicUtil.calcPartitions(task.getDivisionsTotalNumber()); Endpoint currentEndpoint = extractProperties.getEndpoint(); - Topic topic = checkingFeignClient.registerTopic(task.getTableName(), topicPartitions, currentEndpoint); + Topic topic; + if (Objects.equals(Endpoint.SOURCE, currentEndpoint)) { + topic = checkingFeignClient.sourceRegisterTopic(task.getTableName(), topicPartitions); + } else { + topic = checkingFeignClient.sinkRegisterTopic(task.getTableName(), topicPartitions); + } task.setTopic(topic); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceRegister.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceRegister.java index b04ec46ea942a10a1881449a14ae0267be0473c2..e2e40c4ccc769c6ebeca50861e9146eadb00cea4 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceRegister.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceRegister.java @@ -78,8 +78,9 @@ public class SliceRegister { if (!TopicCache.canCreateTopic(ConfigCache.getIntValue(ConfigConstants.MAXIMUM_TOPIC_SIZE))) { return false; } - topic = checkingClient.registerTopic(tableName, ptnNum, ConfigCache.getEndPoint()); - if (kafkaAdminService.createTopic(topic.getTopicName(ConfigCache.getEndPoint()), topic.getPtnNum())) { + Endpoint endPoint = ConfigCache.getEndPoint(); + topic = endpointRegisterTopic(tableName, ptnNum, endPoint); + if (kafkaAdminService.createTopic(topic.getTopicName(endPoint), topic.getPtnNum())) { TopicCache.add(topic); return true; } else { @@ -87,6 +88,16 @@ public class SliceRegister { } } + private Topic endpointRegisterTopic(String tableName, int ptnNum, Endpoint endPoint) { + Topic topic; + if (Objects.equals(Endpoint.SOURCE, endPoint)) { + topic = checkingClient.sourceRegisterTopic(tableName, ptnNum); + } else { + topic = checkingClient.sinkRegisterTopic(tableName, ptnNum); + } + return topic; + } + /** * check table has registered topic *