diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java index 29181b63c184d429d9a106a3d6957c4028a56bac..4fa49a6b52d37c1674313c68fcfc0d6bdccfc4b0 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java @@ -15,11 +15,9 @@ package org.opengauss.datachecker.extract.slice.common; -import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData; import org.opengauss.datachecker.common.entry.extract.RowDataHash; import org.opengauss.datachecker.common.entry.extract.TableMetadata; -import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.extract.task.ResultSetHandler; import org.opengauss.datachecker.extract.task.ResultSetHandlerFactory; import org.opengauss.datachecker.extract.util.HashHandler; @@ -41,9 +39,9 @@ import java.util.Map; * @since :11 */ public class SliceResultSetSender { - protected static final Logger log = LogUtils.getBusinessLogger(); - private static final HashHandler hashHandler = new HashHandler(); - private static final String csv_null_value = "null"; + private static final HashHandler HASH_HANDLER = new HashHandler(); + private static final String CSV_NULL_VALUE = "null"; + private final ResultSetHandler resultSetHandler; private final SliceKafkaAgents kafkaOperate; private final List columns; @@ -67,31 +65,42 @@ public class SliceResultSetSender { } /** - * parse result set to RowDataHash + * resultSetTranslateAndSendRandom * - * @param rs rs - * @param sNo sNo + * @param rsmd rsmd + * @param rs rs + * @param result result + * @param sNo sNo */ - public void resultSetTranslateAndSend(String tableName, ResultSetMetaData rsmd, ResultSet rs, - Map result, int sNo) { - RowDataHash dataHash = resultSetTranslate(tableName, rsmd, rs, result, sNo); - kafkaOperate.sendRowData(dataHash); - } - - public void resultSetTranslateAndSendRandom(String tableName, ResultSetMetaData rsmd, ResultSet rs, - Map result, int sNo) { - RowDataHash dataHash = resultSetTranslate(tableName, rsmd, rs, result, sNo); + public void resultSetTranslateAndSendRandom(ResultSetMetaData rsmd, ResultSet rs, Map result, + int sNo) { + RowDataHash dataHash = resultSetTranslate(rsmd, rs, result, sNo); kafkaOperate.sendRowDataRandomPartition(dataHash); } - public ListenableFuture> resultSetTranslateAndSendSync(String tableName, - ResultSetMetaData rsmd, ResultSet rs, Map result, int sNo) { - RowDataHash dataHash = resultSetTranslate(tableName, rsmd, rs, result, sNo); + /** + * resultSetTranslateAndSendSync + * + * @param rsmd rsmd + * @param rs rs + * @param result result + * @param sNo sNo + */ + public ListenableFuture> resultSetTranslateAndSendSync(ResultSetMetaData rsmd, + ResultSet rs, Map result, int sNo) { + RowDataHash dataHash = resultSetTranslate(rsmd, rs, result, sNo); return kafkaOperate.sendRowDataSync(dataHash); } - public RowDataHash resultSetTranslate(String tableName, ResultSetMetaData rsmd, ResultSet rs, - Map result, int sNo) { + /** + * resultSetTranslate + * + * @param rsmd rsmd + * @param rs rs + * @param result result + * @param sNo sNo + */ + public RowDataHash resultSetTranslate(ResultSetMetaData rsmd, ResultSet rs, Map result, int sNo) { resultSetHandler.putOneResultSetToMap(tableName, rsmd, rs, result); RowDataHash dataHash = handler(primary, columns, result); dataHash.setSNo(sNo); @@ -99,10 +108,18 @@ public class SliceResultSetSender { return dataHash; } + /** + * checkOffsetEnd + * + * @return checkOffsetEnd + */ public long checkOffsetEnd() { return kafkaOperate.checkTopicPartitionEndOffset(); } + /** + * agentsClosed + */ public void agentsClosed() { kafkaOperate.agentsClosed(); } @@ -121,9 +138,9 @@ public class SliceResultSetSender { * @return Returns the hash calculation result of extracted data */ private RowDataHash handler(List primary, List columns, Map rowData) { - long rowHash = hashHandler.xx3Hash(rowData, columns); - String primaryValue = hashHandler.value(rowData, primary); - long primaryHash = hashHandler.xx3Hash(rowData, primary); + long rowHash = HASH_HANDLER.xx3Hash(rowData, columns); + String primaryValue = HASH_HANDLER.value(rowData, primary); + long primaryHash = HASH_HANDLER.xx3Hash(rowData, primary); RowDataHash hashData = new RowDataHash(); hashData.setKey(primaryValue) .setKHash(primaryHash) @@ -183,12 +200,11 @@ public class SliceResultSetSender { int idx; for (ColumnsMetaData column : columnMetas) { idx = column.getOrdinalPosition() - 1; - if (csv_null_value.equalsIgnoreCase(nextLine[idx])) { - result.put(column.getColumnName(), csv_null_value); + if (CSV_NULL_VALUE.equalsIgnoreCase(nextLine[idx])) { + result.put(column.getColumnName(), CSV_NULL_VALUE); } else { result.put(column.getColumnName(), nextLine[idx]); } } -// log.info("data:{}", result); } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java index 10eb23f81769c38baeaef4c912cafb3c0c368880..0577dbe949bec5eddb59335f917cb7ad97005bb3 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java @@ -71,13 +71,9 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor { try { TableMetadata tableMetadata = context.getTableMetaData(table); SliceExtend sliceExtend = createSliceExtend(tableMetadata.getTableHash()); - if (!slice.isEmptyTable()) { - QuerySqlEntry queryStatement = createQueryStatement(tableMetadata); - log.debug("table [{}] query statement : {}", table, queryStatement.getSql()); - executeQueryStatement(queryStatement, tableMetadata, sliceExtend); - } else { - log.info("table slice [{}] is empty ", slice.toSimpleString()); - } + QuerySqlEntry queryStatement = createQueryStatement(tableMetadata); + log.debug("table [{}] query statement : {}", table, queryStatement.getSql()); + executeQueryStatement(queryStatement, tableMetadata, sliceExtend); feedbackStatus(sliceExtend); } catch (Exception ex) { log.error("table slice [{}] is error", slice.toSimpleString(), ex); @@ -97,8 +93,7 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor { } } - private SliceExtend executeQueryStatement(QuerySqlEntry sqlEntry, TableMetadata tableMetadata, - SliceExtend sliceExtend) { + private void executeQueryStatement(QuerySqlEntry sqlEntry, TableMetadata tableMetadata, SliceExtend sliceExtend) { final LocalDateTime start = LocalDateTime.now(); Connection connection = null; long jdbcQueryCost = 0; @@ -123,7 +118,6 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor { updateExtendSliceOffsetAndCount(sliceExtend, rowCount.get(), offsetList); sendDataCost = durationBetweenToMillis(jdbcQuery, LocalDateTime.now()); } - return sliceExtend; } catch (SQLException ex) { throw new ExtractDataAccessException(ex); } finally { @@ -143,7 +137,7 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor { List>> batchFutures = new LinkedList<>(); while (resultSet.next()) { this.rowCount.incrementAndGet(); - batchFutures.add(sliceSender.resultSetTranslateAndSendSync(table, rsmd, resultSet, result, slice.getNo())); + batchFutures.add(sliceSender.resultSetTranslateAndSendSync(rsmd, resultSet, result, slice.getNo())); if (batchFutures.size() == FETCH_SIZE) { offsetList.add(getBatchFutureRecordOffsetScope(batchFutures)); batchFutures.clear(); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java index f3a1fda05ef5c3716a95481be01b689d24d8458f..1ab7690707fe68e2512b52fd661ad1a08ba3960b 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java @@ -102,7 +102,7 @@ public class JdbcTableProcessor extends AbstractTableProcessor { int rowCount = 0; while (resultSet.next()) { rowCount++; - sliceSender.resultSetTranslateAndSendRandom(table, rsmd, resultSet, result, i); + sliceSender.resultSetTranslateAndSendRandom(rsmd, resultSet, result, i); } sliceSender.resultFlush(); tableRowCount += rowCount; @@ -139,7 +139,7 @@ public class JdbcTableProcessor extends AbstractTableProcessor { int rowCount = 0; while (resultSet.next()) { rowCount++; - sliceSender.resultSetTranslateAndSendRandom(table, rsmd, resultSet, result, 0); + sliceSender.resultSetTranslateAndSendRandom(rsmd, resultSet, result, 0); } tableRowCount += rowCount; log.info("finish {} , {}: {}", table, rowCount, tableRowCount); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussCsvResultSetHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussCsvResultSetHandler.java index c027502c432ef4da3169822e93a5569c7183913a..5225b48c4f5a540cec99ef03b8960975a1c4727f 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussCsvResultSetHandler.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussCsvResultSetHandler.java @@ -20,6 +20,8 @@ import org.springframework.lang.NonNull; import java.math.BigDecimal; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.Locale; +import java.util.Objects; /** * OpenGaussCsvResultSetHandler @@ -41,4 +43,16 @@ public class OpenGaussCsvResultSetHandler extends OpenGaussResultSetHandler { } return floatValue; } + + @Override + protected String bitToString(ResultSet rs, String columnLabel) throws SQLException { + return rs.getString(columnLabel); + } + + @Override + protected String binaryToString(ResultSet rs, String columnLabel) throws SQLException { + String binary = rs.getString(columnLabel); + return rs.wasNull() ? NULL : Objects.isNull(binary) ? NULL : binary.substring(2) + .toUpperCase(Locale.ENGLISH); + } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussResultSetHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussResultSetHandler.java index 7ba79bb008d5c201ba60bc651485a779e6a9a9fd..7f8f70567de418b67b44cfc35f3a6f049ba6305a 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussResultSetHandler.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussResultSetHandler.java @@ -37,31 +37,40 @@ public class OpenGaussResultSetHandler extends ResultSetHandler { protected final Map typeHandlers = new ConcurrentHashMap<>(); { + // byte binary blob TypeHandler byteaToString = (rs, columnLabel) -> bytesToString(rs.getBytes(columnLabel)); + typeHandlers.put(OpenGaussType.BYTEA, byteaToString); + TypeHandler blobToString = (rs, columnLabel) -> rs.getString(columnLabel); + typeHandlers.put(OpenGaussType.BLOB, blobToString); + typeHandlers.put(OpenGaussType.TINYBLOB, blobToString); + typeHandlers.put(OpenGaussType.MEDIUMBLOB, blobToString); + typeHandlers.put(OpenGaussType.LONGBLOB, blobToString); + TypeHandler clobToString = (rs, columnLabel) -> rs.getString(columnLabel); + typeHandlers.put(OpenGaussType.CLOB, clobToString); + TypeHandler xmlToString = (rs, columnLabel) -> rs.getString(columnLabel); + typeHandlers.put(OpenGaussType.XML, xmlToString); + TypeHandler bitToString = (rs, columnLabel) -> bitToString(rs, columnLabel); + typeHandlers.put(OpenGaussType.BIT, bitToString); + TypeHandler binaryToString = (rs, columnLabel) -> binaryToString(rs, columnLabel); + typeHandlers.put(OpenGaussType.binary, binaryToString); + typeHandlers.put(OpenGaussType.varbinary, binaryToString); + TypeHandler booleanToString = (rs, columnLabel) -> booleanToString(rs, columnLabel); - TypeHandler numeric0ToString = (rs, columnLabel) -> numeric0ToString(rs, columnLabel); + typeHandlers.put(OpenGaussType.BOOLEAN, booleanToString); + TypeHandler bpCharToString = (rs, columnLabel) -> fixedLenCharToString(rs, columnLabel); + typeHandlers.put(OpenGaussType.BPCHAR, bpCharToString); // float4 - float real + TypeHandler numeric0ToString = (rs, columnLabel) -> numeric0ToString(rs, columnLabel); typeHandlers.put(OpenGaussType.INT4, numeric0ToString); typeHandlers.put(OpenGaussType.INTEGER, numeric0ToString); typeHandlers.put(OpenGaussType.NUMERIC0, numeric0ToString); - typeHandlers.put(OpenGaussType.BPCHAR, bpCharToString); - - // byte binary blob - typeHandlers.put(OpenGaussType.BYTEA, byteaToString); - typeHandlers.put(OpenGaussType.BLOB, blobToString); - typeHandlers.put(OpenGaussType.BOOLEAN, booleanToString); - typeHandlers.put(OpenGaussType.CLOB, clobToString); - typeHandlers.put(OpenGaussType.XML, xmlToString); - typeHandlers.put(OpenGaussType.BIT, bitToString); - typeHandlers.put(OpenGaussType.binary, binaryToString); - typeHandlers.put(OpenGaussType.varbinary, binaryToString); // date time timestamp typeHandlers.put(OpenGaussType.DATE, this::getDateFormat); @@ -70,28 +79,73 @@ public class OpenGaussResultSetHandler extends ResultSetHandler { typeHandlers.put(OpenGaussType.TIMESTAMPTZ, this::getTimestampFormat); } - private String binaryToString(ResultSet rs, String columnLabel) throws SQLException { - String binary = rs.getString(columnLabel); - return rs.wasNull() ? NULL : Objects.isNull(binary) ? NULL : binary.substring(2) - .toUpperCase(Locale.ENGLISH); - } - - private String bitToString(ResultSet rs, String columnLabel) throws SQLException { - return HexUtil.binaryToHex(rs.getString(columnLabel)); - } - + /** + * OpenGaussResultSetHandler + */ public OpenGaussResultSetHandler() { super(); } + /** + * OpenGaussResultSetHandler + * + * @param supplyZero supplyZero + */ public OpenGaussResultSetHandler(Boolean supplyZero) { super(supplyZero); } + /** + * binaryToString + * + * @param rs rs + * @param columnLabel columnLabel + * @return result + * @throws SQLException SQLException + */ + protected String binaryToString(ResultSet rs, String columnLabel) throws SQLException { + String binary = rs.getString(columnLabel); + return rs.wasNull() ? NULL : Objects.isNull(binary) ? NULL : binary.substring(2) + .toUpperCase(Locale.ENGLISH); + } + + /** + * bitToString + * + * @param rs rs + * @param columnLabel columnLabel + * @return result + * @throws SQLException SQLException + */ + protected String bitToString(ResultSet rs, String columnLabel) throws SQLException { + return HexUtil.binaryToHex(rs.getString(columnLabel)); + } + + /** + * intToString + * + * @param rs rs + * @param columnLabel columnLabel + * @return result + * @throws SQLException SQLException + */ protected String intToString(ResultSet rs, String columnLabel) throws SQLException { return rs.getString(columnLabel); } + /** + * intToString + * + * @param rs rs + * @param columnLabel columnLabel + * @return result + * @throws SQLException SQLException + */ + protected String booleanToString(ResultSet rs, String columnLabel) throws SQLException { + final int booleanVal = rs.getInt(columnLabel); + return booleanVal == 1 ? "true" : "false"; + } + @Override public String convert(ResultSet resultSet, int columnIdx, ResultSetMetaData rsmd) throws SQLException { String columnLabel = rsmd.getColumnLabel(columnIdx); @@ -135,7 +189,7 @@ public class OpenGaussResultSetHandler extends ResultSetHandler { } } - protected String getPgColumnTypeName(ResultSetMetaData rsmd, int columnIdx) throws SQLException { + private String getPgColumnTypeName(ResultSetMetaData rsmd, int columnIdx) throws SQLException { String columnTypeName = rsmd.getColumnTypeName(columnIdx); if (columnTypeName.contains(OpenGaussType.pg_catalog)) { columnTypeName = rsmd.getColumnTypeName(columnIdx) @@ -145,51 +199,208 @@ public class OpenGaussResultSetHandler extends ResultSetHandler { return columnTypeName; } - protected String booleanToString(ResultSet rs, String columnLabel) throws SQLException { - final int booleanVal = rs.getInt(columnLabel); - return booleanVal == 1 ? "true" : "false"; - } - @SuppressWarnings("all") - interface OpenGaussType { + private interface OpenGaussType { + /** + * opengauss data type : pg_catalog + */ String pg_catalog = "pg_catalog"; + + /** + * opengauss data type : pg_catalog type quotation + */ String pg_catalog_type_quotation = "\""; + + /** + * opengauss data type : pg_catalog type split prefex + */ String pg_catalog_type_split = "pg_catalog."; + + /** + * empty string constants + */ String empty = ""; + + /** + * opengauss data type : bytea + */ String BYTEA = "bytea"; + + /** + * opengauss data type : bool + */ String BOOLEAN = "bool"; + + /** + * opengauss data type : blob + */ String BLOB = "blob"; + + /** + * opengauss data type : tinyblob + */ + String TINYBLOB = "tinyblob"; + + /** + * opengauss data type : mediumblob + */ + String MEDIUMBLOB = "mediumblob"; + + /** + * opengauss data type : longblob + */ + String LONGBLOB = "longblob"; + + /** + * opengauss data type : numeric + */ String NUMERIC = "numeric"; + + /** + * opengauss data type : numeric0 + */ String NUMERIC0 = "numeric0"; + + /** + * opengauss data type : float2 + */ String FLOAT1 = "float1"; + + /** + * opengauss data type : float2 + */ String FLOAT2 = "float2"; + + /** + * opengauss data type : float4 + */ String FLOAT4 = "float4"; + + /** + * opengauss data type : float8 + */ String FLOAT8 = "float8"; - String INTEGER = "Integer"; + + /** + * opengauss data type : integer + */ + String INTEGER = "integer"; + + /** + * opengauss data type : int1 + */ String INT1 = "int1"; + + /** + * opengauss data type : int2 + */ String INT2 = "int2"; + + /** + * opengauss data type : int4 + */ String INT4 = "int4"; + + /** + * opengauss data type : int8 + */ String INT8 = "int8"; + + /** + * opengauss data type : uint1 + */ String UINT1 = "uint1"; + + /** + * opengauss data type : uint2 + */ String UINT2 = "uint2"; + + /** + * opengauss data type : uint4 + */ String UINT4 = "uint4"; + + /** + * opengauss data type : uint8 + */ String UINT8 = "uint8"; + + /** + * opengauss data type : varchar + */ String VARCHAR = "varchar"; + + /** + * opengauss data type : bpchar + */ String BPCHAR = "bpchar"; + + /** + * opengauss data type : date + */ String DATE = "date"; + + /** + * opengauss data type : time + */ String TIME = "time"; + + /** + * opengauss data type : timestamp + */ String TIMESTAMP = "timestamp"; + + /** + * opengauss data type : timestamptz + */ String TIMESTAMPTZ = "timestamptz"; + + /** + * opengauss data type : clob + */ String CLOB = "clob"; + + /** + * opengauss data type : xml + */ String XML = "xml"; + + /** + * opengauss data type : bit + */ String BIT = "bit"; + + /** + * opengauss data type : binary + */ String binary = "binary"; + + /** + * opengauss data type : varbinary + */ String varbinary = "varbinary"; + + /** + * opengauss data type : all digit type + */ List digit = List.of(NUMERIC, INT1, INT2, INT4, INT8, UINT1, UINT2, UINT4, UINT8, FLOAT1, FLOAT2, FLOAT4, FLOAT8, INTEGER); + + /** + * opengauss data type : all integer type + */ List integerList = List.of(INT1, INT2, INT4, INT8, UINT1, UINT2, UINT4, UINT8, INTEGER); + + /** + * opengauss data type : bigint and unsigned bigint + */ List bigintegerList = List.of(INT8, UINT8); + + /** + * opengauss data type : all float + */ List floatList = List.of(FLOAT1, FLOAT2, FLOAT4, FLOAT8); /** diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java index a2383b50ee0c29db64cc4c32fb48ac421b9d4a40..9d6dd4d7bebbea2a4c9307f13f6f4fc89219cbd1 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java @@ -99,7 +99,6 @@ public abstract class ResultSetHandler { } catch (SQLException ex) { log.error(" parse data metadata information exception", ex); } -// log.info("data:{}", values); return values; }