diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java index 74599b4d586ec39888b7ec5bc9177037866355b8..3d207528cfa2a07092429888a572d6d4f435f8a2 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java @@ -199,5 +199,10 @@ public class ExtractFallbackFactory implements FallbackFactory fetchCheckTableCount() { return null; } + + @Override + public void dispatcherTables(List list) { + /* NOTE(review): empty body - the tables passed in are dropped without any log; confirm this is intended */ + } } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java index bd8776eaea09a2a8bc2514c381625c0fef040770..7306e2e91fb3b2b015200376ff62ba5d5ee1d97d 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java @@ -229,4 +229,12 @@ public interface ExtractFeignClient { @GetMapping("/fetch/check/table/count") Result fetchCheckTableCount(); + + /** + * Dispatches tables to the extract endpoint through POST /csv/dispatcher/tables + * + * @param list tables to dispatch + */ + @PostMapping("/csv/dispatcher/tables") + void dispatcherTables(@RequestBody List list); } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/FeignClientService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/FeignClientService.java index ef4640f85cb24389464335319f04627c0632f19e..d68c78f4ed5bf316cddf512571b6a69bb442855a 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/FeignClientService.java +++ 
b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/FeignClientService.java @@ -367,4 +367,9 @@ public class FeignClientService { throw new DispatchClientException(Endpoint.SOURCE, "check database error: " + ignored.getMessage()); } } + + public void dispatcherTables(List list) { + getClient(Endpoint.SOURCE).dispatcherTables(list); + getClient(Endpoint.SINK).dispatcherTables(list); + } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/CsvProperties.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/CsvProperties.java index 640656327f3ca25a7f183eca93bc4bf0a900e408..9c96b9afccb0394de6f633f90b0c05580c9a4f35 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/CsvProperties.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/CsvProperties.java @@ -33,7 +33,8 @@ import org.springframework.stereotype.Component; @Component @ConfigurationProperties(prefix = "spring.csv") @JSONType( - orders = {"sync", "schema", "path", "data", "reader", "writer", "schemaTables", "schemaColumns", "sleepInterval"}) + orders = {"sync", "schema", "path", "data", "reader", "writer", "schemaTables", "schemaColumns", "sleepInterval", + "taskDispatcherInterval", "maxDispatcherSize"}) public class CsvProperties { private boolean sync; private String schema; @@ -44,6 +45,8 @@ public class CsvProperties { private String schemaTables; private String schemaColumns; private long sleepInterval = 100; + private int taskDispatcherInterval = 3; + private int maxDispatcherSize = 5; /** * translate properties to csv path config diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/CheckCsvController.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/CheckCsvController.java new file mode 100644 index 0000000000000000000000000000000000000000..7bd5cc18cf6c94e1d9628f35df98e3162c35c5f6 --- /dev/null +++ 
b/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/CheckCsvController.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.opengauss.datachecker.check.controller; + +import org.opengauss.datachecker.check.service.CsvProcessManagement; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +/** + * @author :wangchao + * @date :Created in 2022/5/25 + * @since :11 + */ +@RestController +@RequestMapping +public class CheckCsvController { + @Autowired + private CsvProcessManagement csvProcessManagement; + + /** + * Hands the tables posted to this endpoint over to the csv task dispatcher + * + * @param completedTableList tables passed on to CsvProcessManagement#taskDispatcher + */ + @PostMapping("/notify/check/table/index/completed") + public void notifyTableIndexCompleted(@RequestBody List completedTableList) { + csvProcessManagement.taskDispatcher(completedTableList); + } +} diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckStartCsvLoader.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckStartCsvLoader.java index ba1e3f8ffcfd708f9c47805d945679131c5c5dca..8e9867fa8ff1374ee05a0a3bdb57f49b6ac65ba2 100644 --- 
a/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckStartCsvLoader.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckStartCsvLoader.java @@ -18,6 +18,7 @@ package org.opengauss.datachecker.check.load; import org.opengauss.datachecker.check.client.FeignClientService; import org.opengauss.datachecker.check.event.KafkaTopicDeleteProvider; import org.opengauss.datachecker.check.modules.report.SliceProgressService; +import org.opengauss.datachecker.check.service.CsvProcessManagement; import org.opengauss.datachecker.check.service.TaskRegisterCenter; import org.opengauss.datachecker.common.config.ConfigCache; import org.opengauss.datachecker.common.entry.enums.CheckMode; @@ -46,6 +47,8 @@ public class CheckStartCsvLoader extends AbstractCheckLoader { private SliceProgressService sliceProgressService; @Resource private KafkaTopicDeleteProvider kafkaTopicDeleteProvider; + @Resource + private CsvProcessManagement csvProcessManagement; @Override public void load(CheckEnvironment checkEnvironment) { @@ -54,7 +57,7 @@ public class CheckStartCsvLoader extends AbstractCheckLoader { sliceProgressService.startProgressing(); int count = feignClient.fetchCheckTableCount(); sliceProgressService.updateTotalTableCount(count); - + csvProcessManagement.startTaskDispatcher(); feignClient.enableCsvExtractService(); log.info("enabled data check csv mode"); kafkaTopicDeleteProvider.deleteTopicIfTableCheckedCompleted(); @@ -64,6 +67,7 @@ public class CheckStartCsvLoader extends AbstractCheckLoader { while (!registerCenter.checkCompletedAll(count)) { ThreadUtil.sleepOneSecond(); } + csvProcessManagement.closeTaskDispatcher(); shutdown(CSV_CHECK_COMPLETED); } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/ConfigManagement.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/ConfigManagement.java index 
4783fd700a3250b0cd47f872e3c32d2f160fb43e..f1f711c739847fd9cd6384a3bd17978a075e2493 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/ConfigManagement.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/ConfigManagement.java @@ -87,6 +87,8 @@ public class ConfigManagement { ConfigCache.put(ConfigConstants.CSV_SLEEP_INTERVAL, config.getSleepInterval()); ConfigCache.put(ConfigConstants.CSV_SCHEMA_TABLES_PATH, config.getSchemaTables()); ConfigCache.put(ConfigConstants.CSV_SCHEMA_COLUMNS_PATH, config.getSchemaColumns()); + ConfigCache.put(ConfigConstants.CSV_TASK_DISPATCHER_INTERVAL, config.getTaskDispatcherInterval()); + ConfigCache.put(ConfigConstants.CSV_MAX_DISPATCHER_SIZE, config.getMaxDispatcherSize()); ConfigCache.put(ConfigConstants.CHECK_MODE, CheckMode.CSV); } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CsvProcessManagement.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CsvProcessManagement.java index d986aade7467f8bf9aec63c509fccccb5908d54b..27f36fe74557b8c2c0826d27400a31df22a25820 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CsvProcessManagement.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CsvProcessManagement.java @@ -15,10 +15,23 @@ package org.opengauss.datachecker.check.service; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.check.client.FeignClientService; +import org.opengauss.datachecker.common.config.ConfigCache; +import org.opengauss.datachecker.common.constant.ConfigConstants; +import org.opengauss.datachecker.common.util.LogUtils; +import org.opengauss.datachecker.common.util.ThreadUtil; import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; 
+import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * CsvProcessManagement @@ -29,17 +42,48 @@ import javax.annotation.Resource; */ @Component public class CsvProcessManagement { + private static final Logger log = LogUtils.getLogger(); + @Resource private FeignClientService feignClient; + private ScheduledExecutorService scheduledExecutor; + private final BlockingQueue tableDispatcherQueue = new LinkedBlockingQueue<>(); + + public void taskDispatcher(List completedTableList) { + tableDispatcherQueue.addAll(completedTableList); + log.info("add tables to task dispatcher queue [{}]", completedTableList); + } + + /** + * Starts the periodic task that takes up to maxDispatcherSize queued tables per run + * and dispatches them through the feign client. + */ + public void startTaskDispatcher() { + scheduledExecutor = ThreadUtil.newSingleThreadScheduledExecutor("table-dispatcher"); + int delay = ConfigCache.getIntValue(ConfigConstants.CSV_TASK_DISPATCHER_INTERVAL); + int maxDispatcherSize = ConfigCache.getIntValue(ConfigConstants.CSV_MAX_DISPATCHER_SIZE); + scheduledExecutor.scheduleWithFixedDelay(() -> { + try { + List list = new LinkedList<>(); + while (!tableDispatcherQueue.isEmpty() && list.size() < maxDispatcherSize) { + list.add(tableDispatcherQueue.poll()); + } + if (CollectionUtils.isNotEmpty(list)) { + feignClient.dispatcherTables(list); + log.info("dispatcher tables to extract service [{}]", list); + } + } catch (Exception ex) { + /* an uncaught exception would suppress every later run of this task; tables of the failed run are not retried */ + log.error("dispatcher tables to extract service error: ", ex); + } + }, delay, delay, TimeUnit.SECONDS); + log.info("create task dispatcher schedule period [{}] seconds", delay); + } - /** - * csv process management, start csv extract process
- * listener slice task status,source and sink callback slice info and status
- * check slice data (match source and sink slice success,then checked it )
- * summary slice check result, refresh process log
- * summary check result, refresh process log
- */ - public void start() { - feignClient.enableCsvExtractService(); + public void closeTaskDispatcher() { + if (Objects.nonNull(scheduledExecutor)) { + scheduledExecutor.shutdownNow(); + log.info("shutdown task dispatcher schedule"); + } } } diff --git a/datachecker-check/src/main/resources/application.yml b/datachecker-check/src/main/resources/application.yml index ca7c80b83c3c26a7d6db3f168489b5ac05763026..faa83eb272ed79b4252d1afa46fbe6a4461dc9b7 100644 --- a/datachecker-check/src/main/resources/application.yml +++ b/datachecker-check/src/main/resources/application.yml @@ -41,7 +41,8 @@ spring: schema-tables: ${spring.csv.path}/${spring.csv.schema}_information_schema_tables.csv schema-columns: ${spring.csv.path}/${spring.csv.schema}_information_schema_columns.csv sleep-interval: 100 # 毫秒 - + task-dispatcher-interval: 3 # csv task dispatcher period (unit second) + max-dispatcher-size: 5 # csv task dispatcher size data: check: data-path: local_path/xxx diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java index 34ad178887f73b3ed71a17144c63b214b18fb3f2..0c314a6200511be68ae4ce1223dd32f0c589d74e 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java @@ -32,6 +32,8 @@ public interface ConfigConstants { String CSV_SCHEMA_TABLES_PATH = "spring.csv.schema-tables"; String CSV_SCHEMA_COLUMNS_PATH = "spring.csv.schema-columns"; String CSV_SLEEP_INTERVAL = "spring.csv.sleep-interval"; + String CSV_TASK_DISPATCHER_INTERVAL = "spring.csv.task-dispatcher-interval"; + String CSV_MAX_DISPATCHER_SIZE = "spring.csv.max-dispatcher-size"; String CHECK_MODE = "check_mode"; String PROCESS_NO = "check.process.id"; @@ -76,4 +78,5 @@ public interface ConfigConstants { * lifecycle 
graceful shutdown wait time */ String TIMEOUT_PER_SHUTDOWN_PHASE = "spring.lifecycle.timeout-per-shutdown-phase"; + } diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/csv/CsvPathConfig.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/csv/CsvPathConfig.java index 927b8de8c5e09d670137aa87e337d23b2ddda924..489fe9e75cd9428321aec12e9fab6e21a7d6c534 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/csv/CsvPathConfig.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/csv/CsvPathConfig.java @@ -27,7 +27,8 @@ import lombok.Data; */ @Data @JSONType( - orders = {"sync", "schema", "path", "data", "reader", "writer", "schemaTables", "schemaColumns", "sleepInterval"}) + orders = {"sync", "schema", "path", "data", "reader", "writer", "schemaTables", "schemaColumns", "sleepInterval", + "taskDispatcherInterval", "maxDispatcherSize"}) public class CsvPathConfig { private boolean sync; private String schema; @@ -38,4 +39,6 @@ public class CsvPathConfig { private String schemaTables; private String schemaColumns; private long sleepInterval = 100; + private int taskDispatcherInterval = 3; + private int maxDispatcherSize = 5; } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/client/CheckingFeignClient.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/client/CheckingFeignClient.java index 8c0ad3e5ec64d02cff947c773c0688a0db5424a8..c8f3af1d077e8f48edcd3aafd2f6b72b681ffe7d 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/client/CheckingFeignClient.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/client/CheckingFeignClient.java @@ -70,15 +70,14 @@ public interface CheckingFeignClient { /** * register topic * - * @param table tableName - * @param ptnNum ptnNum - * @param endpoint current endpoint + * @param table tableName + * @param ptnNum 
ptnNum + * @param endpoint current endpoint * @return topic */ @PostMapping("/register/topic") Topic registerTopic(@RequestParam(value = "tableName") @NotEmpty String table, - @RequestParam(value = "ptnNum") int ptnNum, - @RequestParam(value = "endpoint") @NonNull Endpoint endpoint); + @RequestParam(value = "ptnNum") int ptnNum, @RequestParam(value = "endpoint") @NonNull Endpoint endpoint); /** * health check @@ -95,7 +94,8 @@ public interface CheckingFeignClient { Map queryTableCheckStatus(); @GetMapping("/get/feign/request") - boolean getFeignRequest(@RequestParam(value = "requestName") String requestName, @RequestParam(value = "value") String value); + boolean getFeignRequest(@RequestParam(value = "requestName") String requestName, + @RequestParam(value = "value") String value); @GetMapping("/release/feign/request") boolean releaseFeignRequest(@RequestParam(value = "requestName") String requestName); @@ -133,12 +133,14 @@ public interface CheckingFeignClient { /** * register table checkpoint list * - * @param endpoint endpoint - * @param tableName tableName + * @param endpoint endpoint + * @param tableName tableName * @param checkPoint checkPoint */ @PostMapping("/register/checkpoint") void registerCheckPoint(@RequestParam(name = "endpoint") @NotEmpty Endpoint endpoint, - @RequestParam(name = "tableName") @NotEmpty String tableName, - @RequestBody List checkPoint); + @RequestParam(name = "tableName") @NotEmpty String tableName, @RequestBody List checkPoint); + + @PostMapping("/notify/check/table/index/completed") + void notifyTableIndexCompleted(@RequestBody List completedTableList); } \ No newline at end of file diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/CheckCsvController.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/CheckCsvController.java index 78d67a28b3f967eca3c0a7b97071747bbc669396..732079e849f149c917a4c0c4cf84dd820a29108d 100644 --- 
a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/CheckCsvController.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/CheckCsvController.java @@ -20,10 +20,12 @@ import org.opengauss.datachecker.common.constant.ConfigConstants; import org.opengauss.datachecker.common.web.Result; import org.opengauss.datachecker.extract.service.CsvManagementService; import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; +import java.util.List; /** * CheckCsvController @@ -53,4 +55,14 @@ public class CheckCsvController { } return Result.success(); } + + /** + * csv dispatcher tables + * + * @param list tables + */ + @PostMapping("/csv/dispatcher/tables") + public void dispatcherTables(@RequestBody List list) { + csvManagementService.dispatcherTables(list); + } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvListener.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvListener.java index 01d3569c7b551d13a98f878bdd5b728ffd520bc0..cc92764d0bddf4b6690aae35bee8edfbd2a88b62 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvListener.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvListener.java @@ -16,8 +16,11 @@ package org.opengauss.datachecker.extract.data.csv; import org.opengauss.datachecker.common.entry.extract.SliceVo; +import org.opengauss.datachecker.extract.client.CheckingFeignClient; import org.opengauss.datachecker.extract.constants.ExtConstants; +import java.util.List; + /** * CsvListener * @@ -29,16 +32,18 @@ public interface CsvListener { /** * init csv listener + * + * @param checkingClient */ - void initCsvListener(); 
+ void initCsvListener(CheckingFeignClient checkingClient); /** - * poll slice vo from listener queue - * Retrieves and removes the head of this queue, or returns null if this queue is empty + * fetch table's all slices * - * @return slice + * @param table + * @return slices */ - SliceVo poll(); + List fetchTableSliceList(String table); /** * stop tailer listener @@ -47,13 +52,14 @@ public interface CsvListener { /** * check slice ptn num, if ptn num is invalid, set min ptn num + * * @param slice slice */ default void checkSlicePtnNum(SliceVo slice) { if (slice.getPtnNum() < ExtConstants.MIN_TOPIC_PTN_NUM) { slice.setPtnNum(ExtConstants.MIN_TOPIC_PTN_NUM); } - }; + } /** * listener is finished @@ -61,4 +67,10 @@ public interface CsvListener { * @return true | false */ boolean isFinished(); + + /** + * release slice cache of table + * @param table table + */ + void releaseSliceCache(String table); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvReaderListener.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvReaderListener.java index 90362bb0ae61709be51c01ec09679a8e956aadfb..4bba86803ac1e5864dbee79789174f20609196e6 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvReaderListener.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvReaderListener.java @@ -23,12 +23,15 @@ import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.common.config.ConfigCache; import org.opengauss.datachecker.common.entry.extract.SliceVo; import org.opengauss.datachecker.common.util.LogUtils; +import org.opengauss.datachecker.common.util.MapUtils; +import org.opengauss.datachecker.extract.client.CheckingFeignClient; import org.opengauss.datachecker.extract.constants.ExtConstants; import java.io.File; +import java.util.List; +import java.util.Map; import java.util.Objects; -import 
java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; /** * CsvReaderListener @@ -39,13 +42,12 @@ import java.util.concurrent.LinkedBlockingQueue; */ public class CsvReaderListener implements CsvListener { private static final Logger log = LogUtils.getLogger(); - private final BlockingQueue listenerQueue = new LinkedBlockingQueue<>(); + private final Map> readerSliceMap = new ConcurrentHashMap<>(); private Tailer tailer; - private TailerMonitor tailerMonitor; private boolean isTailEnd = false; @Override - public void initCsvListener() { + public void initCsvListener(CheckingFeignClient checkingClient) { log.info("csv reader listener is starting ."); // creates and starts a Tailer for read writer logs in real time tailer = Tailer.create(new File(ConfigCache.getReader()), new TailerListenerAdapter() { @@ -68,34 +70,34 @@ public class CsvReaderListener implements CsvListener { return; } checkSlicePtnNum(slice); - listenerQueue.add(slice); + MapUtils.put(readerSliceMap, slice.getTable(), slice); log.debug("reader add log :{}", line); } catch (Exception ex) { log.error("reader log listener error :" + ex.getMessage()); } } }, ConfigCache.getCsvLogMonitorInterval(), false); - - tailerMonitor = new TailerMonitor(tailer, listenerQueue); - Thread thread = new Thread(tailerMonitor); - thread.start(); log.info("csv reader listener is started ."); } @Override - public SliceVo poll() { - return listenerQueue.poll(); + public List fetchTableSliceList(String table) { + return readerSliceMap.get(table); + } + + @Override + public void releaseSliceCache(String table) { + readerSliceMap.remove(table); } @Override public boolean isFinished() { - return isTailEnd && listenerQueue.isEmpty(); + return isTailEnd && readerSliceMap.isEmpty(); } @Override public void stop() { if (Objects.nonNull(tailer)) { - tailerMonitor.stop(); tailer.stop(); } } diff --git 
a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvWriterListener.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvWriterListener.java index 410403213b68deb657793ac19c89daacb6a3b572..5aefc299f641afd0dcd273dc21e417f5fa03bb3a 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvWriterListener.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvWriterListener.java @@ -17,26 +17,33 @@ package org.opengauss.datachecker.extract.data.csv; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.parser.Feature; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.io.input.Tailer; import org.apache.commons.io.input.TailerListenerAdapter; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.common.config.ConfigCache; +import org.opengauss.datachecker.common.constant.ConfigConstants; import org.opengauss.datachecker.common.entry.csv.SliceIndexVo; import org.opengauss.datachecker.common.entry.enums.SliceIndexStatus; import org.opengauss.datachecker.common.entry.enums.SliceLogType; import org.opengauss.datachecker.common.entry.extract.SliceVo; import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.MapUtils; +import org.opengauss.datachecker.common.util.ThreadUtil; +import org.opengauss.datachecker.extract.client.CheckingFeignClient; import org.opengauss.datachecker.extract.constants.ExtConstants; import java.io.File; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * CsvWriterListener @@ -47,14 
+54,16 @@ import java.util.concurrent.LinkedBlockingQueue; */ public class CsvWriterListener implements CsvListener { private static final Logger log = LogUtils.getLogger(); - private final Map> tableSliceLogs = new ConcurrentSkipListMap<>(); - private final BlockingQueue listenerQueue = new LinkedBlockingQueue<>(); + private final Map> writerSliceMap = new ConcurrentSkipListMap<>(); + private final BlockingQueue tableIndexCompletedList = new LinkedBlockingQueue<>(); private Tailer tailer; - private TailerMonitor tailerMonitor; private boolean isTailEnd = false; + private CheckingFeignClient checkingClient; + private ScheduledExecutorService scheduledExecutor; @Override - public void initCsvListener() { + public void initCsvListener(CheckingFeignClient checkingClient) { + this.checkingClient = checkingClient; log.info("csv writer listener is starting ."); // creates and starts a Tailer for read writer logs in real time tailer = Tailer.create(new File(ConfigCache.getWriter()), new TailerListenerAdapter() { @@ -81,21 +90,13 @@ public class CsvWriterListener implements CsvListener { if (Objects.equals(sliceLogType, SliceLogType.SLICE)) { SliceVo slice = JSONObject.parseObject(line, SliceVo.class); checkSlicePtnNum(slice); - MapUtils.put(tableSliceLogs, slice.getTable(), slice); + MapUtils.put(writerSliceMap, slice.getTable(), slice); } else if (Objects.equals(sliceLogType, SliceLogType.INDEX)) { SliceIndexVo sliceIndex = JSONObject.parseObject(line, SliceIndexVo.class, Feature.AllowISO8601DateFormat); - if (Objects.equals(sliceIndex.getIndexStatus(), SliceIndexStatus.END)) { - if (tableSliceLogs.containsKey(sliceIndex.getTable())) { - List sliceList = tableSliceLogs.get(sliceIndex.getTable()); - listenerQueue.addAll(sliceList); - tableSliceLogs.remove(sliceIndex.getTable()); - log.info("writer add table :{}", sliceIndex.getTable()); - } else { - log.error("csv writer log [{}] not found!", sliceIndex.getTable()); - } - } else if 
(Objects.equals(sliceIndex.getIndexStatus(), SliceIndexStatus.NONE)) { - tableSliceLogs.remove(sliceIndex.getTable()); + if (Objects.equals(sliceIndex.getIndexStatus(), SliceIndexStatus.END) || Objects.equals( + sliceIndex.getIndexStatus(), SliceIndexStatus.NONE)) { + tableIndexCompletedList.add(sliceIndex.getTable()); } } log.debug("writer add log :{}", line); @@ -103,32 +104,57 @@ public class CsvWriterListener implements CsvListener { log.error("writer log listener error :{}", line, ex); } } - }, ConfigCache.getCsvLogMonitorInterval(), false); - - tailerMonitor = new TailerMonitor(tailer, listenerQueue); - Thread thread = new Thread(tailerMonitor); - thread.start(); + startNotifyScheduledExecutor(); log.info("csv writer listener is started."); } + public void startNotifyScheduledExecutor() { + scheduledExecutor = ThreadUtil.newSingleThreadScheduledExecutor("table-index-completed-feedback"); + int interval = ConfigCache.getIntValue(ConfigConstants.CSV_TASK_DISPATCHER_INTERVAL); + int maxDispatcherSize = ConfigCache.getIntValue(ConfigConstants.CSV_MAX_DISPATCHER_SIZE); + scheduledExecutor.scheduleWithFixedDelay(() -> { + try { + if (tableIndexCompletedList.size() > 0) { + List completedTableList = new LinkedList<>(); + while (!tableIndexCompletedList.isEmpty() && completedTableList.size() < maxDispatcherSize) { + completedTableList.add(tableIndexCompletedList.poll()); + } + if (CollectionUtils.isNotEmpty(completedTableList)) { + checkingClient.notifyTableIndexCompleted(completedTableList); + } + log.info("notify table can start checking [{}]", completedTableList); + completedTableList.clear(); + } + } catch (Exception ex) { + log.error("table-index-completed-feedbac error: ", ex); + } + + }, interval, interval, TimeUnit.SECONDS); + } + + @Override + public List fetchTableSliceList(String table) { + return writerSliceMap.get(table); + } + @Override - public SliceVo poll() { - return listenerQueue.poll(); + public void releaseSliceCache(String table) { + 
writerSliceMap.remove(table); } @Override public void stop() { if (Objects.nonNull(tailer)) { - tailerMonitor.stop(); tailer.stop(); - tableSliceLogs.clear(); + writerSliceMap.clear(); + scheduledExecutor.shutdownNow(); } } @Override public boolean isFinished() { - return isTailEnd && listenerQueue.isEmpty(); + return isTailEnd && writerSliceMap.isEmpty(); } private boolean skipNoInvalidSlice(JSONObject slice) { diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/ConfigManagement.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/ConfigManagement.java index b89073abf0e69f01e99c98ef1abdd9b138df5b03..66ae82f814abf6898d67b063407eb92960dfe911 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/ConfigManagement.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/ConfigManagement.java @@ -94,6 +94,8 @@ public class ConfigManagement { ConfigCache.put(ConfigConstants.CSV_SLEEP_INTERVAL, config.getSleepInterval()); ConfigCache.put(ConfigConstants.CSV_SCHEMA_TABLES_PATH, config.getSchemaTables()); ConfigCache.put(ConfigConstants.CSV_SCHEMA_COLUMNS_PATH, config.getSchemaColumns()); + ConfigCache.put(ConfigConstants.CSV_TASK_DISPATCHER_INTERVAL, config.getTaskDispatcherInterval()); + ConfigCache.put(ConfigConstants.CSV_MAX_DISPATCHER_SIZE, config.getMaxDispatcherSize()); ConfigCache.put(ConfigConstants.CHECK_MODE, CheckMode.CSV); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/CsvManagementService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/CsvManagementService.java index 11ac9a9e01761930e269a9b1d8aef4296d282162..0f8f999796a5615e3301ea8cc684c70d5b1f8797 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/CsvManagementService.java +++ 
b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/CsvManagementService.java @@ -21,6 +21,7 @@ import org.opengauss.datachecker.common.constant.ConfigConstants; import org.opengauss.datachecker.common.entry.enums.Endpoint; import org.opengauss.datachecker.common.service.DynamicThreadPoolManager; import org.opengauss.datachecker.common.util.LogUtils; +import org.opengauss.datachecker.extract.client.CheckingFeignClient; import org.opengauss.datachecker.extract.slice.SliceDispatcher; import org.opengauss.datachecker.extract.slice.SliceRegister; import org.opengauss.datachecker.extract.data.BaseDataService; @@ -31,6 +32,7 @@ import org.opengauss.datachecker.extract.slice.TableDispatcher; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.util.List; import java.util.Objects; /** @@ -50,6 +52,8 @@ public class CsvManagementService { private DynamicThreadPoolManager dynamicThreadPoolManager; @Resource private SliceRegister sliceRegister; + @Resource + private CheckingFeignClient checkingClient; private CsvListener listener; private SliceDispatcher sliceDispatcher = null; @@ -70,7 +74,7 @@ public class CsvManagementService { listener = new CsvWriterListener(); } baseDataService.queryTableMetadataList(); - listener.initCsvListener(); + listener.initCsvListener(checkingClient); // start slice dispatcher core thread sliceDispatcher = new SliceDispatcher(dynamicThreadPoolManager, sliceRegister, baseDataService, listener); Thread thread = new Thread(sliceDispatcher); @@ -94,4 +98,8 @@ public class CsvManagementService { sliceDispatcher.stop(); } } + + public void dispatcherTables(List list) { + sliceDispatcher.addSliceTables(list); + } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceDispatcher.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceDispatcher.java index 
2ac45ab60cd957efad2b15beffb4f677d9713dc5..1702f74efd9ea2a90d51e3d16cab9dc1c2191838 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceDispatcher.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceDispatcher.java @@ -15,6 +15,8 @@ package org.opengauss.datachecker.extract.slice; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.common.config.ConfigCache; import org.opengauss.datachecker.common.constant.ConfigConstants; @@ -32,7 +34,10 @@ import org.opengauss.datachecker.extract.data.csv.CsvListener; import org.opengauss.datachecker.extract.slice.factory.SliceFactory; import java.io.File; +import java.util.List; import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import static org.opengauss.datachecker.common.constant.DynamicTpConstant.EXTRACT_EXECUTOR; @@ -48,7 +53,7 @@ public class SliceDispatcher implements Runnable { private static final Logger log = LogUtils.getLogger(); private static final int MAX_DISPATCHER_QUEUE_SIZE = 100; private static final int WAIT_ONE_SECOND = 1000; - + private final BlockingQueue tableQueue = new LinkedBlockingQueue<>(); private final Object lock = new Object(); private final SliceFactory sliceFactory; private final CsvListener listener; @@ -57,6 +62,7 @@ public class SliceDispatcher implements Runnable { private final DynamicThreadPoolManager dynamicThreadPoolManager; private TopicCache topicCache; private boolean isRunning = true; + private final int maxFetchSize; /** * construct slice dispatcher @@ -74,6 +80,7 @@ public class SliceDispatcher implements Runnable { this.dynamicThreadPoolManager = dynamicThreadPoolManager; this.sliceFactory = new SliceFactory(baseDataService.getDataSource()); this.topicCache = 
SpringUtil.getBean(TopicCache.class); + this.maxFetchSize = ConfigCache.getIntValue(ConfigConstants.MAXIMUM_TABLE_SLICE_SIZE); } @Override @@ -85,27 +92,33 @@ public class SliceDispatcher implements Runnable { Endpoint endPoint = ConfigCache.getEndPoint(); while (isRunning) { waitingForIdle(executor); - SliceVo sliceVo = listener.poll(); - if (Objects.nonNull(sliceVo)) { - // check table by rule of table - String table = sliceVo.getTable(); - if (!baseDataService.checkTableContains(table)) { - log.info("distributors ignore [{}] table shards based on table rules", table); - notifyIgnoreCsvName(endPoint, sliceVo.getName(), "table rules"); - continue; - } - TableMetadata tableMetadata = baseDataService.getTableMetadata(table); - if (!tableMetadata.hasPrimary()) { - log.info("distributors ignore [{}] table because of it's no primary", table); - notifyIgnoreCsvName(endPoint, sliceVo.getName(), "table no primary"); - continue; - } - sliceVo.setEndpoint(endPoint); - register(sliceVo); - doTableSlice(executor, sliceVo); - } else { - ThreadUtil.sleepMax2Second(); + String table = tableQueue.poll(); + if (StringUtils.isEmpty(table)) { + continue; + } + // check table by rule of table + if (!baseDataService.checkTableContains(table)) { + log.info("distributors ignore [{}] table shards based on table rules", table); + notifyIgnoreCsvName(endPoint, table, "table rules"); + listener.releaseSliceCache(table); + continue; + } + TableMetadata tableMetadata = baseDataService.queryTableMetadata(table); + if (!tableMetadata.hasPrimary()) { + log.info("distributors ignore [{}] table because of it's no primary", table); + notifyIgnoreCsvName(endPoint, table, "table no primary"); + listener.releaseSliceCache(table); + continue; + } + List tableSliceList = listener.fetchTableSliceList(table); + if (CollectionUtils.isNotEmpty(tableSliceList)) { + tableSliceList.forEach(sliceVo -> { + sliceVo.setEndpoint(endPoint); + register(sliceVo); + doTableSlice(executor, sliceVo); + }); } + if 
(listener.isFinished()) { log.info("listener is finished , and will be closed"); listener.stop(); @@ -124,13 +137,17 @@ public class SliceDispatcher implements Runnable { } } - private void notifyIgnoreCsvName(Endpoint endPoint, String ignoreCsvName, String reason) { + private void notifyIgnoreCsvName(Endpoint endPoint, String ignoreCsvTableName, String reason) { if (Objects.equals(Endpoint.SOURCE, endPoint)) { String csvDataPath = ConfigCache.getCsvData(); - File file = new File(csvDataPath, ignoreCsvName); - if (file.exists() && file.renameTo(new File(csvDataPath, ignoreCsvName + ".check"))) { - log.debug("rename csv sharding completed [{}] by {}", ignoreCsvName, reason); - } + List tableSliceList = listener.fetchTableSliceList(ignoreCsvTableName); + tableSliceList.forEach(slice -> { + String ignoreCsvName = slice.getName(); + File file = new File(csvDataPath, ignoreCsvName); + if (file.exists() && file.renameTo(new File(csvDataPath, ignoreCsvName + ".check"))) { + log.debug("rename csv sharding completed [{}] by {}", ignoreCsvName, reason); + } + }); } } @@ -190,7 +207,6 @@ public class SliceDispatcher implements Runnable { while (sliceVo.getPtnNum() > 0 && !sliceRegister.registerTopic(sliceVo.getTable(), sliceVo.getPtnNum())) { ThreadUtil.sleepMax2Second(); } - int maxFetchSize = ConfigCache.getIntValue(ConfigConstants.MAXIMUM_TABLE_SLICE_SIZE); updateSliceFetchSize(maxFetchSize, sliceVo); sliceRegister.register(sliceVo); } @@ -201,4 +217,10 @@ public class SliceDispatcher implements Runnable { public void stop() { isRunning = false; } + + public void addSliceTables(List list) { + if (CollectionUtils.isNotEmpty(list)) { + tableQueue.addAll(list); + } + } } \ No newline at end of file