diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java
index 442520a28134655a8dc79445e883217fe6f966b2..f45ecd837a9639fc4a6ea3772c65c17268aec400 100644
--- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java
@@ -23,64 +23,248 @@ package org.opengauss.datachecker.common.constant;
* @since :11
*/
public interface ConfigConstants {
+ /**
+ * csv config : spring.csv.schema
+ */
String CSV_SCHEMA = "spring.csv.schema";
+
+ /**
+ * csv config : spring.csv.path
+ */
String CSV_PATH = "spring.csv.path";
+
+ /**
+ * csv config : spring.csv.sync
+ */
String CSV_SYNC = "spring.csv.sync";
+
+ /**
+ * csv config : spring.csv.path:data
+ */
String CSV_DATA_PATH = "spring.csv.path:data";
+
+ /**
+ * csv config : spring.csv.path:reader
+ */
String CSV_READER_PATH = "spring.csv.path:reader";
+
+ /**
+ * csv config : spring.csv.path:writer
+ */
String CSV_WRITER_PATH = "spring.csv.path:writer";
+
+ /**
+ * csv config : spring.csv.schema-tables
+ */
String CSV_SCHEMA_TABLES_PATH = "spring.csv.schema-tables";
+
+ /**
+ * csv config : spring.csv.schema-columns
+ */
String CSV_SCHEMA_COLUMNS_PATH = "spring.csv.schema-columns";
+
+ /**
+ * csv config : spring.csv.sleep-interval
+ */
String CSV_SLEEP_INTERVAL = "spring.csv.sleep-interval";
+
+ /**
+ * csv config : spring.csv.task-dispatcher-interval
+ */
String CSV_TASK_DISPATCHER_INTERVAL = "spring.csv.task-dispatcher-interval";
+
+ /**
+ * csv config : spring.csv.max-dispatcher-size
+ */
String CSV_MAX_DISPATCHER_SIZE = "spring.csv.max-dispatcher-size";
+ /**
+ * check_mode
+ */
String CHECK_MODE = "check_mode";
+
+ /**
+ * check.process.id
+ */
String PROCESS_NO = "check.process.id";
+
+ /**
+ * check.start-time
+ */
String START_LOCAL_TIME = "check.start-time";
+
+ /**
+ * jdbc.result-set.fetch-size
+ */
String FETCH_SIZE = "jdbc.result-set.fetch-size";
+ /**
+ * spring.extract.endpoint
+ */
String ENDPOINT = "spring.extract.endpoint";
+
+ /**
+ * spring.extract.query-dop
+ */
String QUERY_DOP = "spring.extract.query-dop";
+
+ /**
+ * spring.extract.databaseType
+ */
String DATA_BASE_TYPE = "spring.extract.databaseType";
- String OBJECT_SIZE_EXPANSION_FACTOR = "spring.extract.object-size-expansion-factor";
+ /**
+ * spring.extract.object-size-expansion-factor
+ */
+ String OBJECT_SIZE_EXPANSION_FACTOR = "spring.extract.object-size-expansion-factor";
+ /**
+ * spring.memory-monitor-enable
+ */
String MEMORY_MONITOR = "spring.memory-monitor-enable";
+ /**
+ * spring.check.maximum-table-slice-size
+ */
String MAXIMUM_TABLE_SLICE_SIZE = "spring.check.maximum-table-slice-size";
+
+ /**
+ * spring.check.maximum-topic-size
+ */
String MAXIMUM_TOPIC_SIZE = "spring.check.maximum-topic-size";
+
+ /**
+ * spring.check.floating-point-data-supply-zero
+ */
String FLOATING_POINT_DATA_SUPPLY_ZERO = "spring.check.floating-point-data-supply-zero";
/**
* with table slice check, config the maximum number of threads in the thread pool
*/
String EXTEND_MAXIMUM_POOL_SIZE = "spring.check.extend-maximum-pool-size";
+
+ /**
+ * data.check.bucket-expect-capacity
+ */
String BUCKET_CAPACITY = "data.check.bucket-expect-capacity";
+
+ /**
+ * data.check.data-path
+ */
String CHECK_PATH = "data.check.data-path";
+
+ /**
+ * data.check.sql_mode_pad_char_to_full_length
+ */
String SQL_MODE_PAD_CHAR_TO_FULL_LENGTH = "data.check.sql_mode_pad_char_to_full_length";
+
+ /**
+ * pad_char_to_full_length
+ */
String SQL_MODE_NAME_PAD_CHAR_TO_FULL_LENGTH = "pad_char_to_full_length";
+
+ /**
+ * data.check.sql_mode_value_cache
+ */
String SQL_MODE_VALUE_CACHE = "data.check.sql_mode_value_cache";
+
+ /**
+ * data.check.sql_mode_force_refresh
+ */
String SQL_MODE_FORCE_REFRESH = "data.check.sql_mode_force_refresh";
+ /**
+ * spring.kafka.bootstrap-servers
+ */
String KAFKA_SERVERS = "spring.kafka.bootstrap-servers";
+
+ /**
+ * spring.kafka.consumer.enable-auto-commit
+ */
String KAFKA_AUTO_COMMIT = "spring.kafka.consumer.enable-auto-commit";
+
+ /**
+ * spring.kafka.consumer.group-id
+ */
String KAFKA_DEFAULT_GROUP_ID = "spring.kafka.consumer.group-id";
+
+ /**
+ * spring.kafka.consumer.auto-offset-reset
+ */
String KAFKA_AUTO_OFFSET_RESET = "spring.kafka.consumer.auto-offset-reset";
+
+ /**
+ * spring.kafka.consumer.request-timeout-ms
+ */
String KAFKA_REQUEST_TIMEOUT = "spring.kafka.consumer.request-timeout-ms";
+
+ /**
+ * spring.kafka.consumer.fetch-max-bytes
+ */
String KAFKA_FETCH_MAX_BYTES = "spring.kafka.consumer.fetch-max-bytes";
+
+ /**
+ * spring.kafka.consumer.max-poll-records
+ */
String KAFKA_MAX_POLL_RECORDS = "spring.kafka.consumer.max-poll-records";
+ /**
+ * spring.datasource.driver-class-name
+ */
+ String DRIVER_CLASS_NAME = "spring.datasource.driver-class-name";
+
+ /**
+ * spring.datasource.url
+ */
+ String DS_URL = "spring.datasource.url";
+
+ /**
+ * spring.datasource.username
+ */
+ String DS_USER_NAME = "spring.datasource.username";
+
+ /**
+ * spring.datasource.password
+ */
+ String DS_PASSWORD = "spring.datasource.password";
+
+ /**
+ * spring.datasource.druid.initialSize
+ */
String DRUID_INITIAL_SIZE = "spring.datasource.druid.initialSize";
+
+ /**
+ * spring.datasource.druid.minIdle
+ */
String DRUID_MIN_IDLE = "spring.datasource.druid.minIdle";
+
+ /**
+ * spring.datasource.druid.maxActive
+ */
String DRUID_MAX_ACTIVE = "spring.datasource.druid.maxActive";
+
+ /**
+ * spring.datasource.druid.max-wait
+ */
String DRUID_MAX_WAIT = "spring.datasource.druid.max-wait";
+
+ /**
+ * spring.datasource.druid.validationQuery
+ */
String DRUID_VALIDATION_QUERY = "spring.datasource.druid.validationQuery";
+
+ /**
+ * spring.datasource.druid.min-evictable-idle-time-millis
+ */
String DRUID_MIN_EVICTABLE_IDLE_TIME_MILLIS = "spring.datasource.druid.min-evictable-idle-time-millis";
/**
* lifecycle graceful shutdown wait time
*/
String TIMEOUT_PER_SHUTDOWN_PHASE = "spring.lifecycle.timeout-per-shutdown-phase";
+
+ /**
+ * openGauss.Compatibility.B
+ */
String OG_COMPATIBILITY_B = "openGauss.Compatibility.B";
}
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/resource/ConnectionMgr.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/resource/ConnectionMgr.java
new file mode 100644
index 0000000000000000000000000000000000000000..cd46f3834430bf894b84e0b524bca4e61f184dbc
--- /dev/null
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/resource/ConnectionMgr.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd.
+ *
+ * openGauss is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ *
+ * http://license.coscl.org.cn/MulanPSL2
+ *
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ */
+
+package org.opengauss.datachecker.extract.resource;
+
+import org.apache.logging.log4j.Logger;
+import org.opengauss.datachecker.common.config.ConfigCache;
+import org.opengauss.datachecker.common.constant.ConfigConstants;
+import org.opengauss.datachecker.common.util.LogUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * ConnectionMgr
+ *
+ * @author :wangchao
+ * @date :Created in 2024/1/17
+ * @since :11
+ */
+public class ConnectionMgr {
+ private static final Logger log = LogUtils.getLogger();
+ private static String driverClassName = "";
+ private static String url = "";
+ private static String username = "";
+ private static String databasePassport = "";
+ private static AtomicBoolean isFirstLoad = new AtomicBoolean(true);
+
+ /**
+ * 获取JDBC链接
+ *
+ * @return Connection
+ */
+ public static synchronized Connection getConnection() {
+ if (isFirstLoad.get()) {
+ driverClassName = getPropertyValue(ConfigConstants.DRIVER_CLASS_NAME);
+ url = getPropertyValue(ConfigConstants.DS_URL);
+ username = getPropertyValue(ConfigConstants.DS_USER_NAME);
+ databasePassport = getPropertyValue(ConfigConstants.DS_PASSWORD);
+ try {
+ log.info("connection class loader ,[{}],[{}]", driverClassName, url);
+ Class.forName(driverClassName);
+ isFirstLoad.set(false);
+ } catch (ClassNotFoundException e) {
+ log.error("load driverClassName {} ", driverClassName, e);
+ }
+ }
+ Connection conn = null;
+ try {
+ conn = DriverManager.getConnection(url, username, databasePassport);
+ conn.setAutoCommit(false);
+ log.debug("Connection succeed!");
+ } catch (SQLException exp) {
+ log.error("create connection [{},{}]:[{}]", username, databasePassport, url, exp);
+ }
+ return conn;
+ }
+
+ private static String getPropertyValue(String key) {
+ return ConfigCache.getValue(key);
+ }
+
+ /**
+ * 关闭数据库链接及 PreparedStatement、ResultSet结果集
+ *
+ * @param connection connection
+ * @param ps PreparedStatement
+ * @param resultSet resultSet
+ */
+ public static void close(Connection connection, PreparedStatement ps, ResultSet resultSet) {
+ if (resultSet != null) {
+ try {
+ resultSet.close();
+ } catch (SQLException sql) {
+ sql.printStackTrace();
+ }
+ }
+ if (ps != null) {
+ try {
+ ps.close();
+ } catch (SQLException sql) {
+ sql.printStackTrace();
+ }
+ }
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException sql) {
+ sql.printStackTrace();
+ }
+ }
+ }
+}
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/resource/JdbcDataOperations.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/resource/JdbcDataOperations.java
index c8ebd187193f50a79237667dbb1dc138b020d2da..4569ad473d8cd196857466dd01dff5ab7ea0b5e8 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/resource/JdbcDataOperations.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/resource/JdbcDataOperations.java
@@ -23,9 +23,7 @@ import org.opengauss.datachecker.common.entry.enums.DataBaseType;
import org.opengauss.datachecker.common.exception.ExtractDataAccessException;
import org.opengauss.datachecker.common.util.LogUtils;
import org.opengauss.datachecker.common.util.ThreadUtil;
-import org.springframework.jdbc.datasource.DataSourceUtils;
-import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
@@ -43,29 +41,37 @@ public class JdbcDataOperations {
private static final String OPEN_GAUSS_PARALLEL_QUERY = "set query_dop to %s;";
private static final String OPEN_GAUSS_EXTRA_FLOAT_DIGITS = "set extra_float_digits to 0;";
private static final int LOG_WAIT_TIMES = 600;
+ private static final String DOLPHIN_B_COMPATIBILITY_MODE_ON = "set dolphin.b_compatibility_mode to on;";
- private final DataSource jdbcDataSource;
+ private final boolean isOpenGauss;
+ private final boolean isOgCompatibilityB;
private final ResourceManager resourceManager;
private final boolean isForceRefreshConnectionSqlMode;
+
private String sqlModeRefreshStatement = "";
/**
* constructor
*
- * @param jdbcDataSource datasource
* @param resourceManager resourceManager
*/
- public JdbcDataOperations(DataSource jdbcDataSource, ResourceManager resourceManager) {
- this.jdbcDataSource = jdbcDataSource;
+ public JdbcDataOperations(ResourceManager resourceManager) {
this.resourceManager = resourceManager;
this.isForceRefreshConnectionSqlMode = ConfigCache.getBooleanValue(ConfigConstants.SQL_MODE_FORCE_REFRESH);
+ this.isOpenGauss = checkDatabaseIsOpenGauss();
+ this.isOgCompatibilityB = isOpenGauss ? ConfigCache.getBooleanValue(ConfigConstants.OG_COMPATIBILITY_B) : false;
initSqlModeRefreshStatement();
}
+ private boolean checkDatabaseIsOpenGauss() {
+ return Objects.equals(DataBaseType.OG,
+ ConfigCache.getValue(ConfigConstants.DATA_BASE_TYPE, DataBaseType.class));
+ }
+
private void initSqlModeRefreshStatement() {
if (isForceRefreshConnectionSqlMode) {
String sqlMode = ConfigCache.getValue(ConfigConstants.SQL_MODE_VALUE_CACHE);
- if (ConfigCache.getBooleanValue(ConfigConstants.OG_COMPATIBILITY_B)) {
+ if (isOgCompatibilityB) {
// openGauss compatibility B set database sql mode must be set dolphin.sql_mode
sqlModeRefreshStatement = "set dolphin.sql_mode ='" + sqlMode + "'";
} else {
@@ -80,9 +86,8 @@ public class JdbcDataOperations {
*
* @param allocMemory allocMemory
* @return Connection
- * @throws SQLException SQLException
*/
- public synchronized Connection tryConnectionAndClosedAutoCommit(long allocMemory) throws SQLException {
+ public synchronized Connection tryConnectionAndClosedAutoCommit(long allocMemory) {
takeConnection(allocMemory);
return getConnectionAndClosedAutoCommit();
}
@@ -91,32 +96,31 @@ public class JdbcDataOperations {
* try to get a jdbc connection and close auto commit.
*
* @return Connection
- * @throws SQLException SQLException
*/
- public synchronized Connection tryConnectionAndClosedAutoCommit() throws SQLException {
+ public synchronized Connection tryConnectionAndClosedAutoCommit() {
takeConnection(0);
return getConnectionAndClosedAutoCommit();
}
- private Connection getConnectionAndClosedAutoCommit() throws SQLException {
+ private Connection getConnectionAndClosedAutoCommit() {
if (isShutdown()) {
String message = "extract service is shutdown ,task of table is canceled!";
throw new ExtractDataAccessException(message);
}
- Connection connection = jdbcDataSource.getConnection();
- if (connection.getAutoCommit()) {
- connection.setAutoCommit(false);
- setExtraFloatDigitsParameter(connection);
- }
- if (isForceRefreshConnectionSqlMode && StringUtils.isNotEmpty(sqlModeRefreshStatement)) {
- execute(connection, sqlModeRefreshStatement);
- }
+ Connection connection = ConnectionMgr.getConnection();
+ initJdbcConnectionEnvParameter(connection);
return connection;
}
- public void setExtraFloatDigitsParameter(Connection connection) {
- if (Objects.equals(DataBaseType.OG, ConfigCache.getValue(ConfigConstants.DATA_BASE_TYPE, DataBaseType.class))) {
+ private void initJdbcConnectionEnvParameter(Connection connection) {
+ if (isOpenGauss) {
execute(connection, OPEN_GAUSS_EXTRA_FLOAT_DIGITS);
+ if (isOgCompatibilityB) {
+ execute(connection, DOLPHIN_B_COMPATIBILITY_MODE_ON);
+ }
+ }
+ if (isForceRefreshConnectionSqlMode && StringUtils.isNotEmpty(sqlModeRefreshStatement)) {
+ execute(connection, sqlModeRefreshStatement);
}
}
@@ -136,7 +140,7 @@ public class JdbcDataOperations {
*/
public synchronized void releaseConnection(Connection connection) {
resourceManager.release();
- DataSourceUtils.releaseConnection(connection, jdbcDataSource);
+ ConnectionMgr.close(connection, null, null);
}
/**
@@ -146,7 +150,7 @@ public class JdbcDataOperations {
* @throws SQLException SQLException
*/
public void enableDatabaseParallelQuery(Connection connection, int queryDop) throws SQLException {
- if (Objects.equals(DataBaseType.OG, ConfigCache.getValue(ConfigConstants.DATA_BASE_TYPE, DataBaseType.class))) {
+ if (isOpenGauss) {
try (PreparedStatement ps = connection.prepareStatement(
String.format(OPEN_GAUSS_PARALLEL_QUERY, queryDop))) {
ps.execute();
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/ConfigManagement.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/ConfigManagement.java
index 66ae82f814abf6898d67b063407eb92960dfe911..df704c06541b1d5bff4299e4937b87a175b8a3aa 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/ConfigManagement.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/ConfigManagement.java
@@ -58,6 +58,14 @@ public class ConfigManagement {
@Value("${spring.check.extend-maximum-pool-size}")
private int extendMaxPoolSize = 10;
+ @Value("${spring.datasource.driver-class-name}")
+ private String driverClassName;
+ @Value("${spring.datasource.url}")
+ private String dsUrl;
+ @Value("${spring.datasource.username}")
+ private String dsUname;
+ @Value("${spring.datasource.password}")
+ private String dsPw;
@Value("${spring.datasource.druid.initialSize}")
private int initialSize;
@Value("${spring.datasource.druid.minIdle}")
@@ -132,6 +140,11 @@ public class ConfigManagement {
ConfigCache.put(ConfigConstants.DRUID_MAX_ACTIVE, 100);
return;
}
+ ConfigCache.put(ConfigConstants.DRIVER_CLASS_NAME, driverClassName);
+ ConfigCache.put(ConfigConstants.DS_URL, dsUrl);
+ ConfigCache.put(ConfigConstants.DS_USER_NAME, dsUname);
+ ConfigCache.put(ConfigConstants.DS_PASSWORD, dsPw);
+
ConfigCache.put(ConfigConstants.DRUID_INITIAL_SIZE, initialSize);
ConfigCache.put(ConfigConstants.DRUID_MIN_IDLE, minIdle);
ConfigCache.put(ConfigConstants.DRUID_MAX_ACTIVE, maxActive);
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java
index 4fd28cf0e2e80c6e0de2c65a02459704aed818d2..434126075a79c19c55ce7a88472148393c156bae 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java
@@ -15,7 +15,6 @@
package org.opengauss.datachecker.extract.slice;
-import com.alibaba.druid.pool.DruidDataSource;
import org.opengauss.datachecker.common.config.ConfigCache;
import org.opengauss.datachecker.common.entry.extract.SliceExtend;
import org.opengauss.datachecker.common.entry.extract.SliceVo;
@@ -36,11 +35,9 @@ import org.opengauss.datachecker.extract.task.sql.FullQueryStatement;
import org.opengauss.datachecker.extract.task.sql.QueryStatementFactory;
import org.opengauss.datachecker.extract.task.sql.SliceQueryStatement;
import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
-import javax.sql.DataSource;
import java.util.Objects;
/**
@@ -105,11 +102,10 @@ public class SliceProcessorContext {
/**
* ceeate jdbc data operations
*
- * @param dataSource datasource
* @return JdbcDataOperations
*/
- public JdbcDataOperations getJdbcDataOperations(DataSource dataSource) {
- return new JdbcDataOperations(dataSource, resourceManager);
+ public JdbcDataOperations getJdbcDataOperations() {
+ return new JdbcDataOperations(resourceManager);
}
/**
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/factory/SliceFactory.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/factory/SliceFactory.java
index c586200875b048837987c72983083b934a0904f1..444ddd74dcc2001eeaa71b311e0be0aa8b364b75 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/factory/SliceFactory.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/factory/SliceFactory.java
@@ -60,14 +60,21 @@ public class SliceFactory {
if (Objects.isNull(datasource)) {
return new CsvSliceProcessor(sliceVo, processorContext);
}
- return new JdbcSliceProcessor(datasource, sliceVo, processorContext);
+ return new JdbcSliceProcessor(sliceVo, processorContext);
}
+ /**
+ * create slice processor
+ *
+ * @param table table
+ * @param tableFilePaths tableFilePaths
+ * @return SliceProcessor
+ */
public SliceProcessor createTableProcessor(String table, List tableFilePaths) {
SliceProcessorContext processorContext = SpringUtil.getBean(SliceProcessorContext.class);
if (Objects.isNull(datasource)) {
return new CsvTableProcessor(table, tableFilePaths, processorContext);
}
- return new JdbcTableProcessor(datasource, table, processorContext);
+ return new JdbcTableProcessor(table, processorContext);
}
}
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java
index dc22cf0a953666fbd281fa6f0f88f877fd8efc58..10eb23f81769c38baeaef4c912cafb3c0c368880 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java
@@ -21,7 +21,6 @@ import org.opengauss.datachecker.common.entry.extract.TableMetadata;
import org.opengauss.datachecker.common.exception.ExtractDataAccessException;
import org.opengauss.datachecker.extract.resource.JdbcDataOperations;
import org.opengauss.datachecker.extract.slice.SliceProcessorContext;
-import org.opengauss.datachecker.extract.slice.common.SliceKafkaAgents;
import org.opengauss.datachecker.extract.slice.common.SliceResultSetSender;
import org.opengauss.datachecker.extract.task.sql.FullQueryStatement;
import org.opengauss.datachecker.extract.task.sql.QuerySqlEntry;
@@ -29,7 +28,6 @@ import org.opengauss.datachecker.extract.task.sql.SliceQueryStatement;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
-import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -40,6 +38,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* JdbcSliceProcessor
@@ -49,23 +48,32 @@ import java.util.TreeMap;
* @since :11
*/
public class JdbcSliceProcessor extends AbstractSliceProcessor {
-
private final JdbcDataOperations jdbcOperation;
+ private final String table;
+ private final AtomicInteger rowCount = new AtomicInteger(0);
- public JdbcSliceProcessor(DataSource datasource, SliceVo slice, SliceProcessorContext context) {
+ /**
+ * JdbcSliceProcessor
+ *
+ * @param slice slice
+ * @param context context
+ */
+ public JdbcSliceProcessor(SliceVo slice, SliceProcessorContext context) {
super(slice, context);
- this.jdbcOperation = context.getJdbcDataOperations(datasource);
+ this.jdbcOperation = context.getJdbcDataOperations();
+ this.table = slice.getTable();
+ this.rowCount.set(0);
}
@Override
public void run() {
log.info("table slice [{}] is beginning to extract data", slice.toSimpleString());
try {
- TableMetadata tableMetadata = context.getTableMetaData(slice.getTable());
+ TableMetadata tableMetadata = context.getTableMetaData(table);
SliceExtend sliceExtend = createSliceExtend(tableMetadata.getTableHash());
if (!slice.isEmptyTable()) {
QuerySqlEntry queryStatement = createQueryStatement(tableMetadata);
- log.debug("table [{}] query statement : {}", queryStatement.getTable(), queryStatement.getSql());
+ log.debug("table [{}] query statement : {}", table, queryStatement.getSql());
executeQueryStatement(queryStatement, tableMetadata, sliceExtend);
} else {
log.info("table slice [{}] is empty ", slice.toSimpleString());
@@ -79,7 +87,7 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor {
}
}
- protected QuerySqlEntry createQueryStatement(TableMetadata tableMetadata) {
+ private QuerySqlEntry createQueryStatement(TableMetadata tableMetadata) {
if (slice.isSlice()) {
SliceQueryStatement sliceStatement = context.createSliceQueryStatement();
return sliceStatement.buildSlice(tableMetadata, slice);
@@ -89,10 +97,9 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor {
}
}
- protected SliceExtend executeQueryStatement(QuerySqlEntry sqlEntry, TableMetadata tableMetadata,
+ private SliceExtend executeQueryStatement(QuerySqlEntry sqlEntry, TableMetadata tableMetadata,
SliceExtend sliceExtend) {
final LocalDateTime start = LocalDateTime.now();
- int rowCount = 0;
Connection connection = null;
long jdbcQueryCost = 0;
long sendDataCost = 0;
@@ -103,8 +110,8 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor {
long estimatedMemorySize = estimatedMemorySize(tableMetadata.getAvgRowLength(), estimatedRowCount);
connection = jdbcOperation.tryConnectionAndClosedAutoCommit(estimatedMemorySize);
log.debug("slice [{}] fetch jdbc connection.", slice.getName());
- SliceKafkaAgents kafkaAgents = context.createSliceKafkaAgents(slice);
- SliceResultSetSender sliceSender = new SliceResultSetSender(tableMetadata, kafkaAgents);
+ SliceResultSetSender sliceSender = createSliceResultSetSender(tableMetadata);
+ sliceExtend.setStartOffset(sliceSender.checkOffsetEnd());
try (PreparedStatement ps = connection.prepareStatement(sqlEntry.getSql());
ResultSet resultSet = ps.executeQuery()) {
@@ -112,27 +119,8 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor {
LocalDateTime jdbcQuery = LocalDateTime.now();
jdbcQueryCost = durationBetweenToMillis(start, jdbcQuery);
resultSet.setFetchSize(FETCH_SIZE);
- ResultSetMetaData rsmd = resultSet.getMetaData();
- sliceExtend.setStartOffset(sliceSender.checkOffsetEnd());
- Map result = new TreeMap<>();
- List offsetList = new LinkedList<>();
- List>> batchFutures = new LinkedList<>();
- final String tableName = slice.getTable();
- while (resultSet.next()) {
- rowCount++;
- batchFutures.add(sliceSender.resultSetTranslateAndSendSync(tableName, rsmd, resultSet, result, slice.getNo()));
- if (batchFutures.size() == FETCH_SIZE) {
- offsetList.add(getBatchFutureRecordOffsetScope(batchFutures));
- batchFutures.clear();
- }
- }
- if (batchFutures.size() > 0) {
- offsetList.add(getBatchFutureRecordOffsetScope(batchFutures));
- batchFutures.clear();
- }
- sliceExtend.setStartOffset(getMinOffset(offsetList));
- sliceExtend.setEndOffset(getMaxOffset(offsetList));
- sliceExtend.setCount(rowCount);
+ List offsetList = sliceQueryResultAndSendSync(sliceSender, resultSet);
+ updateExtendSliceOffsetAndCount(sliceExtend, rowCount.get(), offsetList);
sendDataCost = durationBetweenToMillis(jdbcQuery, LocalDateTime.now());
}
return sliceExtend;
@@ -146,4 +134,35 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor {
sendDataCost, sliceAllCost);
}
}
+
+ private List sliceQueryResultAndSendSync(SliceResultSetSender sliceSender, ResultSet resultSet)
+ throws SQLException {
+ ResultSetMetaData rsmd = resultSet.getMetaData();
+ Map result = new TreeMap<>();
+ List offsetList = new LinkedList<>();
+ List>> batchFutures = new LinkedList<>();
+ while (resultSet.next()) {
+ this.rowCount.incrementAndGet();
+ batchFutures.add(sliceSender.resultSetTranslateAndSendSync(table, rsmd, resultSet, result, slice.getNo()));
+ if (batchFutures.size() == FETCH_SIZE) {
+ offsetList.add(getBatchFutureRecordOffsetScope(batchFutures));
+ batchFutures.clear();
+ }
+ }
+ if (batchFutures.size() > 0) {
+ offsetList.add(getBatchFutureRecordOffsetScope(batchFutures));
+ batchFutures.clear();
+ }
+ return offsetList;
+ }
+
+ private SliceResultSetSender createSliceResultSetSender(TableMetadata tableMetadata) {
+ return new SliceResultSetSender(tableMetadata, context.createSliceKafkaAgents(slice));
+ }
+
+ private void updateExtendSliceOffsetAndCount(SliceExtend sliceExtend, int rowCount, List offsetList) {
+ sliceExtend.setStartOffset(getMinOffset(offsetList));
+ sliceExtend.setEndOffset(getMaxOffset(offsetList));
+ sliceExtend.setCount(rowCount);
+ }
}
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java
index c92c60d19e30225b2324266b0de524b272b92ff3..f3a1fda05ef5c3716a95481be01b689d24d8458f 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java
@@ -24,7 +24,6 @@ import org.opengauss.datachecker.extract.task.sql.AutoSliceQueryStatement;
import org.opengauss.datachecker.extract.task.sql.FullQueryStatement;
import org.opengauss.datachecker.extract.task.sql.QuerySqlEntry;
-import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -48,9 +47,15 @@ public class JdbcTableProcessor extends AbstractTableProcessor {
private SliceResultSetSender sliceSender;
- public JdbcTableProcessor(DataSource datasource, String table, SliceProcessorContext context) {
+ /**
+ * JdbcTableProcessor
+ *
+ * @param table table
+ * @param context context
+ */
+ public JdbcTableProcessor(String table, SliceProcessorContext context) {
super(table, context);
- this.jdbcOperation = context.getJdbcDataOperations(datasource);
+ this.jdbcOperation = context.getJdbcDataOperations();
}
@Override
@@ -110,7 +115,8 @@ public class JdbcTableProcessor extends AbstractTableProcessor {
} finally {
jdbcOperation.releaseConnection(connection);
log.info("query table [{}] row-count [{}] cost [{}] milliseconds", table, tableRowCount,
- Duration.between(start, LocalDateTime.now()).toMillis());
+ Duration.between(start, LocalDateTime.now())
+ .toMillis());
}
return tableRowCount;
}