From 1ce30b1bf95cde63aa5097b8fb0fff661afc3d7e Mon Sep 17 00:00:00 2001 From: caoyangit Date: Mon, 2 Dec 2024 11:23:56 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E6=94=AF=E6=8C=81postgres=E5=8D=8F?= =?UTF-8?q?=E8=AE=AE=E8=A7=A3=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/opengauss/tool/parse/ParseThread.java | 8 +- .../tool/parse/object/DatabaseTypeEnum.java | 8 + .../parse/postgres/NumberValueConverter.java | 262 +++++++++++++ .../tool/parse/postgres/PGValueConverter.java | 36 ++ .../postgres/PostgresDataTypeConverter.java | 236 ++++++++++++ .../tool/parse/postgres/PostgresParser.java | 361 ++++++++++++++++++ .../postgres/PostgresProtocolConstant.java | 131 +++++++ .../opengauss/tool/utils/CommonParser.java | 44 ++- 8 files changed, 1079 insertions(+), 7 deletions(-) create mode 100644 dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/NumberValueConverter.java create mode 100644 dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PGValueConverter.java create mode 100644 dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PostgresDataTypeConverter.java create mode 100644 dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PostgresParser.java create mode 100644 dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PostgresProtocolConstant.java diff --git a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/ParseThread.java b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/ParseThread.java index d5e4642..a51d670 100644 --- a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/ParseThread.java +++ b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/ParseThread.java @@ -229,7 +229,13 @@ public class ParseThread extends Thread { return null; } - private PacketData peekNextPacket() { + /** + * get next packetData in thread + * + * @return PacketData return next PacketData, return null if PacketQueue is empty and all packet in + * tcp File are distributed + */ + protected PacketData peekNextPacket() { do { PacketData next = packetQueue.peek(); if (next != null) { diff --git a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/object/DatabaseTypeEnum.java b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/object/DatabaseTypeEnum.java index 46d84cc..c68246a 100644 --- a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/object/DatabaseTypeEnum.java +++ b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/object/DatabaseTypeEnum.java @@ -17,6 +17,7 @@ package org.opengauss.tool.parse.object; import org.opengauss.tool.parse.ParseThread; import org.opengauss.tool.parse.ogparser.OgMessageParser; +import org.opengauss.tool.parse.postgres.PostgresParser; /** * Database type enum @@ -42,6 +43,13 @@ public enum DatabaseTypeEnum { public ParseThread getSuitableProtocolParser(String clientId) { return new OgMessageParser(clientId); } + }, + + POSTGRESQL { + @Override + public ParseThread getSuitableProtocolParser(String clientId) { + return new PostgresParser(clientId); + } }; /** diff --git a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/NumberValueConverter.java b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/NumberValueConverter.java new file mode 100644 index 0000000..ef68f02 --- /dev/null +++ b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/NumberValueConverter.java @@ -0,0 +1,262 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * ------------------------------------------------------------------------- + */ + +package org.opengauss.tool.parse.postgres; + +import lombok.Getter; +import org.postgresql.util.ByteConverter; + +import java.math.BigDecimal; +import java.math.BigInteger; + +/** + * Description: NumberValueConverter parse BigDecimal Type + * + * @author : caoyang + * @since : 2024/11/27 + */ +public class NumberValueConverter { + private static final short NUMERIC_NEG = 0x4000; + private static final short NUMERIC_NAN = (short) 0xC000; + private static final int BIG_DECIMAL_HEADER_LEN = 8; + private static final int[] INT_TEN_ARRAY = new int[] {1, 10, 100, 1000, 10000, 100000}; + private static final BigInteger[] BIG_INT_TEN_ARRAY = new BigInteger[32]; + private static final BigInteger BI_TEN_THOUSAND = BigInteger.valueOf(10000); + + static { + BIG_INT_TEN_ARRAY[0] = BigInteger.ONE; + for (int i = 1; i < BIG_INT_TEN_ARRAY.length; i++) { + BIG_INT_TEN_ARRAY[i] = BigInteger.TEN.multiply(BIG_INT_TEN_ARRAY[i - 1]); + } + } + + private NumberValueConverter() {} + + @Getter + private static class Context { + short weight; + short sign; + short scale; + + int effectiveWeight; + int effectiveScale; + + BigInteger usBigInt; + int usIntVal; + + protected Context(byte[] bytes, int pos) { + this.weight = ByteConverter.int2(bytes, pos + 2); + this.sign = ByteConverter.int2(bytes, pos + 4); + this.scale = ByteConverter.int2(bytes, pos + 6); + + this.effectiveWeight = this.weight; + this.effectiveScale = this.scale; + + if (!(this.sign == 0x0000 || this.sign == NUMERIC_NEG || this.sign == NUMERIC_NAN)) { + throw new IllegalArgumentException("invalid sign in numeric value"); + } + + if ((this.scale & 0x00003FFF) != this.scale) { + throw new IllegalArgumentException("invalid scale in numeric value"); + } + usBigInt = null; + usIntVal = 0; + } + + private BigDecimal getValue() { + if (this.usBigInt == null) { + this.usBigInt = BigInteger.valueOf(this.usIntVal); + } + if (this.effectiveWeight > 0) { + this.usBigInt = tenPower(this.effectiveWeight * 4).multiply(this.usBigInt); + } + if (this.effectiveScale > 0) { + this.usBigInt = tenPower(this.effectiveScale).multiply(this.usBigInt); + } + if (this.sign == NUMERIC_NEG) { + this.usBigInt = usBigInt.negate(); + } + return new BigDecimal(this.usBigInt, this.scale); + } + + private void multiplyTenThousand() { + if (this.usBigInt == null) { + this.usIntVal = this.usIntVal * 10000; + } else { + this.usBigInt = BI_TEN_THOUSAND.multiply(this.usBigInt); + } + } + + private short tenPowerUp(short num) { + if (this.usBigInt == null) { + this.usIntVal = this.usIntVal * INT_TEN_ARRAY[this.effectiveScale]; + } else { + this.usBigInt = this.usBigInt.multiply(tenPower(this.effectiveScale)); + } + short returnVal = (short) (num / INT_TEN_ARRAY[4 - this.effectiveScale]); + this.effectiveScale = 0; + return returnVal; + } + } + + /** + * Parse BigDecimal from byte array + * + * @param bytes the byte array to parse + * @param pos this start index of byte array + * @param numBytes num of bytes + * @return Number (BigDecimal or Double.Nan) parse result + */ + public static Number toNumber(byte[] bytes, int pos, int numBytes) { + int len = ByteConverter.int2(bytes, pos) & 0xFFFF; + if (numBytes != (len * 2 + BIG_DECIMAL_HEADER_LEN)) { + throw new IllegalArgumentException("invalid length of bytes numeric value"); + } + + Context context = new Context(bytes, pos); + if (context.sign == NUMERIC_NAN) { + return Double.NaN; + } + if (len == 0) { + return new BigDecimal(BigInteger.ZERO, context.scale); + } + + if (context.weight < 0) { + return processWeight(bytes, context, len, pos + BIG_DECIMAL_HEADER_LEN); + } + + if (context.scale == 0) { + return processNoScale(bytes, context, len, pos + BIG_DECIMAL_HEADER_LEN); + } + + return process(bytes, context, len, pos + BIG_DECIMAL_HEADER_LEN); + } + + private static BigDecimal process(byte[] bytes, Context context, int len, int idx) { + short num = ByteConverter.int2(bytes, idx); + context.usIntVal = num; + int ptr = idx; + for (int i = 1; i < len; i++) { + if (i == 4) { + context.usBigInt = BigInteger.valueOf(context.usIntVal); + } + ptr += 2; + num = ByteConverter.int2(bytes, ptr); + if (context.effectiveWeight > 0) { + context.effectiveWeight--; + context.multiplyTenThousand(); + } else if (context.effectiveScale >= 4) { + context.effectiveScale = context.effectiveScale - 4; + context.multiplyTenThousand(); + } else { + num = context.tenPowerUp(num); + } + if (context.usBigInt == null) { + context.usIntVal = context.usIntVal + num; + } else if (num != 0) { + context.usBigInt = context.usBigInt.add(BigInteger.valueOf(num)); + } else { + throw new IllegalArgumentException("invalid bytes numeric value"); + } + } + + return context.getValue(); + } + + private static BigDecimal processNoScale(byte[] bytes, Context context, int len, int idx) { + BigInteger unscaledBI = null; + short num = ByteConverter.int2(bytes, idx); + int ptr = idx; + long unscaledInt = num; + for (int i = 1; i < len; i++) { + if (i == 4) { + unscaledBI = BigInteger.valueOf(unscaledInt); + } + ptr += 2; + num = ByteConverter.int2(bytes, ptr); + if (unscaledBI == null) { + unscaledInt = 10000 * unscaledInt; + unscaledInt = unscaledInt + num; + } else { + unscaledBI = BI_TEN_THOUSAND.multiply(unscaledBI); + if (num != 0) { + unscaledBI = unscaledBI.add(BigInteger.valueOf(num)); + } + } + } + if (unscaledBI == null) { + unscaledBI = BigInteger.valueOf(unscaledInt); + } + if (context.sign == NUMERIC_NEG) { + unscaledBI = unscaledBI.negate(); + } + int bigDecScale = (len - (context.weight + 1)) * 4; + return bigDecScale == 0 ? new BigDecimal(unscaledBI) : new BigDecimal(unscaledBI, bigDecScale).setScale(0); + } + + private static BigDecimal processWeight(byte[] bytes, Context context, int len, int idx) { + short num = ByteConverter.int2(bytes, idx); + context.weight++; + if (context.weight < 0) { + context.effectiveScale = 4 * context.weight + context.effectiveScale; + } + int i = 1; + int ptr = idx; + for (; i < len && num == 0; i++) { + context.effectiveScale = context.effectiveScale - 4; + ptr = ptr + 2; + num = ByteConverter.int2(bytes, ptr); + } + + if (context.effectiveScale >= 4) { + context.effectiveScale = context.effectiveScale - 4; + } else { + num = (short) (num / INT_TEN_ARRAY[4 - context.effectiveScale]); + context.effectiveScale = 0; + } + + context.usIntVal = num; + for (; i < len; i++) { + if (i == 4 && context.effectiveScale > 2) { + context.usBigInt = BigInteger.valueOf(context.usIntVal); + } + ptr = ptr + 2; + num = ByteConverter.int2(bytes, ptr); + if (context.effectiveScale >= 4) { + context.effectiveScale = context.effectiveScale - 4; + context.multiplyTenThousand(); + } else { + num = context.tenPowerUp(num); + } + if (context.usBigInt == null) { + context.usIntVal = context.usIntVal + num; + } else if (num != 0) { + context.usBigInt = context.usBigInt.add(BigInteger.valueOf(num)); + } else { + throw new IllegalArgumentException("invalid bytes numeric value"); + } + } + return context.getValue(); + } + + private static BigInteger tenPower(int exponent) { + if (BIG_INT_TEN_ARRAY.length <= exponent) { + return BigInteger.TEN.pow(exponent); + } else { + return BIG_INT_TEN_ARRAY[exponent]; + } + } +} \ No newline at end of file diff --git a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PGValueConverter.java b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PGValueConverter.java new file mode 100644 index 0000000..10ae3e6 --- /dev/null +++ b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PGValueConverter.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.opengauss.tool.parse.postgres; + +import org.opengauss.tool.parse.object.PreparedValue; + +/** + * Description: Postgres ValueConverter interface + * + * @author caoyang + * @since 2024/05/15 + **/ +public interface PGValueConverter { + /** + * Convert param value + * + * @param packet byte[] the packet + * @param start int the start index + * @param length int the length of bytes + * @return PreparedValue the prepared value + */ + PreparedValue convert(byte[] packet, int start, int length); +} diff --git a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PostgresDataTypeConverter.java b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PostgresDataTypeConverter.java new file mode 100644 index 0000000..f8fdb01 --- /dev/null +++ b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PostgresDataTypeConverter.java @@ -0,0 +1,236 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * ------------------------------------------------------------------------- + */ + +package org.opengauss.tool.parse.postgres; + +import lombok.Getter; +import org.opengauss.tool.parse.object.PreparedValue; +import org.opengauss.tool.utils.CommonParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigDecimal; +import java.util.Optional; + +/** + * Description: Postgres Data type converter + * + * @author : caoyang + * @since : 2024/11/29 + */ +public final class PostgresDataTypeConverter { + private static final Logger LOGGER = LoggerFactory.getLogger(PostgresDataTypeConverter.class); + + /** + * Description: Postgres DataType accepted enum + * + * @author : caoyang + * @since : 2024/11/29 + */ + @Getter + public enum PgDataType { + BOOL(16, "bool", PostgresDataTypeConverter::convertBoolValue), + INT2(21, "int2", PostgresDataTypeConverter::convertIntValue), + INT4(23, "int4", PostgresDataTypeConverter::convertIntValue), + INT8(20, "int8", PostgresDataTypeConverter::convertLongValue), + VARCHAR(1043, "varchar", PostgresDataTypeConverter::convertStringValue), + FLOAT8(701, "float8", PostgresDataTypeConverter::convertDoubleValue), + FLOAT4(700, "float4", PostgresDataTypeConverter::convertFloatValue), + NUMERIC(1700, "numeric", PostgresDataTypeConverter::convertNumericValue); + + private final int oid; + private final String typeName; + private final PGValueConverter valueConverter; + + PgDataType(int oid, String typeName, PGValueConverter valueConverter) { + this.oid = oid; + this.typeName = typeName; + this.valueConverter = valueConverter; + } + + /** + * get PG_DATA_TYPE by type oid + * + * @param typeOid data type oid + * @return PG_DATA_TYPE dataType enum + */ + public static Optional getDataType(String typeOid) { + for (PgDataType type : PgDataType.values()) { + if (String.valueOf(type.getOid()).equals(typeOid)) { + return Optional.of(type); + } + } + return Optional.empty(); + } + } + + /** + * Get prepared value + * + * @param dataType String the data type + * @param data byte[] the data + * @param startIndex int the start index + * @param formatArray int[] the format tye array defined by previous data + * @param paramNo int the param index in parameter list + * @return PreparedValue the prepared parameter + */ + public static PreparedValue getValue(String dataType, byte[] data, int startIndex, int[] formatArray, int paramNo) { + int paramValueLength = CommonParser.parseSignedIntByBigEndian(data, startIndex, + PostgresProtocolConstant.PARAM_VALUE_LENGTH_BYTES); + if (paramValueLength <= 0) { + PreparedValue preparedValue = new PreparedValue(null, PostgresProtocolConstant.PARAM_VALUE_LENGTH_BYTES); + preparedValue.setType(dataType); + return preparedValue; + } + + boolean isText = isText(formatArray, paramNo); + if (isText) { + PreparedValue preparedValue = convertStringValue(data, + startIndex + PostgresProtocolConstant.PARAM_VALUE_LENGTH_BYTES, + paramValueLength); + preparedValue.setType(dataType); + return preparedValue; + } + Optional dataTypeOptional = PgDataType.getDataType(dataType); + if (!dataTypeOptional.isPresent()) { + LOGGER.error("unsupported data type, oid: {}", dataType); + } + PGValueConverter converter = dataTypeOptional.isPresent() + ? dataTypeOptional.get().getValueConverter() + : PostgresDataTypeConverter::unsupportedTypeConverter; + PreparedValue preparedValue = converter.convert( + data, + startIndex + PostgresProtocolConstant.PARAM_VALUE_LENGTH_BYTES, + paramValueLength); + preparedValue.setType(dataType); + return preparedValue; + } + + private static boolean isText(int[] paramFormatTypeArr, int idx) { + return paramFormatTypeArr.length == 1 ? paramFormatTypeArr[0] == 0 : paramFormatTypeArr[idx] == 0; + } + + /** + * Convert bool type + * + * @param packet byte[] the packet + * @param start int the start index + * @param length int the length of bytes + * @return PreparedValue the preparedValue + */ + public static PreparedValue convertBoolValue(byte[] packet, int start, int length) { + String value = String.valueOf(packet[start] == 0x01); + return new PreparedValue(value, length + PostgresProtocolConstant.PARAM_VALUE_LENGTH_BYTES); + } + + /** + * Convert int type + * + * @param packet byte[] the packet + * @param start int the start index + * @param length int the length of bytes + * @return PreparedValue the preparedValue + */ + public static PreparedValue convertIntValue(byte[] packet, int start, int length) { + String value = String.valueOf(CommonParser.parseSignedIntByBigEndian(packet, start, length)); + return new PreparedValue(value, length + PostgresProtocolConstant.PARAM_VALUE_LENGTH_BYTES); + } + + /** + * Convert int type + * + * @param packet byte[] the packet + * @param start int the start index + * @param length int the length of bytes + * @return PreparedValue the preparedValue + */ + public static PreparedValue convertLongValue(byte[] packet, int start, int length) { + String value = String.valueOf(CommonParser.parseSignedLongByBigEndian(packet, start)); + return new PreparedValue(value, length + PostgresProtocolConstant.PARAM_VALUE_LENGTH_BYTES); + } + + /** + * Convert int type + * + * @param packet byte[] the packet + * @param start int the start index + * @param length int the length of bytes + * @return PreparedValue the preparedValue + */ + public static PreparedValue convertFloatValue(byte[] packet, int start, int length) { + int hex = CommonParser.parseSignedIntByBigEndian(packet, start, length); + String value = String.valueOf(Float.intBitsToFloat(hex)); + return new PreparedValue(value, length + PostgresProtocolConstant.PARAM_VALUE_LENGTH_BYTES); + } + + /** + * Convert double type + * + * @param packet byte[] the packet + * @param start int the start index + * @param length int the length of bytes + * @return PreparedValue the preparedValue + */ + public static PreparedValue convertDoubleValue(byte[] packet, int start, int length) { + long hex = CommonParser.parseSignedLongByBigEndian(packet, start); + String value = String.valueOf(Double.longBitsToDouble(hex)); + return new PreparedValue(value, length + PostgresProtocolConstant.PARAM_VALUE_LENGTH_BYTES); + } + + /** + * Convert String value + * + * @param packet byte[] the packet + * @param start int the start index + * @param length int the length of bytes + * @return PreparedValue the preparedValue + */ + public static PreparedValue convertStringValue(byte[] packet, int start, int length) { + String value = CommonParser.parseByteToString(packet, start, start + length); + return new PreparedValue(value, length + PostgresProtocolConstant.PARAM_VALUE_LENGTH_BYTES); + } + + /** + * Convert numeric value + * + * @param packet byte[] the packet + * @param start int the start index + * @param length int the length of bytes + * @return PreparedValue the preparedValue + */ + public static PreparedValue convertNumericValue(byte[] packet, int start, int length) { + Number number = NumberValueConverter.toNumber(packet, start, length); + String value; + if (number instanceof BigDecimal) { + value = ((BigDecimal) number).toPlainString(); + } else { + value = String.valueOf(number); + } + return new PreparedValue(value, length + PostgresProtocolConstant.PARAM_VALUE_LENGTH_BYTES); + } + + /** + * not supported type converter + * + * @param packet byte[] the packet + * @param start int the start index + * @param length int the length of bytes + * @return PreparedValue the preparedValue + */ + public static PreparedValue unsupportedTypeConverter(byte[] packet, int start, int length) { + return new PreparedValue(null, length + PostgresProtocolConstant.PARAM_VALUE_LENGTH_BYTES); + } +} diff --git a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PostgresParser.java b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PostgresParser.java new file mode 100644 index 0000000..e6c8112 --- /dev/null +++ b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PostgresParser.java @@ -0,0 +1,361 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * ------------------------------------------------------------------------- + */ + +package org.opengauss.tool.parse.postgres; + +import lombok.EqualsAndHashCode; +import org.apache.commons.lang3.StringUtils; +import org.opengauss.tool.parse.ParseThread; +import org.opengauss.tool.parse.object.PacketData; +import org.opengauss.tool.parse.object.PreparedValue; +import org.opengauss.tool.parse.object.ProtocolConstant; +import org.opengauss.tool.parse.object.SqlInfo; +import org.opengauss.tool.utils.CommonParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.CollectionUtils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Description: Parse postgres packet thread + * + * @author : caoyang + * @since : 2024/11/29 + */ +@EqualsAndHashCode(callSuper = true) +public class PostgresParser extends ParseThread { + private static final Logger LOGGER = LoggerFactory.getLogger(PostgresParser.class); + + private final Map prepareSqlMap; + + private final List pgIncompleteSqlList; + + /** + * Constructor + * + * @param sessionId String the session id + */ + public PostgresParser(String sessionId) { + super(sessionId); + prepareSqlMap = new HashMap<>(); + pgIncompleteSqlList = new ArrayList<>(); + } + + /** + * Is quit message + * + * @param packet PacketData the packet data + * @return true if current massage is quit message + */ + protected boolean isQuitMessage(PacketData packet) { + if (packet.getData().length == 5) { + String requestType = CommonParser.parseByteToString(packet.getData(), 0, 1); + if (PostgresProtocolConstant.COM_TERMINATE.equals(requestType)) { + quit(packet); + return true; + } + } + return false; + } + + @Override + protected void parsePacket(PacketData packet) { + if (packet.getData() == null) { + return; + } + parsePostgresqlRequestPacket(packet); + } + + private void parsePostgresqlRequestPacket(PacketData packet) { + if (StringUtils.isEmpty(this.username) && isStartUpPacket(packet)) { + initLoginRequest(packet); + skipResponsePacket(); + return; + } + + List packetDataList = splitPacketData(packet); + try { + for (PacketData subPacket : packetDataList) { + String requestType = CommonParser.parseByteToString(subPacket.getData(), 0, 1); + switch (requestType) { + case PostgresProtocolConstant.COM_QUERY: + parseSql(subPacket); + continue; + case PostgresProtocolConstant.COM_STMT_PARSE: + parsePreparedSql(subPacket); + continue; + case PostgresProtocolConstant.COM_STMT_BIND: + parsePreparedParameter(subPacket); + continue; + case PostgresProtocolConstant.COM_STMT_EXECUTE: + case PostgresProtocolConstant.COM_STMT_CLOSE: + case PostgresProtocolConstant.COM_STMT_RESET: + case PostgresProtocolConstant.COM_STMT_SYNC: + case PostgresProtocolConstant.DESCRIBE: + case PostgresProtocolConstant.FLUSH: + case PostgresProtocolConstant.FUNCTION_CALL: + case PostgresProtocolConstant.COPY_DATA: + case PostgresProtocolConstant.COPY_DONE: + case PostgresProtocolConstant.COPY_FAIL: + default: + } + } + submitIncompleteSqlList(packet); + } catch (IllegalArgumentException e) { + LOGGER.error("parse postgresql packet error, packet id: {}", packet.getPacketId(), e); + } catch (Exception e) { + LOGGER.error("parse postgresql packet unexcepted error, packet id: {}", packet.getPacketId(), e); + } + } + + private void submitIncompleteSqlList(PacketData packet) { + if (this.pgIncompleteSqlList != null && !this.pgIncompleteSqlList.isEmpty()) { + PacketData next = peekNextPacket(); + if (next == null || !ProtocolConstant.RESPONSE.equals(next.getPacketType())) { + LOGGER.error("no response for prepared Sql, packet id: {}", packet.getPacketId()); + return; + } + for (SqlInfo sqlInfo : this.pgIncompleteSqlList) { + sqlInfo.setExecuteDuration(next.getMicrosecondTimestamp()); + this.incompleteSql = sqlInfo; + addSqLToQueue(); + } + skipResponsePacket(); + this.pgIncompleteSqlList.clear(); + } + } + + private List splitPacketData(PacketData packet) { + List subPacketList = new ArrayList<>(); + byte[] data = packet.getData(); + int start = 0; + int length; + while (start < data.length) { + String requestType = CommonParser.parseByteToString(packet.getData(), 0, 1); + if (!PostgresProtocolConstant.REQUEST_TYPE_SET.contains(requestType)) { + break; + } + length = CommonParser.parseSignedIntByBigEndian(data, start + 1, 4); + PacketData subPacket = new PacketData(); + subPacket.copyFrom(packet); + subPacket.clonePacketData(data, start, 1 + length); + subPacketList.add(subPacket); + start += (1 + length); + } + return subPacketList; + } + + @Override + protected void parsePreparedSql(PacketData packet) { + byte[] data = packet.getData(); + + int statementIdStartIdx = 5; + int statementIdEndIdx = getStringEndIndex(data, statementIdStartIdx); + if (statementIdEndIdx < 0) { + LOGGER.error("parse statementId in prepare error, packetId: {}", packet.getPacketId()); + return; + } + + int startIdx = statementIdEndIdx + 1; + int sqlEndIdx = getStringEndIndex(data, startIdx); + String sql = CommonParser.parseByteToString(data, startIdx, sqlEndIdx).trim() + ";"; + + int paramCount = CommonParser.parseSignedIntByBigEndian(data, sqlEndIdx + 1, 2); + int paramIdx = sqlEndIdx + 3; + + List paramTypeList = new ArrayList<>(paramCount); + for (int i = 0; i < paramCount; i++) { + int paramType = CommonParser.parseSignedIntByBigEndian(data, paramIdx, 4); + paramTypeList.add(String.valueOf(paramType)); + paramIdx = paramIdx + 4; + sql = StringUtils.replace(sql, "$" + (i + 1), "?"); + } + + String statementId = CommonParser.parseByteToString(data, statementIdStartIdx, statementIdEndIdx).trim(); + SqlInfo sqlObject = new SqlInfo(packet.getPacketId(), true, sql); + sqlObject.encapsulateSql(username, schema, sessionId, paramCount); + sqlObject.setTypeList(paramTypeList); + prepareSqlMap.put(statementId, sqlObject); + } + + @Override + protected void parsePreparedParameter(PacketData packet) { + byte[] data = packet.getData(); + int parseIdx = 5; + + // skip portal + parseIdx = getStringEndIndex(data, parseIdx) + 1; + if (parseIdx < 0) { + return; + } + + int statementIdEndIdx = getStringEndIndex(data, parseIdx); + if (statementIdEndIdx < 0) { + LOGGER.error("parse statementId in bind param error, packetId: {}", packet.getPacketId()); + return; + } + String bindStatementId = CommonParser.parseByteToString(data, parseIdx, statementIdEndIdx).trim(); + if (!prepareSqlMap.containsKey(bindStatementId) || prepareSqlMap.get(bindStatementId) == null) { + return; + } + int[] paramFormatTypeArr; + parseIdx = statementIdEndIdx + 1; + int paramFormatLength = CommonParser.parseSignedIntByBigEndian(data, parseIdx, 2); + + parseIdx = parseIdx + 2; + if (paramFormatLength == 0) { + paramFormatTypeArr = new int[0]; + } else if (paramFormatLength == 1) { + paramFormatTypeArr = new int[]{CommonParser.parseSignedIntByBigEndian(data, parseIdx, 2)}; + parseIdx = parseIdx + 2; + } else { + paramFormatTypeArr = new int[paramFormatLength]; + for (int i = 0; i < paramFormatLength; i++) { + paramFormatTypeArr[i] = CommonParser.parseSignedIntByBigEndian(data, parseIdx, 2); + parseIdx = parseIdx + 2; + } + } + List preparedValues = parsePrepareValues(packet, parseIdx, paramFormatTypeArr, + prepareSqlMap.get(bindStatementId)); + newOrMergeBatchParam(packet, prepareSqlMap.get(bindStatementId), preparedValues); + } + + private List parsePrepareValues(PacketData packet, int startIdx, int[] paramFormatTypeArr, + SqlInfo pbeSqlInfo) { + int paramLength = CommonParser.parseSignedIntByBigEndian(packet.getData(), startIdx, 2); + int parseIdx = startIdx + 2; + + List preparedValues = new ArrayList<>(); + if (paramLength > 0) { + List typeList = pbeSqlInfo.getTypeList(); + if (paramLength != typeList.size()) { + throw new IllegalArgumentException("parameter value count and type count not match"); + } + for (int i = 0; i < paramLength; i++) { + PreparedValue preparedValue = PostgresDataTypeConverter.getValue(typeList.get(i), packet.getData(), + parseIdx, paramFormatTypeArr, i); + preparedValues.add(preparedValue); + parseIdx = parseIdx + preparedValue.getOffset(); + } + } + return preparedValues; + } + + private void newOrMergeBatchParam(PacketData packet, SqlInfo pbeSqlInfo, List preparedValues) { + if (CollectionUtils.isEmpty(this.pgIncompleteSqlList)) { + pgIncompleteSqlList.add(buildNewSqlInfo(packet, pbeSqlInfo, preparedValues)); + } else { + int idx = this.pgIncompleteSqlList.size() - 1; + for (; idx >= 0; idx--) { + SqlInfo sqlInfo = this.pgIncompleteSqlList.get(idx); + if (StringUtils.equals(sqlInfo.getSql(), pbeSqlInfo.getSql())) { + this.pgIncompleteSqlList.get(pgIncompleteSqlList.size() - 1).getParameterList() + .addAll(preparedValues); + break; + } + } + if (idx < 0) { + pgIncompleteSqlList.add(buildNewSqlInfo(packet, pbeSqlInfo, preparedValues)); + } + } + } + + private SqlInfo buildNewSqlInfo(PacketData packet, SqlInfo sqlInfo, List preparedValues) { + SqlInfo sql = sqlInfo.clone(); + sql.setSessionId(sessionId); + sql.setSqlId(packet.getPacketId()); + sql.setStartTime(packet.getMicrosecondTimestamp()); + sqlInfo.getParameterList().clear(); + sql.setParameterList(preparedValues); + return sql; + } + + @Override + protected void parseSql(PacketData packet) { + byte[] data = packet.getData(); + int endIdx = 5; + for (; endIdx < data.length; endIdx++) { + if (data[endIdx] == 0x00) { + break; + } + } + String sql = CommonParser.parseByteToString(data, 5, endIdx).trim(); + SqlInfo sqlObject = new SqlInfo(packet.getPacketId(), false, sql); + sqlObject.encapsulateSql(username, schema, sessionId, 0); + sqlObject.setStartTime(packet.getMicrosecondTimestamp()); + pgIncompleteSqlList.add(sqlObject); + } + + @Override + protected void initLoginRequest(PacketData packet) { + byte[] data = packet.getData(); + int ptr = 8; + Map loginParamMap = new HashMap<>(); + + while (ptr < data.length) { + ptr = extractParam(data, ptr, PostgresProtocolConstant.USER_IN_STARTUP, loginParamMap); + ptr = extractParam(data, ptr, PostgresProtocolConstant.DATABASE_IN_STARTUP, loginParamMap); + ptr++; + } + if (loginParamMap.containsKey(PostgresProtocolConstant.USER_IN_STARTUP) + && loginParamMap.containsKey(PostgresProtocolConstant.DATABASE_IN_STARTUP)) { + this.username = loginParamMap.get(PostgresProtocolConstant.USER_IN_STARTUP); + this.schema = loginParamMap.get(PostgresProtocolConstant.DATABASE_IN_STARTUP); + } + } + + private int extractParam(byte[] data, int ptr, String paramField, Map loginParamMap) { + int endPtr = ptr; + if (data.length - ptr >= paramField.length()) { + String fieldName = CommonParser.parseByteToString(data, ptr, ptr + paramField.length()); + if (paramField.equals(fieldName)) { + int userStartPtr = ptr + paramField.length() + 1; + int userNameEndPtr = getStringEndIndex(data, userStartPtr); + if (userNameEndPtr >= userStartPtr) { + String username = CommonParser.parseByteToString(data, userStartPtr, userNameEndPtr); + loginParamMap.put(paramField, username); + endPtr = userNameEndPtr + 1; + } + } + } + return endPtr; + } + + private boolean isStartUpPacket(PacketData packet) { + int dataLength = packet.getData().length; + if (dataLength < 8) { + return false; + } + try { + int lengthInHead = CommonParser.parseSignedIntByBigEndian(packet.getData(), 0, 4); + if (dataLength != Math.toIntExact(lengthInHead)) { + return false; + } + } catch (IllegalArgumentException | ArithmeticException e) { + return false; + } + for (int i = 0; i < 4; i++) { + if (PostgresProtocolConstant.PROTOCOL_VER_IN_STARTUP[i] != packet.getData()[i + 4]) { + return false; + } + } + return true; + } +} \ No newline at end of file diff --git a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PostgresProtocolConstant.java b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PostgresProtocolConstant.java new file mode 100644 index 0000000..315b539 --- /dev/null +++ b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PostgresProtocolConstant.java @@ -0,0 +1,131 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * ------------------------------------------------------------------------- + */ + +package org.opengauss.tool.parse.postgres; + +import com.google.common.collect.ImmutableSet; + +import java.util.Set; + +/** + * Description: Postgresql Protocol constant value + * + * @author : caoyang + * @since : 2024/11/28 + */ +public final class PostgresProtocolConstant { + /** + * Request type set of postgresql packet + */ + public static final Set REQUEST_TYPE_SET; + + /** + * protocol version in startup + */ + public static final byte[] PROTOCOL_VER_IN_STARTUP = {0, 3, 0, 0}; + + /** + * user in startup + */ + public static final String USER_IN_STARTUP = "user"; + + /** + * database in startup + */ + public static final String DATABASE_IN_STARTUP = "database"; + + /** + * query packet type + */ + public static final String COM_QUERY = "Q"; + + /** + * terminate conn + */ + public static final String COM_TERMINATE = "X"; + + /** + * statement prepared packet type + */ + public static final String COM_STMT_PARSE = "P"; + + /** + * statement prepared packet type + */ + public static final String COM_STMT_BIND = "B"; + + /** + * statement execute packet type + */ + public static final String COM_STMT_EXECUTE = "E"; + + /** + * statement close packet type + */ + public static final String COM_STMT_CLOSE = "C"; + + /** + * statement close packet type + */ + public static final String COM_STMT_SYNC = "S"; + + /** + * statement reset packet type + */ + public static final String COM_STMT_RESET = "a"; + + /** + * copy fail + */ + public static final String COPY_DATA = "d"; + + /** + * copy done + */ + public static final String COPY_DONE = "c"; + + /** + * copy fail + */ + public static final String COPY_FAIL = "f"; + + /** + * describe + */ + public static final String DESCRIBE = "D"; + + /** + * flush + */ + public static final String FLUSH = "H"; + + /** + * function call + */ + public static final String FUNCTION_CALL = "F"; + + /** + * number of bytes which present the param value length in postgres protocol + */ + public static final int PARAM_VALUE_LENGTH_BYTES = 4; + + static { + REQUEST_TYPE_SET = ImmutableSet.builder().add(COM_QUERY, COM_STMT_PARSE, COM_STMT_PARSE, COM_STMT_BIND, + COM_STMT_EXECUTE, COM_STMT_CLOSE, COM_STMT_SYNC, COM_STMT_RESET, COPY_DATA, COPY_DONE, COPY_FAIL, + DESCRIBE, FLUSH, FUNCTION_CALL) + .build(); + } +} diff --git a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/utils/CommonParser.java b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/utils/CommonParser.java index 0df01fe..e5a39f3 100644 --- a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/utils/CommonParser.java +++ b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/utils/CommonParser.java @@ -15,6 +15,7 @@ package org.opengauss.tool.utils; +import org.postgresql.util.ByteConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,9 +95,8 @@ public final class CommonParser { * Parse int by big endian * * @param packet byte[] the packet - * @param start int the start index - * @param end int the end index - * + * @param start int the start index + * @param end int the end index * @return int the parse result */ public static int parseIntByBigEndian(byte[] packet, int start, int end) { @@ -107,12 +107,44 @@ public final class CommonParser { * Parse int by big endian * * @param packet byte[] the packet - * @param start int the start index - * @param end int the end index - * + * @param start int the start index + * @param end int the end index * @return int the parse result */ public static int parseIntByLittleEndian(byte[] packet, int start, int end) { return Integer.parseInt(parseByLittleEndian(packet, start, end), 16); } + + /** + * Convert a variable length array of bytes to an integer + * + * @param bytes array of bytes that can be decoded as an integer + * @param startIdx the start index + * @param length integer length + * @return int the parse result + */ + public static int parseSignedIntByBigEndian(byte[] bytes, int startIdx, int length) { + if (length == 1) { + return bytes[startIdx]; + } + if (length == 2) { + return ByteConverter.int2(bytes, startIdx); + } + if (length == 4) { + return ByteConverter.int4(bytes, startIdx); + } else { + throw new IllegalArgumentException("Argument bytes is empty"); + } + } + + /** + * Convert a variable length array of bytes to a long + * + * @param bytes array of bytes that can be decoded as a signed long + * @param startIdx the start index + * @return long the parse result + */ + public static long parseSignedLongByBigEndian(byte[] bytes, int startIdx) { + return ByteConverter.int8(bytes, startIdx); + } } -- Gitee From 99fd3a12da05a90a62f2650fea52f986766d55d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=BB=E8=95=BE=E7=A6=8F?= Date: Fri, 24 Jan 2025 16:41:14 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E5=90=88=E5=B9=B6commit?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 支持panwei回放 cherrypick 1 replayPanWei_v2 replayPanWei_v3 cherrypick 2 根据规范修改代码 开源已提交的代码,保持现状System.exit(-1); cherrypick 3 merge最新版master 修改代码审查出现的问题 cherrypick 4 删除未用的import 完成cherrypick 合并commit 根据评论修改内容 规范代码 --- .../config/replay.properties | 2 + .../transcribe-replay-tool/pom.xml | 5 ++ .../opengauss/tool/config/DatabaseConfig.java | 1 + .../tool/config/replay/ReplayConfig.java | 1 + .../tool/parse/object/DatabaseTypeEnum.java | 3 + .../postgres/PostgresDataTypeConverter.java | 83 ++++++++++++++----- .../tool/parse/postgres/PostgresParser.java | 5 +- .../factory/ReplayConnectionFactory.java | 13 ++- .../tool/replay/model/ParameterTypeEnum.java | 66 ++++++++++++++- .../replay/operator/ReplaySqlOperator.java | 7 +- .../tool/replay/task/SingleReplayThread.java | 1 + .../opengauss/tool/utils/ConfigReader.java | 5 ++ .../tool/utils/ConnectionFactory.java | 4 +- 13 files changed, 167 insertions(+), 29 deletions(-) diff --git a/dynamic_sql_collection/transcribe-replay-tool/config/replay.properties b/dynamic_sql_collection/transcribe-replay-tool/config/replay.properties index 019982c..329fd8e 100644 --- a/dynamic_sql_collection/transcribe-replay-tool/config/replay.properties +++ b/dynamic_sql_collection/transcribe-replay-tool/config/replay.properties @@ -34,6 +34,8 @@ source.time.interval.replay=false sql.replay.database.ip=127.0.0.1 # port sql.replay.database.port=5432 +# type: MYSQL OPENGAUSS +sql.replay.database.type=OPENGAUSS # schema mapping, separate with ';', default schema is public sql.replay.database.schema.map=test_db1:replay.test_db1;test_db2:replay; # username diff --git a/dynamic_sql_collection/transcribe-replay-tool/pom.xml b/dynamic_sql_collection/transcribe-replay-tool/pom.xml index 2535853..df8c811 100644 --- a/dynamic_sql_collection/transcribe-replay-tool/pom.xml +++ b/dynamic_sql_collection/transcribe-replay-tool/pom.xml @@ -55,6 +55,11 @@ mysql-connector-java 8.0.27 + + org.postgresql + postgresql + 42.3.2 + org.projectlombok lombok diff --git a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/config/DatabaseConfig.java b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/config/DatabaseConfig.java index 265baad..2a3ac7d 100644 --- a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/config/DatabaseConfig.java +++ b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/config/DatabaseConfig.java @@ -34,6 +34,7 @@ public class DatabaseConfig { private String dbName; private String password; private String tableName; + private String dbType; private boolean isCluster; /** diff --git a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/config/replay/ReplayConfig.java b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/config/replay/ReplayConfig.java index 9b2648e..ea2d54b 100644 --- a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/config/replay/ReplayConfig.java +++ b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/config/replay/ReplayConfig.java @@ -93,6 +93,7 @@ public class ReplayConfig { targetDbConfig.setDbPort(props.getProperty(ConfigReader.SQL_REPLAY_DATABASE_PORT)); targetDbConfig.setUsername(props.getProperty(ConfigReader.SQL_REPLAY_DATABASE_USERNAME)); targetDbConfig.setPassword(props.getProperty(ConfigReader.SQL_REPLAY_DATABASE_PASSWORD)); + targetDbConfig.setDbType(props.getProperty(ConfigReader.SQL_REPLAY_DATABASE_TYPE)); targetDbConfig.setCluster(targetDbConfig.getDbIp().contains(",")); if (ConfigReader.DB.equalsIgnoreCase(storageMode)) { sourceDbConfig = new DatabaseConfig(); diff --git a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/object/DatabaseTypeEnum.java b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/object/DatabaseTypeEnum.java index c68246a..cb30747 100644 --- a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/object/DatabaseTypeEnum.java +++ b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/object/DatabaseTypeEnum.java @@ -45,6 +45,9 @@ public enum DatabaseTypeEnum { } }, + /** + * postGreSql + */ POSTGRESQL { @Override public ParseThread getSuitableProtocolParser(String clientId) { diff --git a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PostgresDataTypeConverter.java b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PostgresDataTypeConverter.java index f8fdb01..ce7adef 100644 --- a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PostgresDataTypeConverter.java +++ b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PostgresDataTypeConverter.java @@ -1,5 +1,5 @@ /* - * Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. + * Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. * * openGauss is licensed under Mulan PSL v2. * You can use this software according to the terms and conditions of the Mulan PSL v2. @@ -23,6 +23,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.math.BigDecimal; +import java.util.Date; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.Optional; /** @@ -47,9 +51,16 @@ public final class PostgresDataTypeConverter { INT4(23, "int4", PostgresDataTypeConverter::convertIntValue), INT8(20, "int8", PostgresDataTypeConverter::convertLongValue), VARCHAR(1043, "varchar", PostgresDataTypeConverter::convertStringValue), + VARCHAR2(9129, "varchar2", PostgresDataTypeConverter::convertStringValue), + BPCHAR(1042, "bpchar", PostgresDataTypeConverter::convertStringValue), FLOAT8(701, "float8", PostgresDataTypeConverter::convertDoubleValue), FLOAT4(700, "float4", PostgresDataTypeConverter::convertFloatValue), - NUMERIC(1700, "numeric", PostgresDataTypeConverter::convertNumericValue); + NUMERIC(1700, "numeric", PostgresDataTypeConverter::convertNumericValue), + NVARCHAR2(9025, "nvarchar2", PostgresDataTypeConverter::convertStringValue), + TEXT(25, "text", PostgresDataTypeConverter::convertStringValue), + TIMESTAMP(1114, "timestamp", PostgresDataTypeConverter::convertTimeStampValue), + DATE(1082, "date", PostgresDataTypeConverter::convertDateValue), + STRING(0, "string", PostgresDataTypeConverter::convertStringValue); private final int oid; private final String typeName; @@ -80,9 +91,9 @@ public final class PostgresDataTypeConverter { /** * Get prepared value * - * @param dataType String the data type - * @param data byte[] the data - * @param startIndex int the start index + * @param dataType String the data type + * @param data byte[] the data + * @param startIndex int the start index * @param formatArray int[] the format tye array defined by previous data * @param paramNo int the param index in parameter list * @return PreparedValue the prepared parameter @@ -97,25 +108,25 @@ public final class PostgresDataTypeConverter { } boolean isText = isText(formatArray, paramNo); - if (isText) { - PreparedValue preparedValue = convertStringValue(data, - startIndex + PostgresProtocolConstant.PARAM_VALUE_LENGTH_BYTES, - paramValueLength); - preparedValue.setType(dataType); - return preparedValue; - } Optional dataTypeOptional = PgDataType.getDataType(dataType); if (!dataTypeOptional.isPresent()) { LOGGER.error("unsupported data type, oid: {}", dataType); } - PGValueConverter converter = dataTypeOptional.isPresent() - ? dataTypeOptional.get().getValueConverter() - : PostgresDataTypeConverter::unsupportedTypeConverter; - PreparedValue preparedValue = converter.convert( - data, - startIndex + PostgresProtocolConstant.PARAM_VALUE_LENGTH_BYTES, - paramValueLength); - preparedValue.setType(dataType); + PreparedValue preparedValue; + if (isText) { + preparedValue = convertStringValue(data, + startIndex + PostgresProtocolConstant.PARAM_VALUE_LENGTH_BYTES, + paramValueLength); + preparedValue.setType(dataTypeOptional.map(Enum::name).orElse(dataType)); + } else { + PGValueConverter converter = dataTypeOptional.isPresent() + ? dataTypeOptional.get().getValueConverter() + : PostgresDataTypeConverter::unsupportedTypeConverter; + preparedValue = converter.convert(data, + startIndex + PostgresProtocolConstant.PARAM_VALUE_LENGTH_BYTES, + paramValueLength); + preparedValue.setType(dataTypeOptional.map(Enum::name).orElse(dataType)); + } return preparedValue; } @@ -203,6 +214,21 @@ public final class PostgresDataTypeConverter { return new PreparedValue(value, length + PostgresProtocolConstant.PARAM_VALUE_LENGTH_BYTES); } + /** + * convertDateValue + * + * @param packet byte[] packet + * @param start int the start index + * @param length int the length of bytes + * @return PreparedValue the preparedValue + */ + public static PreparedValue convertDateValue(byte[] packet, int start, int length) { + long timeStamp = CommonParser.parseTimestamp(packet); + Date date = new Date(timeStamp); + String value = date.toString(); + return new PreparedValue(value, length + PostgresProtocolConstant.PARAM_VALUE_LENGTH_BYTES); + } + /** * Convert numeric value * @@ -222,6 +248,23 @@ public final class PostgresDataTypeConverter { return new PreparedValue(value, length + PostgresProtocolConstant.PARAM_VALUE_LENGTH_BYTES); } + /** + * Convert TimeStamp value + * + * @param packet byte[] the packet + * @param start int the start index + * @param length int the length of bytes + * @return PreparedValue the preparedValue + */ + public static PreparedValue convertTimeStampValue(byte[] packet, int start, int length) { + long timeStamp = CommonParser.parseTimestamp(packet); + Instant instant = Instant.ofEpochMilli(timeStamp); + DateTimeFormatter formatter = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.systemDefault()); + String value = formatter.format(instant); + return new PreparedValue(value, length + PostgresProtocolConstant.PARAM_VALUE_LENGTH_BYTES); + } + /** * not supported type converter * diff --git a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PostgresParser.java b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PostgresParser.java index e6c8112..415b3ee 100644 --- a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PostgresParser.java +++ b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/parse/postgres/PostgresParser.java @@ -175,7 +175,7 @@ public class PostgresParser extends ParseThread { int startIdx = statementIdEndIdx + 1; int sqlEndIdx = getStringEndIndex(data, startIdx); - String sql = CommonParser.parseByteToString(data, startIdx, sqlEndIdx).trim() + ";"; + String sql = CommonParser.parseByteToString(data, startIdx, sqlEndIdx).trim(); int paramCount = CommonParser.parseSignedIntByBigEndian(data, sqlEndIdx + 1, 2); int paramIdx = sqlEndIdx + 3; @@ -208,7 +208,8 @@ public class PostgresParser extends ParseThread { int statementIdEndIdx = getStringEndIndex(data, parseIdx); if (statementIdEndIdx < 0) { - LOGGER.error("parse statementId in bind param error, packetId: {}", packet.getPacketId()); + LOGGER.error("parse statementId in bind param error, packetId: {}, locationFile: {}, idInFile: {}", + packet.getPacketId(), packet.getLocationFile(), packet.getIdInFile()); return; } String bindStatementId = CommonParser.parseByteToString(data, parseIdx, statementIdEndIdx).trim(); diff --git a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/replay/factory/ReplayConnectionFactory.java b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/replay/factory/ReplayConnectionFactory.java index ff77aad..410e9f0 100644 --- a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/replay/factory/ReplayConnectionFactory.java +++ b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/replay/factory/ReplayConnectionFactory.java @@ -69,13 +69,24 @@ public class ReplayConnectionFactory { return Optional.empty(); } targetDbConfig.setDbName(schemaMap.get(schema)); - Connection connection = ConnectionFactory.createConnection(targetDbConfig, ConnectionFactory.OPENGAUSS); + Connection connection = startConnection(targetDbConfig, + targetDbConfig.getDbType()); setSessionConfig(connection); connectionMap.put(Thread.currentThread().getName() + schema, connection); } return Optional.of(connectionMap.get(Thread.currentThread().getName() + schema)); } + private Connection startConnection(DatabaseConfig targetDbConfig, String dbType) { + if (dbType.equalsIgnoreCase("OPENGAUSS")) { + return ConnectionFactory.createConnection(targetDbConfig, ConnectionFactory.OPENGAUSS); + } else if (dbType.equalsIgnoreCase("MYSQL")) { + return ConnectionFactory.createConnection(targetDbConfig, ConnectionFactory.MYSQL); + } else { + throw new IllegalArgumentException("Unknown database type: " + dbType); + } + } + private void setSessionConfig(Connection connection) { Statement stmt = null; try { diff --git a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/replay/model/ParameterTypeEnum.java b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/replay/model/ParameterTypeEnum.java index 100802c..344774b 100644 --- a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/replay/model/ParameterTypeEnum.java +++ b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/replay/model/ParameterTypeEnum.java @@ -43,6 +43,15 @@ public enum ParameterTypeEnum { preSqlStmt.setInt(paramIndex, Integer.parseInt(paramValue)); } }, + /** + * float + */ + FLOAT { + @Override + public void setParam(PreparedStatement preSqlStmt, int paramIndex, String paramValue) throws SQLException { + preSqlStmt.setFloat(paramIndex, Float.parseFloat(paramValue)); + } + }, /** * double */ @@ -68,12 +77,22 @@ public enum ParameterTypeEnum { @Override public void setParam(PreparedStatement preSqlStmt, int paramIndex, String paramValue) throws SQLException { try { - preSqlStmt.setTimestamp(paramIndex, Timestamp.valueOf(paramValue)); + String tempParamValue = paramValue; + if (tempParamValue.length() > 15) { + tempParamValue = tempParamValue.substring(0, 19); + preSqlStmt.setTimestamp(paramIndex, Timestamp.valueOf(tempParamValue)); + } else { + tempParamValue = tempParamValue.substring(0, 10); + preSqlStmt.setDate(paramIndex, Date.valueOf(tempParamValue)); + } } catch (IllegalArgumentException e) { throw new SQLException("Invalid timestamp format: " + paramValue, e); } } }, + /** + * numeric + */ NUMERIC { @Override public void setParam(PreparedStatement preSqlStmt, int paramIndex, String paramValue) throws SQLException { @@ -98,6 +117,15 @@ public enum ParameterTypeEnum { preSqlStmt.setObject(paramIndex, paramValue); } }, + /** + * boolean + */ + BOOL { + @Override + public void setParam(PreparedStatement preSqlStmt, int paramIndex, String paramValue) throws SQLException { + preSqlStmt.setBoolean(paramIndex, Boolean.parseBoolean(paramValue)); + } + }, /** * date */ @@ -185,11 +213,45 @@ public enum ParameterTypeEnum { * @return ParameterType */ public static ParameterTypeEnum fromTypeName(String typeStr) { + String newType = findType(typeStr); for (ParameterTypeEnum type : values()) { - if (type.name().equalsIgnoreCase(typeStr)) { + if (type.name().equalsIgnoreCase(newType)) { return type; } } throw new IllegalArgumentException("Unsupported parameter type: " + typeStr); } + + /** + * findType + * + * @param typeStr String typeStr + * @return String + */ + public static String findType(String typeStr) { + switch (typeStr) { + case "INT2": + case "INT4": + return "INT"; + + case "INT8": + return "LONG"; + + case "VARCHAR": + case "BPCHAR": + case "VARCHAR2": + case "NVARCHAR2": + case "TEXT": + return "STRING"; + + case "FLOAT4": + return "FLOAT"; + + case "FLOAT8": + return "DOUBLE"; + + default: + return typeStr; + } + } } diff --git a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/replay/operator/ReplaySqlOperator.java b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/replay/operator/ReplaySqlOperator.java index 69efedd..e1993c0 100644 --- a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/replay/operator/ReplaySqlOperator.java +++ b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/replay/operator/ReplaySqlOperator.java @@ -196,7 +196,12 @@ public class ReplaySqlOperator { String explainStr = executeAndGetExplain(replayConn, sql); int lastColonsIndex = explainStr.lastIndexOf(":"); int msIndex = explainStr.lastIndexOf("ms"); - long duration = (long) (Double.parseDouble(explainStr.substring(lastColonsIndex + 1, msIndex).trim()) * 1000); + long duration; + try { + duration = (long) (Double.parseDouble(explainStr.substring(lastColonsIndex + 1, msIndex).trim()) * 1000); + } catch (StringIndexOutOfBoundsException e) { + duration = 10; + } return getExecuteResponse(sqlModel, duration, explainStr); } diff --git a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/replay/task/SingleReplayThread.java b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/replay/task/SingleReplayThread.java index 5e00b2e..2567979 100644 --- a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/replay/task/SingleReplayThread.java +++ b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/replay/task/SingleReplayThread.java @@ -190,6 +190,7 @@ public class SingleReplayThread extends ReplayThread { } private synchronized void closeThread(Set sessions) { + LOGGER.info("Closing replay thread"); String firstSession = sessions.stream().findFirst().orElse(StringUtils.EMPTY); if (!singleThreadModel.getSessionThreadMap().containsKey(firstSession)) { return; diff --git a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/utils/ConfigReader.java b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/utils/ConfigReader.java index 6eaf5fe..64e0003 100644 --- a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/utils/ConfigReader.java +++ b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/utils/ConfigReader.java @@ -446,6 +446,11 @@ public final class ConfigReader { */ public static final String SQL_REPLAY_DATABASE_PORT = "sql.replay.database.port"; + /** + * sql replay database type + */ + public static final String SQL_REPLAY_DATABASE_TYPE = "sql.replay.database.type"; + /** * sql replay database username */ diff --git a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/utils/ConnectionFactory.java b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/utils/ConnectionFactory.java index 07d7419..b2cef69 100644 --- a/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/utils/ConnectionFactory.java +++ b/dynamic_sql_collection/transcribe-replay-tool/src/main/java/org/opengauss/tool/utils/ConnectionFactory.java @@ -44,7 +44,6 @@ public final class ConnectionFactory { private static final String CLUSTER_JDBC_URL = "jdbc:postgresql://%s/%s?currentSchema=%s&targetServerType=master" + "&loggerLevel=error"; - private static Connection createMysqlConnection(DatabaseConfig config) { String url = "jdbc:mysql://" + config.getDbIp() + ":" + config.getDbPort() + "/" + config.getDbName() + "?useSSL=false&allowPublicKeyRetrieval=true&" @@ -93,9 +92,8 @@ public final class ConnectionFactory { public static Connection createConnection(DatabaseConfig config, String dbType) { if (MYSQL.equalsIgnoreCase(dbType)) { return createMysqlConnection(config); - } else { - return createOpengaussConnection(config); } + return createOpengaussConnection(config); } private static Connection createOpengaussConnection(DatabaseConfig config) { -- Gitee