diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java index 442520a28134655a8dc79445e883217fe6f966b2..f45ecd837a9639fc4a6ea3772c65c17268aec400 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java @@ -23,64 +23,248 @@ package org.opengauss.datachecker.common.constant; * @since :11 */ public interface ConfigConstants { + /** + * csv config : spring.csv.schema + */ String CSV_SCHEMA = "spring.csv.schema"; + + /** + * csv config : spring.csv.path + */ String CSV_PATH = "spring.csv.path"; + + /** + * csv config : spring.csv.sync + */ String CSV_SYNC = "spring.csv.sync"; + + /** + * csv config : spring.csv.path:data + */ String CSV_DATA_PATH = "spring.csv.path:data"; + + /** + * csv config : spring.csv.path:reader + */ String CSV_READER_PATH = "spring.csv.path:reader"; + + /** + * csv config : spring.csv.path:writer + */ String CSV_WRITER_PATH = "spring.csv.path:writer"; + + /** + * csv config : spring.csv.schema-tables + */ String CSV_SCHEMA_TABLES_PATH = "spring.csv.schema-tables"; + + /** + * csv config : spring.csv.schema-columns + */ String CSV_SCHEMA_COLUMNS_PATH = "spring.csv.schema-columns"; + + /** + * csv config : spring.csv.sleep-interval + */ String CSV_SLEEP_INTERVAL = "spring.csv.sleep-interval"; + + /** + * csv config : spring.csv.task-dispatcher-interval + */ String CSV_TASK_DISPATCHER_INTERVAL = "spring.csv.task-dispatcher-interval"; + + /** + * csv config : spring.csv.max-dispatcher-size + */ String CSV_MAX_DISPATCHER_SIZE = "spring.csv.max-dispatcher-size"; + /** + * check_mode + */ String CHECK_MODE = "check_mode"; + + /** + * check.process.id + */ String PROCESS_NO = "check.process.id"; + + /** + * check.start-time + */ String START_LOCAL_TIME = 
"check.start-time"; + + /** + * jdbc.result-set.fetch-size + */ String FETCH_SIZE = "jdbc.result-set.fetch-size"; + /** + * spring.extract.endpoint + */ String ENDPOINT = "spring.extract.endpoint"; + + /** + * spring.extract.query-dop + */ String QUERY_DOP = "spring.extract.query-dop"; + + /** + * spring.extract.databaseType + */ String DATA_BASE_TYPE = "spring.extract.databaseType"; - String OBJECT_SIZE_EXPANSION_FACTOR = "spring.extract.object-size-expansion-factor"; + /** + * spring.extract.object-size-expansion-factor + */ + String OBJECT_SIZE_EXPANSION_FACTOR = "spring.extract.object-size-expansion-factor"; + /** + * spring.memory-monitor-enable + */ String MEMORY_MONITOR = "spring.memory-monitor-enable"; + /** + * spring.check.maximum-table-slice-size + */ String MAXIMUM_TABLE_SLICE_SIZE = "spring.check.maximum-table-slice-size"; + + /** + * spring.check.maximum-topic-size + */ String MAXIMUM_TOPIC_SIZE = "spring.check.maximum-topic-size"; + + /** + * spring.check.floating-point-data-supply-zero + */ String FLOATING_POINT_DATA_SUPPLY_ZERO = "spring.check.floating-point-data-supply-zero"; /** * with table slice check, config the maximum number of threads in the thread pool */ String EXTEND_MAXIMUM_POOL_SIZE = "spring.check.extend-maximum-pool-size"; + + /** + * data.check.bucket-expect-capacity + */ String BUCKET_CAPACITY = "data.check.bucket-expect-capacity"; + + /** + * data.check.data-path + */ String CHECK_PATH = "data.check.data-path"; + + /** + * data.check.sql_mode_pad_char_to_full_length + */ String SQL_MODE_PAD_CHAR_TO_FULL_LENGTH = "data.check.sql_mode_pad_char_to_full_length"; + + /** + * pad_char_to_full_length + */ String SQL_MODE_NAME_PAD_CHAR_TO_FULL_LENGTH = "pad_char_to_full_length"; + + /** + * data.check.sql_mode_value_cache + */ String SQL_MODE_VALUE_CACHE = "data.check.sql_mode_value_cache"; + + /** + * data.check.sql_mode_force_refresh + */ String SQL_MODE_FORCE_REFRESH = "data.check.sql_mode_force_refresh"; + /** + * 
spring.kafka.bootstrap-servers + */ String KAFKA_SERVERS = "spring.kafka.bootstrap-servers"; + + /** + * spring.kafka.consumer.enable-auto-commit + */ String KAFKA_AUTO_COMMIT = "spring.kafka.consumer.enable-auto-commit"; + + /** + * spring.kafka.consumer.group-id + */ String KAFKA_DEFAULT_GROUP_ID = "spring.kafka.consumer.group-id"; + + /** + * spring.kafka.consumer.auto-offset-reset + */ String KAFKA_AUTO_OFFSET_RESET = "spring.kafka.consumer.auto-offset-reset"; + + /** + * spring.kafka.consumer.request-timeout-ms + */ String KAFKA_REQUEST_TIMEOUT = "spring.kafka.consumer.request-timeout-ms"; + + /** + * spring.kafka.consumer.fetch-max-bytes + */ String KAFKA_FETCH_MAX_BYTES = "spring.kafka.consumer.fetch-max-bytes"; + + /** + * spring.kafka.consumer.max-poll-records + */ String KAFKA_MAX_POLL_RECORDS = "spring.kafka.consumer.max-poll-records"; + /** + * spring.datasource.driver-class-name + */ + String DRIVER_CLASS_NAME = "spring.datasource.driver-class-name"; + + /** + * spring.datasource.url + */ + String DS_URL = "spring.datasource.url"; + + /** + * spring.datasource.username + */ + String DS_USER_NAME = "spring.datasource.username"; + + /** + * spring.datasource.password + */ + String DS_PASSWORD = "spring.datasource.password"; + + /** + * spring.datasource.druid.initialSize + */ String DRUID_INITIAL_SIZE = "spring.datasource.druid.initialSize"; + + /** + * spring.datasource.druid.minIdle + */ String DRUID_MIN_IDLE = "spring.datasource.druid.minIdle"; + + /** + * spring.datasource.druid.maxActive + */ String DRUID_MAX_ACTIVE = "spring.datasource.druid.maxActive"; + + /** + * spring.datasource.druid.max-wait + */ String DRUID_MAX_WAIT = "spring.datasource.druid.max-wait"; + + /** + * spring.datasource.druid.validationQuery + */ String DRUID_VALIDATION_QUERY = "spring.datasource.druid.validationQuery"; + + /** + * spring.datasource.druid.min-evictable-idle-time-millis + */ String DRUID_MIN_EVICTABLE_IDLE_TIME_MILLIS = 
"spring.datasource.druid.min-evictable-idle-time-millis"; /** * lifecycle graceful shutdown wait time */ String TIMEOUT_PER_SHUTDOWN_PHASE = "spring.lifecycle.timeout-per-shutdown-phase"; + + /** + * openGauss.Compatibility.B + */ String OG_COMPATIBILITY_B = "openGauss.Compatibility.B"; } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/resource/ConnectionMgr.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/resource/ConnectionMgr.java new file mode 100644 index 0000000000000000000000000000000000000000..cd46f3834430bf894b84e0b524bca4e61f184dbc --- /dev/null +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/resource/ConnectionMgr.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. 
+ */ + +package org.opengauss.datachecker.extract.resource; + +import org.apache.logging.log4j.Logger; +import org.opengauss.datachecker.common.config.ConfigCache; +import org.opengauss.datachecker.common.constant.ConfigConstants; +import org.opengauss.datachecker.common.util.LogUtils; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * ConnectionMgr + * + * @author :wangchao + * @date :Created in 2024/1/17 + * @since :11 + */ +public class ConnectionMgr { + private static final Logger log = LogUtils.getLogger(); + private static String driverClassName = ""; + private static String url = ""; + private static String username = ""; + private static String databasePassport = ""; + private static AtomicBoolean isFirstLoad = new AtomicBoolean(true); + + /** + * Get a JDBC connection with auto-commit disabled; returns null if the connection cannot be created + * + * @return Connection + */ + public static synchronized Connection getConnection() { + if (isFirstLoad.get()) { + driverClassName = getPropertyValue(ConfigConstants.DRIVER_CLASS_NAME); + url = getPropertyValue(ConfigConstants.DS_URL); + username = getPropertyValue(ConfigConstants.DS_USER_NAME); + databasePassport = getPropertyValue(ConfigConstants.DS_PASSWORD); + try { + log.info("connection class loader ,[{}],[{}]", driverClassName, url); + Class.forName(driverClassName); + isFirstLoad.set(false); + } catch (ClassNotFoundException e) { + log.error("load driverClassName {} ", driverClassName, e); + } + } + Connection conn = null; + try { + conn = DriverManager.getConnection(url, username, databasePassport); + conn.setAutoCommit(false); + log.debug("Connection succeed!"); + } catch (SQLException exp) { + log.error("create connection for user [{}] on [{}]", username, url, exp); + } + return conn; + } + + private static String getPropertyValue(String key) { + return ConfigCache.getValue(key); + } + + /** + * 关闭数据库链接及 
PreparedStatement、ResultSet结果集 + * + * @param connection connection + * @param ps PreparedStatement + * @param resultSet resultSet + */ + public static void close(Connection connection, PreparedStatement ps, ResultSet resultSet) { + if (resultSet != null) { + try { + resultSet.close(); + } catch (SQLException sql) { + log.error("close resultSet failed", sql); + } + } + if (ps != null) { + try { + ps.close(); + } catch (SQLException sql) { + log.error("close preparedStatement failed", sql); + } + } + if (connection != null) { + try { + connection.close(); + } catch (SQLException sql) { + log.error("close connection failed", sql); + } + } + } +} diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/resource/JdbcDataOperations.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/resource/JdbcDataOperations.java index c8ebd187193f50a79237667dbb1dc138b020d2da..4569ad473d8cd196857466dd01dff5ab7ea0b5e8 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/resource/JdbcDataOperations.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/resource/JdbcDataOperations.java @@ -23,9 +23,7 @@ import org.opengauss.datachecker.common.entry.enums.DataBaseType; import org.opengauss.datachecker.common.exception.ExtractDataAccessException; import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.ThreadUtil; -import org.springframework.jdbc.datasource.DataSourceUtils; -import javax.sql.DataSource; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -43,29 +41,37 @@ public class JdbcDataOperations { private static final String OPEN_GAUSS_PARALLEL_QUERY = "set query_dop to %s;"; private static final String OPEN_GAUSS_EXTRA_FLOAT_DIGITS = "set extra_float_digits to 0;"; private static final int LOG_WAIT_TIMES = 600; + private static final String DOLPHIN_B_COMPATIBILITY_MODE_ON = "set dolphin.b_compatibility_mode to on;"; - private final DataSource jdbcDataSource; + 
private final boolean isOpenGauss; + private final boolean isOgCompatibilityB; private final ResourceManager resourceManager; private final boolean isForceRefreshConnectionSqlMode; + private String sqlModeRefreshStatement = ""; /** * constructor * - * @param jdbcDataSource datasource * @param resourceManager resourceManager */ - public JdbcDataOperations(DataSource jdbcDataSource, ResourceManager resourceManager) { - this.jdbcDataSource = jdbcDataSource; + public JdbcDataOperations(ResourceManager resourceManager) { this.resourceManager = resourceManager; this.isForceRefreshConnectionSqlMode = ConfigCache.getBooleanValue(ConfigConstants.SQL_MODE_FORCE_REFRESH); + this.isOpenGauss = checkDatabaseIsOpenGauss(); + this.isOgCompatibilityB = isOpenGauss ? ConfigCache.getBooleanValue(ConfigConstants.OG_COMPATIBILITY_B) : false; initSqlModeRefreshStatement(); } + private boolean checkDatabaseIsOpenGauss() { + return Objects.equals(DataBaseType.OG, + ConfigCache.getValue(ConfigConstants.DATA_BASE_TYPE, DataBaseType.class)); + } + private void initSqlModeRefreshStatement() { if (isForceRefreshConnectionSqlMode) { String sqlMode = ConfigCache.getValue(ConfigConstants.SQL_MODE_VALUE_CACHE); - if (ConfigCache.getBooleanValue(ConfigConstants.OG_COMPATIBILITY_B)) { + if (isOgCompatibilityB) { // openGauss compatibility B set database sql mode must be set dolphin.sql_mode sqlModeRefreshStatement = "set dolphin.sql_mode ='" + sqlMode + "'"; } else { @@ -80,9 +86,8 @@ public class JdbcDataOperations { * * @param allocMemory allocMemory * @return Connection - * @throws SQLException SQLException */ - public synchronized Connection tryConnectionAndClosedAutoCommit(long allocMemory) throws SQLException { + public synchronized Connection tryConnectionAndClosedAutoCommit(long allocMemory) { takeConnection(allocMemory); return getConnectionAndClosedAutoCommit(); } @@ -91,32 +96,31 @@ public class JdbcDataOperations { * try to get a jdbc connection and close auto commit. 
* * @return Connection - * @throws SQLException SQLException */ - public synchronized Connection tryConnectionAndClosedAutoCommit() throws SQLException { + public synchronized Connection tryConnectionAndClosedAutoCommit() { takeConnection(0); return getConnectionAndClosedAutoCommit(); } - private Connection getConnectionAndClosedAutoCommit() throws SQLException { + private Connection getConnectionAndClosedAutoCommit() { if (isShutdown()) { String message = "extract service is shutdown ,task of table is canceled!"; throw new ExtractDataAccessException(message); } - Connection connection = jdbcDataSource.getConnection(); - if (connection.getAutoCommit()) { - connection.setAutoCommit(false); - setExtraFloatDigitsParameter(connection); - } - if (isForceRefreshConnectionSqlMode && StringUtils.isNotEmpty(sqlModeRefreshStatement)) { - execute(connection, sqlModeRefreshStatement); - } + Connection connection = ConnectionMgr.getConnection(); + initJdbcConnectionEnvParameter(connection); return connection; } - public void setExtraFloatDigitsParameter(Connection connection) { - if (Objects.equals(DataBaseType.OG, ConfigCache.getValue(ConfigConstants.DATA_BASE_TYPE, DataBaseType.class))) { + private void initJdbcConnectionEnvParameter(Connection connection) { + if (isOpenGauss) { execute(connection, OPEN_GAUSS_EXTRA_FLOAT_DIGITS); + if (isOgCompatibilityB) { + execute(connection, DOLPHIN_B_COMPATIBILITY_MODE_ON); + } + } + if (isForceRefreshConnectionSqlMode && StringUtils.isNotEmpty(sqlModeRefreshStatement)) { + execute(connection, sqlModeRefreshStatement); } } @@ -136,7 +140,7 @@ public class JdbcDataOperations { */ public synchronized void releaseConnection(Connection connection) { resourceManager.release(); - DataSourceUtils.releaseConnection(connection, jdbcDataSource); + ConnectionMgr.close(connection, null, null); } /** @@ -146,7 +150,7 @@ public class JdbcDataOperations { * @throws SQLException SQLException */ public void enableDatabaseParallelQuery(Connection 
connection, int queryDop) throws SQLException { - if (Objects.equals(DataBaseType.OG, ConfigCache.getValue(ConfigConstants.DATA_BASE_TYPE, DataBaseType.class))) { + if (isOpenGauss) { try (PreparedStatement ps = connection.prepareStatement( String.format(OPEN_GAUSS_PARALLEL_QUERY, queryDop))) { ps.execute(); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/ConfigManagement.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/ConfigManagement.java index 66ae82f814abf6898d67b063407eb92960dfe911..df704c06541b1d5bff4299e4937b87a175b8a3aa 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/ConfigManagement.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/ConfigManagement.java @@ -58,6 +58,14 @@ public class ConfigManagement { @Value("${spring.check.extend-maximum-pool-size}") private int extendMaxPoolSize = 10; + @Value("${spring.datasource.driver-class-name}") + private String driverClassName; + @Value("${spring.datasource.url}") + private String dsUrl; + @Value("${spring.datasource.username}") + private String dsUname; + @Value("${spring.datasource.password}") + private String dsPw; @Value("${spring.datasource.druid.initialSize}") private int initialSize; @Value("${spring.datasource.druid.minIdle}") @@ -132,6 +140,11 @@ public class ConfigManagement { ConfigCache.put(ConfigConstants.DRUID_MAX_ACTIVE, 100); return; } + ConfigCache.put(ConfigConstants.DRIVER_CLASS_NAME, driverClassName); + ConfigCache.put(ConfigConstants.DS_URL, dsUrl); + ConfigCache.put(ConfigConstants.DS_USER_NAME, dsUname); + ConfigCache.put(ConfigConstants.DS_PASSWORD, dsPw); + ConfigCache.put(ConfigConstants.DRUID_INITIAL_SIZE, initialSize); ConfigCache.put(ConfigConstants.DRUID_MIN_IDLE, minIdle); ConfigCache.put(ConfigConstants.DRUID_MAX_ACTIVE, maxActive); diff --git 
a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java index 4fd28cf0e2e80c6e0de2c65a02459704aed818d2..434126075a79c19c55ce7a88472148393c156bae 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java @@ -15,7 +15,6 @@ package org.opengauss.datachecker.extract.slice; -import com.alibaba.druid.pool.DruidDataSource; import org.opengauss.datachecker.common.config.ConfigCache; import org.opengauss.datachecker.common.entry.extract.SliceExtend; import org.opengauss.datachecker.common.entry.extract.SliceVo; @@ -36,11 +35,9 @@ import org.opengauss.datachecker.extract.task.sql.FullQueryStatement; import org.opengauss.datachecker.extract.task.sql.QueryStatementFactory; import org.opengauss.datachecker.extract.task.sql.SliceQueryStatement; import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.retry.annotation.Retryable; import org.springframework.stereotype.Component; import javax.annotation.Resource; -import javax.sql.DataSource; import java.util.Objects; /** @@ -105,11 +102,10 @@ public class SliceProcessorContext { /** * ceeate jdbc data operations * - * @param dataSource datasource * @return JdbcDataOperations */ - public JdbcDataOperations getJdbcDataOperations(DataSource dataSource) { - return new JdbcDataOperations(dataSource, resourceManager); + public JdbcDataOperations getJdbcDataOperations() { + return new JdbcDataOperations(resourceManager); } /** diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/factory/SliceFactory.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/factory/SliceFactory.java index 
c586200875b048837987c72983083b934a0904f1..444ddd74dcc2001eeaa71b311e0be0aa8b364b75 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/factory/SliceFactory.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/factory/SliceFactory.java @@ -60,14 +60,21 @@ public class SliceFactory { if (Objects.isNull(datasource)) { return new CsvSliceProcessor(sliceVo, processorContext); } - return new JdbcSliceProcessor(datasource, sliceVo, processorContext); + return new JdbcSliceProcessor(sliceVo, processorContext); } + /** + * create slice processor
+ * + * @param table table + * @param tableFilePaths tableFilePaths + * @return SliceProcessor + */ public SliceProcessor createTableProcessor(String table, List tableFilePaths) { SliceProcessorContext processorContext = SpringUtil.getBean(SliceProcessorContext.class); if (Objects.isNull(datasource)) { return new CsvTableProcessor(table, tableFilePaths, processorContext); } - return new JdbcTableProcessor(datasource, table, processorContext); + return new JdbcTableProcessor(table, processorContext); } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java index dc22cf0a953666fbd281fa6f0f88f877fd8efc58..10eb23f81769c38baeaef4c912cafb3c0c368880 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java @@ -21,7 +21,6 @@ import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.opengauss.datachecker.common.exception.ExtractDataAccessException; import org.opengauss.datachecker.extract.resource.JdbcDataOperations; import org.opengauss.datachecker.extract.slice.SliceProcessorContext; -import org.opengauss.datachecker.extract.slice.common.SliceKafkaAgents; import org.opengauss.datachecker.extract.slice.common.SliceResultSetSender; import org.opengauss.datachecker.extract.task.sql.FullQueryStatement; import org.opengauss.datachecker.extract.task.sql.QuerySqlEntry; @@ -29,7 +28,6 @@ import org.opengauss.datachecker.extract.task.sql.SliceQueryStatement; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFuture; -import javax.sql.DataSource; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -40,6 +38,7 @@ import 
java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; /** * JdbcSliceProcessor @@ -49,23 +48,32 @@ import java.util.TreeMap; * @since :11 */ public class JdbcSliceProcessor extends AbstractSliceProcessor { - private final JdbcDataOperations jdbcOperation; + private final String table; + private final AtomicInteger rowCount = new AtomicInteger(0); - public JdbcSliceProcessor(DataSource datasource, SliceVo slice, SliceProcessorContext context) { + /** + * JdbcSliceProcessor + * + * @param slice slice + * @param context context + */ + public JdbcSliceProcessor(SliceVo slice, SliceProcessorContext context) { super(slice, context); - this.jdbcOperation = context.getJdbcDataOperations(datasource); + this.jdbcOperation = context.getJdbcDataOperations(); + this.table = slice.getTable(); + this.rowCount.set(0); } @Override public void run() { log.info("table slice [{}] is beginning to extract data", slice.toSimpleString()); try { - TableMetadata tableMetadata = context.getTableMetaData(slice.getTable()); + TableMetadata tableMetadata = context.getTableMetaData(table); SliceExtend sliceExtend = createSliceExtend(tableMetadata.getTableHash()); if (!slice.isEmptyTable()) { QuerySqlEntry queryStatement = createQueryStatement(tableMetadata); - log.debug("table [{}] query statement : {}", queryStatement.getTable(), queryStatement.getSql()); + log.debug("table [{}] query statement : {}", table, queryStatement.getSql()); executeQueryStatement(queryStatement, tableMetadata, sliceExtend); } else { log.info("table slice [{}] is empty ", slice.toSimpleString()); @@ -79,7 +87,7 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor { } } - protected QuerySqlEntry createQueryStatement(TableMetadata tableMetadata) { + private QuerySqlEntry createQueryStatement(TableMetadata tableMetadata) { if (slice.isSlice()) { SliceQueryStatement sliceStatement = context.createSliceQueryStatement(); 
return sliceStatement.buildSlice(tableMetadata, slice); @@ -89,10 +97,9 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor { } } - protected SliceExtend executeQueryStatement(QuerySqlEntry sqlEntry, TableMetadata tableMetadata, + private SliceExtend executeQueryStatement(QuerySqlEntry sqlEntry, TableMetadata tableMetadata, SliceExtend sliceExtend) { final LocalDateTime start = LocalDateTime.now(); - int rowCount = 0; Connection connection = null; long jdbcQueryCost = 0; long sendDataCost = 0; @@ -103,8 +110,8 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor { long estimatedMemorySize = estimatedMemorySize(tableMetadata.getAvgRowLength(), estimatedRowCount); connection = jdbcOperation.tryConnectionAndClosedAutoCommit(estimatedMemorySize); log.debug("slice [{}] fetch jdbc connection.", slice.getName()); - SliceKafkaAgents kafkaAgents = context.createSliceKafkaAgents(slice); - SliceResultSetSender sliceSender = new SliceResultSetSender(tableMetadata, kafkaAgents); + SliceResultSetSender sliceSender = createSliceResultSetSender(tableMetadata); + sliceExtend.setStartOffset(sliceSender.checkOffsetEnd()); try (PreparedStatement ps = connection.prepareStatement(sqlEntry.getSql()); ResultSet resultSet = ps.executeQuery()) { @@ -112,27 +119,8 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor { LocalDateTime jdbcQuery = LocalDateTime.now(); jdbcQueryCost = durationBetweenToMillis(start, jdbcQuery); resultSet.setFetchSize(FETCH_SIZE); - ResultSetMetaData rsmd = resultSet.getMetaData(); - sliceExtend.setStartOffset(sliceSender.checkOffsetEnd()); - Map result = new TreeMap<>(); - List offsetList = new LinkedList<>(); - List>> batchFutures = new LinkedList<>(); - final String tableName = slice.getTable(); - while (resultSet.next()) { - rowCount++; - batchFutures.add(sliceSender.resultSetTranslateAndSendSync(tableName, rsmd, resultSet, result, slice.getNo())); - if (batchFutures.size() == FETCH_SIZE) { - 
offsetList.add(getBatchFutureRecordOffsetScope(batchFutures)); - batchFutures.clear(); - } - } - if (batchFutures.size() > 0) { - offsetList.add(getBatchFutureRecordOffsetScope(batchFutures)); - batchFutures.clear(); - } - sliceExtend.setStartOffset(getMinOffset(offsetList)); - sliceExtend.setEndOffset(getMaxOffset(offsetList)); - sliceExtend.setCount(rowCount); + List offsetList = sliceQueryResultAndSendSync(sliceSender, resultSet); + updateExtendSliceOffsetAndCount(sliceExtend, rowCount.get(), offsetList); sendDataCost = durationBetweenToMillis(jdbcQuery, LocalDateTime.now()); } return sliceExtend; @@ -146,4 +134,35 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor { sendDataCost, sliceAllCost); } } + + private List sliceQueryResultAndSendSync(SliceResultSetSender sliceSender, ResultSet resultSet) + throws SQLException { + ResultSetMetaData rsmd = resultSet.getMetaData(); + Map result = new TreeMap<>(); + List offsetList = new LinkedList<>(); + List>> batchFutures = new LinkedList<>(); + while (resultSet.next()) { + this.rowCount.incrementAndGet(); + batchFutures.add(sliceSender.resultSetTranslateAndSendSync(table, rsmd, resultSet, result, slice.getNo())); + if (batchFutures.size() == FETCH_SIZE) { + offsetList.add(getBatchFutureRecordOffsetScope(batchFutures)); + batchFutures.clear(); + } + } + if (batchFutures.size() > 0) { + offsetList.add(getBatchFutureRecordOffsetScope(batchFutures)); + batchFutures.clear(); + } + return offsetList; + } + + private SliceResultSetSender createSliceResultSetSender(TableMetadata tableMetadata) { + return new SliceResultSetSender(tableMetadata, context.createSliceKafkaAgents(slice)); + } + + private void updateExtendSliceOffsetAndCount(SliceExtend sliceExtend, int rowCount, List offsetList) { + sliceExtend.setStartOffset(getMinOffset(offsetList)); + sliceExtend.setEndOffset(getMaxOffset(offsetList)); + sliceExtend.setCount(rowCount); + } } diff --git 
a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java index c92c60d19e30225b2324266b0de524b272b92ff3..f3a1fda05ef5c3716a95481be01b689d24d8458f 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java @@ -24,7 +24,6 @@ import org.opengauss.datachecker.extract.task.sql.AutoSliceQueryStatement; import org.opengauss.datachecker.extract.task.sql.FullQueryStatement; import org.opengauss.datachecker.extract.task.sql.QuerySqlEntry; -import javax.sql.DataSource; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -48,9 +47,15 @@ public class JdbcTableProcessor extends AbstractTableProcessor { private SliceResultSetSender sliceSender; - public JdbcTableProcessor(DataSource datasource, String table, SliceProcessorContext context) { + /** + * JdbcTableProcessor + * + * @param table table + * @param context context + */ + public JdbcTableProcessor(String table, SliceProcessorContext context) { super(table, context); - this.jdbcOperation = context.getJdbcDataOperations(datasource); + this.jdbcOperation = context.getJdbcDataOperations(); } @Override @@ -110,7 +115,8 @@ public class JdbcTableProcessor extends AbstractTableProcessor { } finally { jdbcOperation.releaseConnection(connection); log.info("query table [{}] row-count [{}] cost [{}] milliseconds", table, tableRowCount, - Duration.between(start, LocalDateTime.now()).toMillis()); + Duration.between(start, LocalDateTime.now()) + .toMillis()); } return tableRowCount; }