From 5c5578225b83f9836977776b0d31c1b1dee4ce2b Mon Sep 17 00:00:00 2001 From: liujiawen Date: Mon, 17 Jan 2022 16:45:09 +0800 Subject: [PATCH 1/9] =?UTF-8?q?Mqtt=205.0=20=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/io/github/quickmsg/core/protocol/ConnectProtocol.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java index 6e6bd47a..316883a6 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java @@ -85,7 +85,7 @@ public class ConnectProtocol implements Protocol { } /*protocol version support*/ if (MqttVersion.MQTT_3_1_1.protocolLevel() != (byte) mqttConnectVariableHeader.version() - && MqttVersion.MQTT_3_1.protocolLevel() != (byte) mqttConnectVariableHeader.version()) { + && MqttVersion.MQTT_5.protocolLevel() != (byte) mqttConnectVariableHeader.version()) { return mqttChannel.write( MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION), false).then(mqttChannel.close()); -- Gitee From da86657f144fe28584dba23fad77329cb03a436e Mon Sep 17 00:00:00 2001 From: liujiawen Date: Mon, 24 Jan 2022 11:48:39 +0800 Subject: [PATCH 2/9] =?UTF-8?q?Mqtt=20=E6=B6=88=E6=81=AF=E6=94=AF=E6=8C=81?= =?UTF-8?q?5.0=20properties?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/interceptor/MessageProxy.java | 1 + .../common/message/HeapMqttMessage.java | 3 +++ .../common/message/MqttMessageBuilder.java | 27 +++++++++++++++++++ .../common/message/RetainMessage.java | 9 ++++++- .../common/message/SessionMessage.java | 7 ++++- .../quickmsg/common/utils/MessageUtils.java | 4 ++- .../core/cluster/ClusterReceiver.java | 2 +- 7 files changed, 49 insertions(+), 4 deletions(-) diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java b/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java index cb6d17b6..e34f7c8a 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java @@ -78,6 +78,7 @@ public class MessageProxy { .topic(header.topicName()) .retain(fixedHeader.isRetain()) .qos(fixedHeader.qosLevel().value()) + .properties(header.properties()) .build(); } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java index 19c340da..2ab22342 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java @@ -1,6 +1,7 @@ package io.github.quickmsg.common.message; import io.github.quickmsg.common.utils.JacksonUtil; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.util.internal.StringUtil; import lombok.AllArgsConstructor; import lombok.Builder; @@ -31,6 +32,8 @@ public class HeapMqttMessage { private byte[] message; + private MqttProperties properties; + public Map getKeyMap() { Map keys = new HashMap<>(5); diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java index d6a16539..1b705249 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java @@ -4,6 +4,7 @@ import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.*; import java.util.List; +import java.util.Map; /** @@ -11,6 +12,32 @@ import java.util.List; */ public class MqttMessageBuilder { + private static MqttProperties genMqttProperties(Map userPropertiesMap) { + MqttProperties mqttProperties = null; + if (userPropertiesMap != null) { + mqttProperties = new MqttProperties(); + MqttProperties.UserProperties userProperties = new MqttProperties.UserProperties(); + for (Map.Entry entry : userPropertiesMap.entrySet()) { + userProperties.add(entry.getKey(), entry.getValue()); + } + mqttProperties.add(userProperties); + } + return mqttProperties; + } + + public static MqttPublishMessage buildPub(boolean isDup, MqttQoS qoS, int messageId, String topic, ByteBuf message, MqttProperties properties) { + MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qoS, false, 0); + MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(topic, messageId, properties); + MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, message); + return mqttPublishMessage; + } + + public static MqttPublishMessage buildPub(boolean isDup, MqttQoS qoS, int messageId, String topic, ByteBuf message, Map userPropertiesMap) { + MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qoS, false, 0); + MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(topic, messageId, genMqttProperties(userPropertiesMap)); + MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, message); + return mqttPublishMessage; + } public static MqttPublishMessage buildPub(boolean isDup, MqttQoS qoS, int messageId, String topic, ByteBuf message) { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qoS, false, 0); diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java index fbc62352..a5bbfb3c 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java @@ -1,8 +1,11 @@ package io.github.quickmsg.common.message; +import java.util.Map; + import io.github.quickmsg.common.channel.MqttChannel; import io.github.quickmsg.common.utils.MessageUtils; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; @@ -22,12 +25,15 @@ public class RetainMessage { private byte[] body; + private MqttProperties properties; + public static RetainMessage of(MqttPublishMessage mqttPublishMessage) { MqttPublishVariableHeader publishVariableHeader = mqttPublishMessage.variableHeader(); return RetainMessage.builder() .topic(publishVariableHeader.topicName()) .qos(mqttPublishMessage.fixedHeader().qosLevel().value()) .body(MessageUtils.copyByteBuf(mqttPublishMessage.payload())) + .properties(publishVariableHeader.properties()) .build(); } @@ -37,7 +43,8 @@ public class RetainMessage { MqttQoS.valueOf(this.qos), qos > 0 ? mqttChannel.generateMessageId() : 0, topic, - PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body)); + PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body), + properties); } } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java index a3596af3..7323d504 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java @@ -3,6 +3,7 @@ package io.github.quickmsg.common.message; import io.github.quickmsg.common.channel.MqttChannel; import io.github.quickmsg.common.utils.MessageUtils; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; @@ -27,6 +28,8 @@ public class SessionMessage { private boolean retain; + private MqttProperties properties; + public static SessionMessage of(String clientIdentifier, MqttPublishMessage mqttPublishMessage) { MqttPublishVariableHeader publishVariableHeader = mqttPublishMessage.variableHeader(); return SessionMessage.builder() @@ -35,6 +38,7 @@ public class SessionMessage { .qos(mqttPublishMessage.fixedHeader().qosLevel().value()) .retain(mqttPublishMessage.fixedHeader().isRetain()) .body(MessageUtils.copyByteBuf(mqttPublishMessage.payload())) + .properties(publishVariableHeader.properties()) .build(); } @@ -44,7 +48,8 @@ public class SessionMessage { MqttQoS.valueOf(this.qos), qos > 0 ? mqttChannel.generateMessageId() : 0, topic, - PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body)); + PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body), + properties); } } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java index fa197573..f25c2789 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java @@ -67,7 +67,9 @@ public class MessageUtils { MqttPublishVariableHeader mqttPublishVariableHeader = message.variableHeader(); MqttFixedHeader mqttFixedHeader = message.fixedHeader(); MqttFixedHeader newFixedHeader = new MqttFixedHeader(mqttFixedHeader.messageType(), false, mqttQoS, false, mqttFixedHeader.remainingLength()); - MqttPublishVariableHeader newHeader = new MqttPublishVariableHeader(mqttPublishVariableHeader.topicName(), messageId); + // mqtt 5 support properties + System.out.println( "----" + mqttPublishVariableHeader.properties().getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value()) ); + MqttPublishVariableHeader newHeader = new MqttPublishVariableHeader(mqttPublishVariableHeader.topicName(), messageId, mqttPublishVariableHeader.properties()); return new MqttPublishMessage(newFixedHeader, newHeader, message.payload().copy()); } diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java b/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java index d751f33b..62d5f3a3 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java @@ -56,7 +56,7 @@ public class ClusterReceiver { MqttQoS.valueOf(heapMqttMessage.getQos()), 0, heapMqttMessage.getTopic(), - PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage())), System.currentTimeMillis(), Boolean.TRUE); + PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage()), heapMqttMessage.getProperties()), System.currentTimeMillis(), Boolean.TRUE); } } -- Gitee From a5420a6bb5482a03183e9e9d565189a35b50adf2 Mon Sep 17 00:00:00 2001 From: liujiawen Date: Mon, 24 Jan 2022 14:00:22 +0800 Subject: [PATCH 3/9] =?UTF-8?q?Mqtt=20=E6=B6=88=E6=81=AF=E6=94=AF=E6=8C=81?= =?UTF-8?q?5.0=20properties=20=E4=BB=A5=E5=8F=8A=20connect=20ack=20support?= =?UTF-8?q?=205.0=20code?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/message/MqttMessageBuilder.java | 28 ++++++++++++++++++- .../core/protocol/ConnectProtocol.java | 10 +++---- .../quickmsg/rule/node/TopicRuleNode.java | 5 ++-- 3 files changed, 35 insertions(+), 8 deletions(-) diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java index 1b705249..6d807b43 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java @@ -6,6 +6,12 @@ import io.netty.handler.codec.mqtt.*; import java.util.List; import java.util.Map; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED_5; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE_5; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION; + /** * @author luxurong @@ -96,7 +102,27 @@ public class MqttMessageBuilder { return new MqttUnsubAckMessage(mqttFixedHeader, variableHeader); } - public static MqttConnAckMessage buildConnectAck(MqttConnectReturnCode connectReturnCode) { + public static MqttConnAckMessage buildConnectAck(MqttConnectReturnCode connectReturnCode, int version) { + if (MqttVersion.MQTT_5.protocolLevel() != (byte) version) { + switch (connectReturnCode) { + case CONNECTION_REFUSED_IDENTIFIER_REJECTED: + connectReturnCode = CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID; + break; + case CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION: + connectReturnCode = CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION; + break; + case CONNECTION_REFUSED_SERVER_UNAVAILABLE: + connectReturnCode = CONNECTION_REFUSED_SERVER_UNAVAILABLE_5; + break; + case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD: + connectReturnCode = CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD; + break; + case CONNECTION_REFUSED_NOT_AUTHORIZED: + connectReturnCode = CONNECTION_REFUSED_NOT_AUTHORIZED_5; + break; + + } + } MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(connectReturnCode, false); MqttFixedHeader mqttFixedHeader = new MqttFixedHeader( MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0X02); diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java index 316883a6..e90042ad 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java @@ -68,7 +68,7 @@ public class ConnectProtocol implements Protocol { if (mqttReceiveContext.getConfiguration().getConnectModel() == ConnectModel.UNIQUE) { if (channelRegistry.exists(clientIdentifier)) { return mqttChannel.write( - MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED), + MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, mqttConnectVariableHeader.version()), false).then(mqttChannel.close()); } } else { @@ -78,7 +78,7 @@ public class ConnectProtocol implements Protocol { existMqttChannel.close().subscribe(); } else { return mqttChannel.write( - MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED), + MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, mqttConnectVariableHeader.version()), false).then(mqttChannel.close()); } } @@ -87,7 +87,7 @@ public class ConnectProtocol implements Protocol { if (MqttVersion.MQTT_3_1_1.protocolLevel() != (byte) mqttConnectVariableHeader.version() && MqttVersion.MQTT_5.protocolLevel() != (byte) mqttConnectVariableHeader.version()) { return mqttChannel.write( - MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION), + MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, mqttConnectVariableHeader.version()), false).then(mqttChannel.close()); } /*password check*/ @@ -147,11 +147,11 @@ public class ConnectProtocol implements Protocol { eventRegistry.registry(Event.CONNECT, mqttChannel, message, mqttReceiveContext); - return mqttChannel.write(MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_ACCEPTED), false) + return mqttChannel.write(MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeader.version()), false) .then(Mono.fromRunnable(() -> sendOfflineMessage(mqttReceiveContext.getMessageRegistry(), mqttChannel))); } else { return mqttChannel.write( - MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD), + MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, mqttConnectVariableHeader.version()), false).then(mqttChannel.close()); } } catch (Exception e) { diff --git a/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java b/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java index 2b28c1e7..f320d965 100644 --- a/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java +++ b/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java @@ -47,7 +47,7 @@ public class TopicRuleNode implements RuleNode { log.info("rule engine TopicRuleNode request {}", heapMqttMessage); ProtocolAdaptor protocolAdaptor = receiveContext.getProtocolAdaptor(); protocolAdaptor.chooseProtocol(MockMqttChannel.wrapClientIdentifier(heapMqttMessage.getClientIdentifier()), - new SmqttMessage<>(getMqttMessage(heapMqttMessage),heapMqttMessage.getTimestamp(),Boolean.TRUE), receiveContext); + new SmqttMessage<>(getMqttMessage(heapMqttMessage), heapMqttMessage.getTimestamp(), Boolean.TRUE), receiveContext); executeNext(contextView); } @@ -58,7 +58,8 @@ public class TopicRuleNode implements RuleNode { MqttQoS.valueOf(heapMqttMessage.getQos()), 0, this.topic, - PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage())); + PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage()), + heapMqttMessage.getProperties()); } -- Gitee From 2b7ff97b51f68a28f067e4aea6d3fa0512bb6a80 Mon Sep 17 00:00:00 2001 From: zhoujingwu Date: Mon, 24 Jan 2022 15:20:52 +0800 Subject: [PATCH 4/9] =?UTF-8?q?Adjust:1.=E4=B8=80=E5=AF=B9=E4=B8=80?= =?UTF-8?q?=E7=A6=BB=E7=BA=BF=E6=B6=88=E6=81=AF=E6=94=AF=E6=8C=81=EF=BC=9B?= =?UTF-8?q?=202.=E4=BF=9D=E7=95=99=E6=B6=88=E6=81=AF=E3=80=81=E7=A6=BB?= =?UTF-8?q?=E7=BA=BF=E6=B6=88=E6=81=AF=E7=9A=84userProperties=20Redis?= =?UTF-8?q?=E6=8C=81=E4=B9=85=E5=8C=96=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- smqtt-bootstrap/pom.xml | 15 +++++ .../springboot/SpringBootStarter.java | 22 +++++++ .../src/main/resources/application.yml | 61 +++++++++++++++++++ smqtt-bootstrap/src/main/resources/test.yaml | 3 - .../common/bootstrap/BootstrapKey.java | 3 + .../common/message/MessageRegistry.java | 14 +++++ .../common/message/RetainMessage.java | 22 +++++-- .../common/message/SessionMessage.java | 22 ++++++- smqtt-core/pom.xml | 5 ++ .../core/protocol/PublishProtocol.java | 6 ++ .../core/protocol/SubscribeProtocol.java | 26 +++++++- .../core/spi/DefaultMessageRegistry.java | 12 ++++ .../registry/DbMessageRegistry.java | 11 ++++ .../message/RetainMessageEntity.java | 2 + .../message/SessionMessageEntity.java | 2 + .../registry/RedisMessageRegistry.java | 54 +++++++++++++++- 16 files changed, 266 insertions(+), 14 deletions(-) create mode 100644 smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java create mode 100644 smqtt-bootstrap/src/main/resources/application.yml delete mode 100644 smqtt-bootstrap/src/main/resources/test.yaml diff --git a/smqtt-bootstrap/pom.xml b/smqtt-bootstrap/pom.xml index 1f67f6d1..dddb998c 100644 --- a/smqtt-bootstrap/pom.xml +++ b/smqtt-bootstrap/pom.xml @@ -57,6 +57,21 @@ io.github.quickmsg 1.1.2 + + org.springframework.boot + spring-boot + 2.2.1.RELEASE + + + org.springframework.boot + spring-boot-autoconfigure + 2.2.1.RELEASE + + + io.github.quickmsg + smqtt-spring-boot-starter + 1.1.2 + diff --git a/smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java b/smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java new file mode 100644 index 00000000..1a107e5f --- /dev/null +++ b/smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java @@ -0,0 +1,22 @@ +package io.github.quickmsg.springboot; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.ComponentScan; + +import io.github.quickmsg.starter.EnableMqttServer; + +/** + * @Author: Jingwu.Zhou + * @Date: 2022/1/24 + */ +@SpringBootApplication +@EnableConfigurationProperties +@ComponentScan(basePackages = {"io.github.quickmsg"}) +@EnableMqttServer +public class SpringBootStarter { + public static void main(String[] args) { + SpringApplication.run(SpringBootStarter.class, args); + } +} diff --git a/smqtt-bootstrap/src/main/resources/application.yml b/smqtt-bootstrap/src/main/resources/application.yml new file mode 100644 index 00000000..acc60e32 --- /dev/null +++ b/smqtt-bootstrap/src/main/resources/application.yml @@ -0,0 +1,61 @@ + smqtt: + logLevel: INFO # 系统日志 + tcp: # tcp配置 + port: 1883 # mqtt端口号 + username: APPID # mqtt连接默认用户名 生产环境建议spi去注入PasswordAuthentication接口 + password: 123456789 # mqtt连接默认密码 生产环境建议spi去注入PasswordAuthentication接口 + wiretap: true # 二进制日志 前提是 smqtt.logLevel = DEBUG + bossThreadSize: 4 # boss线程 + workThreadSize: 8 # work线程 + lowWaterMark: 4000000 # 不建议配置 默认 32768 + highWaterMark: 80000000 # 不建议配置 默认 65536 + businessThreadSize: 16 # 业务线程数 默认=cpu核心数*10 + businessQueueSize: 100000 #业务队列 默认=100000 + # globalReadWriteSize: 10000000,100000000 全局读写大小限制 + # channelReadWriteSize: 10000000,100000000 单个channel读写大小限制 + ssl: # ssl配置 + enable: false # 开关 + key: /user/server.key # 指定ssl文件 默认系统生成 + crt: /user/server.crt # 指定ssl文件 默认系统生成 + http: # http相关配置 端口固定60000 + enable: false # 开关 + accessLog: true # http访问日志 + ssl: # ssl配置 + enable: false + admin: # 后台管理配置 + enable: true # 开关 + username: smqtt # 访问用户名 + password: smqtt # 访问密码 + ws: # websocket配置 + enable: false # 开关 + port: 8999 # 端口 + path: /mqtt # ws 的访问path mqtt.js请设置此选项 + cluster: # 集群配置 + enable: false # 集群开关 + url: 127.0.0.1:7771,127.0.0.1:7772 # 启动节点 + port: 7771 # 端口 + node: node-1 # 集群节点名称 唯一 + external: + host: localhost # 用于映射容器ip 请不要随意设置,如果不需要请移除此选项 + port: 7777 # 用于映射容器端口 请不要随意设置,如果不需要请移除此选项 + redis: # redis 请参考 https://doc.smqtt.cc/%E5%85%B6%E4%BB%96/1.store.html 【如果没有引入相关依赖请移除此配置】 + mode: sentinel + database: 7 + password: xuanwu-T3st*17 + timeout: 3000 + poolMinIdle: 8 + poolConnTimeout: 3000 + poolSize: 10 + single: + address: 172.16.1.16:6392 + cluster: + scanInterval: 1000 + nodes: 172.16.1.16:6390,172.16.1.16:6391,172.16.1.16:6392 + readMode: SLAVE + retryAttempts: 3 + slaveConnectionPoolSize: 64 + masterConnectionPoolSize: 64 + retryInterval: 1500 + sentinel: + master: mq_master + nodes: 172.16.1.16:26371,172.16.1.16:26372,172.16.1.16:26373 \ No newline at end of file diff --git a/smqtt-bootstrap/src/main/resources/test.yaml b/smqtt-bootstrap/src/main/resources/test.yaml deleted file mode 100644 index e4203080..00000000 --- a/smqtt-bootstrap/src/main/resources/test.yaml +++ /dev/null @@ -1,3 +0,0 @@ -smqtt: - tcp: - port: 7000 \ No newline at end of file diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java b/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java index ad5a8904..bd7da1b0 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java @@ -14,6 +14,9 @@ public class BootstrapKey { /*redis前缀key*/ public static final String REDIS_SESSION_MESSAGE_PREFIX_KEY = "smqtt:session:message:"; + /*redis前缀key 离线消息*/ + public static final String REDIS_OFFLINE_MESSAGE_PREFIX_KEY = "smqtt:offline:message:"; + /*模式*/ public static final String REDIS_MODE = "redis.mode"; diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java index c21887db..9bc4f8ac 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java @@ -47,4 +47,18 @@ public interface MessageRegistry extends StartUp { List getRetainMessage(String topic); + /** + * 持久化离线消息 + * + * @param message + */ + void saveOfflineMessage(RetainMessage message); + + /** + * 获取离线消息 + * + * @param topic + * @return + */ + List getOfflineMessage(String topic); } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java index a5bbfb3c..8f64c4cd 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java @@ -1,8 +1,10 @@ package io.github.quickmsg.common.message; -import java.util.Map; +import java.util.HashMap; +import java.util.Optional; import io.github.quickmsg.common.channel.MqttChannel; +import io.github.quickmsg.common.utils.JacksonUtil; import io.github.quickmsg.common.utils.MessageUtils; import io.netty.buffer.PooledByteBufAllocator; import io.netty.handler.codec.mqtt.MqttProperties; @@ -25,7 +27,7 @@ public class RetainMessage { private byte[] body; - private MqttProperties properties; + private String userProperties; public static RetainMessage of(MqttPublishMessage mqttPublishMessage) { MqttPublishVariableHeader publishVariableHeader = mqttPublishMessage.variableHeader(); @@ -33,7 +35,19 @@ public class RetainMessage { .topic(publishVariableHeader.topicName()) .qos(mqttPublishMessage.fixedHeader().qosLevel().value()) .body(MessageUtils.copyByteBuf(mqttPublishMessage.payload())) - .properties(publishVariableHeader.properties()) + .userProperties(JacksonUtil + .map2Json(Optional + .ofNullable(publishVariableHeader + .properties() + .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value())) + .map(list -> { + HashMap propertiesMap = new HashMap<>(); + list.forEach(property -> { + MqttProperties.StringPair pair = (MqttProperties.StringPair) property.value(); + propertiesMap.put(pair.key, pair.value); + }); + return propertiesMap; + }).get())) .build(); } @@ -44,7 +58,7 @@ public class RetainMessage { qos > 0 ? mqttChannel.generateMessageId() : 0, topic, PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body), - properties); + JacksonUtil.json2Map(userProperties, String.class, String.class)); } } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java index 7323d504..2337caeb 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java @@ -1,6 +1,10 @@ package io.github.quickmsg.common.message; +import java.util.HashMap; +import java.util.Optional; + import io.github.quickmsg.common.channel.MqttChannel; +import io.github.quickmsg.common.utils.JacksonUtil; import io.github.quickmsg.common.utils.MessageUtils; import io.netty.buffer.PooledByteBufAllocator; import io.netty.handler.codec.mqtt.MqttProperties; @@ -28,7 +32,7 @@ public class SessionMessage { private boolean retain; - private MqttProperties properties; + private String userProperties; public static SessionMessage of(String clientIdentifier, MqttPublishMessage mqttPublishMessage) { MqttPublishVariableHeader publishVariableHeader = mqttPublishMessage.variableHeader(); @@ -38,7 +42,19 @@ public class SessionMessage { .qos(mqttPublishMessage.fixedHeader().qosLevel().value()) .retain(mqttPublishMessage.fixedHeader().isRetain()) .body(MessageUtils.copyByteBuf(mqttPublishMessage.payload())) - .properties(publishVariableHeader.properties()) + .userProperties(JacksonUtil + .map2Json(Optional + .ofNullable(publishVariableHeader + .properties() + .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value())) + .map(list -> { + HashMap propertiesMap = new HashMap<>(); + list.forEach(property -> { + MqttProperties.StringPair pair = (MqttProperties.StringPair) property.value(); + propertiesMap.put(pair.key, pair.value); + }); + return propertiesMap; + }).get())) .build(); } @@ -49,7 +65,7 @@ public class SessionMessage { qos > 0 ? mqttChannel.generateMessageId() : 0, topic, PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body), - properties); + JacksonUtil.json2Map(userProperties, String.class, String.class)); } } diff --git a/smqtt-core/pom.xml b/smqtt-core/pom.xml index dbb26bfb..e4e557ca 100644 --- a/smqtt-core/pom.xml +++ b/smqtt-core/pom.xml @@ -31,6 +31,11 @@ smqtt-metric-prometheus 1.1.2 + + io.github.quickmsg + smqtt-persistent-redis + 1.1.2 + \ No newline at end of file diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java index 1a405e07..864b8bb6 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java @@ -30,6 +30,8 @@ public class PublishProtocol implements Protocol { private static List MESSAGE_TYPE_LIST = new ArrayList<>(); + private static String ONE2ONE_TOPIC_PREFIX = "ipush/mt/msg/"; + static { MESSAGE_TYPE_LIST.add(MqttMessageType.PUBLISH); } @@ -52,6 +54,10 @@ public class PublishProtocol implements Protocol { MessageRegistry messageRegistry = receiveContext.getMessageRegistry(); Set mqttChannels = topicRegistry.getSubscribesByTopic(variableHeader.topicName(), message.fixedHeader().qosLevel()); + // 定制化处理,一对一发送的离线消息进行持久化,保证送达 + if (mqttChannels.isEmpty() && variableHeader.topicName().startsWith(ONE2ONE_TOPIC_PREFIX)) { + messageRegistry.saveOfflineMessage(RetainMessage.of(message)); + } // http mock if (mqttChannel.getIsMock()) { return send(mqttChannels, message, messageRegistry, filterRetainMessage(message, messageRegistry)); diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java index 6a1c2f4d..4cb98a0b 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java @@ -11,13 +11,13 @@ import io.github.quickmsg.common.protocol.Protocol; import io.github.quickmsg.common.topic.SubscribeTopic; import io.github.quickmsg.common.topic.TopicRegistry; import io.netty.handler.codec.mqtt.MqttMessageType; -import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import reactor.core.publisher.Mono; import reactor.util.context.ContextView; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -40,7 +40,10 @@ public class SubscribeProtocol implements Protocol { Set mqttTopicSubscriptions = message.payload().topicSubscriptions() .stream() - .peek(mqttTopicSubscription -> this.loadRetainMessage(messageRegistry, mqttChannel, mqttTopicSubscription.topicName())) + .peek(mqttTopicSubscription -> { + this.loadRetainMessage(messageRegistry, mqttChannel, mqttTopicSubscription.topicName()); + this.loadOfflineMessage(messageRegistry, mqttChannel, mqttTopicSubscription.topicName()); + }) .map(mqttTopicSubscription -> new SubscribeTopic(mqttTopicSubscription.topicName(), mqttTopicSubscription.qualityOfService(), mqttChannel)) .collect(Collectors.toSet()); @@ -57,6 +60,12 @@ public class SubscribeProtocol implements Protocol { .collect(Collectors.toList())), false)); } + /** + * 获取保留消息 + * @param messageRegistry + * @param mqttChannel + * @param topicName + */ private void loadRetainMessage(MessageRegistry messageRegistry, MqttChannel mqttChannel, String topicName) { messageRegistry.getRetainMessage(topicName) .forEach(retainMessage -> @@ -64,6 +73,19 @@ public class SubscribeProtocol implements Protocol { .subscribe()); } + /** + * 获取离线消息 + * @param messageRegistry + * @param mqttChannel + * @param topicName + */ + private void loadOfflineMessage(MessageRegistry messageRegistry, MqttChannel mqttChannel, String topicName) { + Optional.ofNullable(messageRegistry.getOfflineMessage(topicName)).ifPresent(msgList -> + msgList.forEach(retainMessage -> + mqttChannel.write(retainMessage.toPublishMessage(mqttChannel), retainMessage.getQos() > 0) + .subscribe())); + } + @Override public List getMqttMessageTypes() { return MESSAGE_TYPE_LIST; diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java index 256d8170..b3f2919b 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java @@ -5,6 +5,7 @@ import io.github.quickmsg.common.message.RetainMessage; import io.github.quickmsg.common.message.SessionMessage; import io.github.quickmsg.common.utils.TopicRegexUtils; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -47,4 +48,15 @@ public class DefaultMessageRegistry implements MessageRegistry { .collect(Collectors.toList()); } + @Override + public List getOfflineMessage(String topic) { + // TODO impl + return Collections.EMPTY_LIST; + } + + @Override + public void saveOfflineMessage(RetainMessage offlineMessage) { + // TODO impl + } + } diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java index 92992c29..bde25550 100644 --- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java +++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java @@ -198,4 +198,15 @@ public class DbMessageRegistry implements MessageRegistry { return StringUtils.isBlank(body) ? null : body.getBytes(CharsetUtil.UTF_8); } + @Override + public List getOfflineMessage(String topic) { + // TODO impl + return Collections.EMPTY_LIST; + } + + @Override + public void saveOfflineMessage(RetainMessage offlineMessage) { + // TODO impl + } + } diff --git a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java index f30dc020..086c9870 100644 --- a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java +++ b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java @@ -24,6 +24,8 @@ public class RetainMessageEntity implements Serializable { private byte[] body; + private String userProperties; + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date createTime; diff --git a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java index 2966a125..418d356b 100644 --- a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java +++ b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java @@ -25,6 +25,8 @@ public class SessionMessageEntity implements Serializable { private Boolean retain; + private String userProperties; + private byte[] body; @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date createTime; diff --git a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java index a24f5a97..a31e2df9 100644 --- a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java +++ b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java @@ -2,7 +2,6 @@ package io.github.quickmsg.persistent.registry; import io.github.quickmsg.common.bootstrap.BootstrapKey; import io.github.quickmsg.common.config.BootstrapConfig; -import io.github.quickmsg.common.environment.EnvContext; import io.github.quickmsg.common.message.MessageRegistry; import io.github.quickmsg.common.message.RetainMessage; import io.github.quickmsg.common.message.SessionMessage; @@ -15,10 +14,11 @@ import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBucket; import org.redisson.api.RKeys; import org.redisson.api.RList; +import org.redisson.api.RSetCache; import org.redisson.api.RedissonClient; -import java.sql.Connection; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -59,6 +59,7 @@ public class RedisMessageRegistry implements MessageRegistry { .qos(record.getQos()) .retain(record.getRetain()) .body(record.getBody()) + .userProperties(record.getUserProperties()) .build() ).collect(Collectors.toList()); @@ -79,6 +80,7 @@ public class RedisMessageRegistry implements MessageRegistry { int qos = sessionMessage.getQos(); boolean retain = sessionMessage.isRetain(); byte[] body = sessionMessage.getBody(); + String userProperty = sessionMessage.getUserProperties(); try { SessionMessageEntity sessionMessageEntity = SessionMessageEntity.builder() @@ -87,6 +89,7 @@ public class RedisMessageRegistry implements MessageRegistry { .qos(qos) .body(body) .retain(retain) + .userProperties(userProperty) .createTime(new Date()).build(); RList list = redissonClient.getList(BootstrapKey.Redis.REDIS_SESSION_MESSAGE_PREFIX_KEY + clientIdentifier); @@ -110,6 +113,7 @@ public class RedisMessageRegistry implements MessageRegistry { .topic(topic) .qos(qos) .body(retainMessage.getBody()) + .userProperties(retainMessage.getUserProperties()) .createTime(date) .updateTime(date).build(); @@ -137,6 +141,7 @@ public class RedisMessageRegistry implements MessageRegistry { .topic(item.getTopic()) .qos(item.getQos()) .body(item.getBody()) + .userProperties(item.getUserProperties()) .build() ).orElse(null); }) @@ -148,4 +153,49 @@ public class RedisMessageRegistry implements MessageRegistry { return Collections.emptyList(); } + @Override + public List getOfflineMessage(String topic) { + // 一对一发送,只需要精确匹配 + try { + RSetCache set = redissonClient.getSetCache(BootstrapKey.Redis.REDIS_OFFLINE_MESSAGE_PREFIX_KEY + topic); + List resList = set.stream().map(record -> RetainMessage.builder() + .topic(record.getTopic()) + .qos(record.getQos()) + .body(record.getBody()) + .userProperties(record.getUserProperties()) + .build() + ).collect(Collectors.toList()); + + if (set.size() > 0) { + redissonClient.getBucket(BootstrapKey.Redis.REDIS_OFFLINE_MESSAGE_PREFIX_KEY + topic).delete(); + } + return resList; + } catch (Exception e) { + log.error("getOfflineMessage error clientIdentifier:{}", topic, e); + return Collections.emptyList(); + } + } + + @Override + public void saveOfflineMessage(RetainMessage offlineMessage) { + String topic = offlineMessage.getTopic(); + int qos = offlineMessage.getQos(); + byte[] body = offlineMessage.getBody(); + String userProperties = offlineMessage.getUserProperties(); + try { + RetainMessageEntity offlineMessageEntity = RetainMessageEntity.builder() + .topic(topic) + .qos(qos) + .body(body) + .userProperties(userProperties) + .createTime(new Date()) + .build(); + + RSetCache set = redissonClient.getSetCache(BootstrapKey.Redis.REDIS_OFFLINE_MESSAGE_PREFIX_KEY + topic); + set.add(offlineMessageEntity, 1, TimeUnit.DAYS); + } catch (Exception e) { + log.error("saveOfflineMessage error message: {}", topic, e); + } + } + } -- Gitee From cadec951ee3d44acb041920a5cbbdfcf4d0f40c2 Mon Sep 17 00:00:00 2001 From: liujiawen Date: Mon, 24 Jan 2022 16:14:13 +0800 Subject: [PATCH 5/9] remove unused log --- .../main/java/io/github/quickmsg/common/utils/MessageUtils.java | 1 - 1 file changed, 1 deletion(-) diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java index f25c2789..1b949a16 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java @@ -68,7 +68,6 @@ public class MessageUtils { MqttFixedHeader mqttFixedHeader = message.fixedHeader(); MqttFixedHeader newFixedHeader = new MqttFixedHeader(mqttFixedHeader.messageType(), false, mqttQoS, false, mqttFixedHeader.remainingLength()); // mqtt 5 support properties - System.out.println( "----" + mqttPublishVariableHeader.properties().getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value()) ); MqttPublishVariableHeader newHeader = new MqttPublishVariableHeader(mqttPublishVariableHeader.topicName(), messageId, mqttPublishVariableHeader.properties()); return new MqttPublishMessage(newFixedHeader, newHeader, message.payload().copy()); -- Gitee From ec84f682ea3dbb72ffe35b294ebf53c0f8f3974f Mon Sep 17 00:00:00 2001 From: liujiawen Date: Tue, 25 Jan 2022 09:57:46 +0800 Subject: [PATCH 6/9] don't support shared subscription --- smqtt-bootstrap/src/main/resources/application.yml | 8 ++++---- .../quickmsg/common/message/MqttMessageBuilder.java | 9 +++++++-- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/smqtt-bootstrap/src/main/resources/application.yml b/smqtt-bootstrap/src/main/resources/application.yml index acc60e32..f9dd36e3 100644 --- a/smqtt-bootstrap/src/main/resources/application.yml +++ b/smqtt-bootstrap/src/main/resources/application.yml @@ -2,8 +2,8 @@ logLevel: INFO # 系统日志 tcp: # tcp配置 port: 1883 # mqtt端口号 - username: APPID # mqtt连接默认用户名 生产环境建议spi去注入PasswordAuthentication接口 - password: 123456789 # mqtt连接默认密码 生产环境建议spi去注入PasswordAuthentication接口 + username: smqtt # mqtt连接默认用户名 生产环境建议spi去注入PasswordAuthentication接口 + password: smqtt # mqtt连接默认密码 生产环境建议spi去注入PasswordAuthentication接口 wiretap: true # 二进制日志 前提是 smqtt.logLevel = DEBUG bossThreadSize: 4 # boss线程 workThreadSize: 8 # work线程 @@ -18,8 +18,8 @@ key: /user/server.key # 指定ssl文件 默认系统生成 crt: /user/server.crt # 指定ssl文件 默认系统生成 http: # http相关配置 端口固定60000 - enable: false # 开关 - accessLog: true # http访问日志 + enable: true # 开关 + accessLog: false # http访问日志 ssl: # ssl配置 enable: false admin: # 后台管理配置 diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java index 6d807b43..5346a176 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java @@ -103,7 +103,12 @@ public class MqttMessageBuilder { } public static MqttConnAckMessage buildConnectAck(MqttConnectReturnCode connectReturnCode, int version) { - if (MqttVersion.MQTT_5.protocolLevel() != (byte) version) { + MqttProperties properties = MqttProperties.NO_PROPERTIES; + if (MqttVersion.MQTT_5.protocolLevel() == (byte) version) { + properties = new MqttProperties(); + properties.add(new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.RETAIN_AVAILABLE.value(), 1)); + // don't support shared subscription + properties.add(new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.SHARED_SUBSCRIPTION_AVAILABLE.value(), 0)); switch (connectReturnCode) { case CONNECTION_REFUSED_IDENTIFIER_REJECTED: connectReturnCode = CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID; @@ -123,7 +128,7 @@ public class MqttMessageBuilder { } } - MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(connectReturnCode, false); + MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(connectReturnCode, false, properties); MqttFixedHeader mqttFixedHeader = new MqttFixedHeader( MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0X02); return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader); -- Gitee From c533629a7d214e39876547eaa14ba17d1bb4e973 Mon Sep 17 00:00:00 2001 From: zhoujingwu Date: Tue, 25 Jan 2022 15:42:54 +0800 Subject: [PATCH 7/9] =?UTF-8?q?Adjust:=201.=E6=96=B0=E5=BB=BA=E7=A6=BB?= =?UTF-8?q?=E7=BA=BF=E6=B6=88=E6=81=AF=E5=AE=9E=E4=BD=93=EF=BC=8C=E4=B8=8D?= =?UTF-8?q?=E5=86=8D=E5=92=8C=E4=BF=9D=E7=95=99=E6=B6=88=E6=81=AF=E5=85=B1?= =?UTF-8?q?=E7=94=A8=202.=E4=BF=9D=E7=95=99=E6=B6=88=E6=81=AF=E3=80=81?= =?UTF-8?q?=E7=A6=BB=E7=BA=BF=E6=B6=88=E6=81=AF=E7=9A=84userProperties=20d?= =?UTF-8?q?b=E6=8C=81=E4=B9=85=E5=8C=96=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../io/github/quickmsg/AbstractStarter.java | 1 + .../springboot/SpringBootStarter.java | 2 - .../src/main/resources/application.yml | 136 ++++++----- .../common/message/MessageRegistry.java | 4 +- .../common/message/OfflineMessage.java | 62 +++++ .../common/message/RetainMessage.java | 24 +- .../common/message/SessionMessage.java | 24 +- smqtt-core/pom.xml | 5 + .../core/protocol/PublishProtocol.java | 2 +- .../core/protocol/SubscribeProtocol.java | 4 +- .../core/spi/DefaultMessageRegistry.java | 13 +- .../registry/DbMessageRegistry.java | 73 +++++- .../quickmsg/persistent/tables/Indexes.java | 3 + .../quickmsg/persistent/tables/Smqtt.java | 13 +- .../quickmsg/persistent/tables/Tables.java | 4 + .../tables/tables/SmqttOffline.java | 123 ++++++++++ .../persistent/tables/tables/SmqttRetain.java | 8 +- .../tables/tables/SmqttSession.java | 8 +- .../tables/records/SmqttOfflineRecord.java | 214 ++++++++++++++++++ .../tables/records/SmqttRetainRecord.java | 50 +++- .../tables/records/SmqttSessionRecord.java | 50 +++- .../src/main/resources/liquibase/smqtt_db.xml | 17 ++ .../message/OfflineMessageEntity.java | 30 +++ .../registry/RedisMessageRegistry.java | 16 +- 24 files changed, 742 insertions(+), 144 deletions(-) create mode 100644 smqtt-common/src/main/java/io/github/quickmsg/common/message/OfflineMessage.java create mode 100644 smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttOffline.java create mode 100644 smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttOfflineRecord.java create mode 100644 smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/OfflineMessageEntity.java diff --git a/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java b/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java index a41fc41d..b41176df 100644 --- a/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java +++ b/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java @@ -23,6 +23,7 @@ public abstract class AbstractStarter { public static void start(String path) { + path = "D:\\xuanwu\\IDEA\\mqtt-cluster\\smqtt-bootstrap\\src\\main\\resources\\application.yml"; BootstrapConfig config = null; if (path != null) { if (path.endsWith(FileExtension.PROPERTIES_SYMBOL)) { diff --git a/smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java b/smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java index 1a107e5f..4bb5e3e1 100644 --- a/smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java +++ b/smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java @@ -2,7 +2,6 @@ package io.github.quickmsg.springboot; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.ComponentScan; import io.github.quickmsg.starter.EnableMqttServer; @@ -12,7 +11,6 @@ import io.github.quickmsg.starter.EnableMqttServer; * @Date: 2022/1/24 */ @SpringBootApplication -@EnableConfigurationProperties @ComponentScan(basePackages = {"io.github.quickmsg"}) @EnableMqttServer public class SpringBootStarter { diff --git a/smqtt-bootstrap/src/main/resources/application.yml b/smqtt-bootstrap/src/main/resources/application.yml index acc60e32..4e3dad5f 100644 --- a/smqtt-bootstrap/src/main/resources/application.yml +++ b/smqtt-bootstrap/src/main/resources/application.yml @@ -1,61 +1,77 @@ smqtt: - logLevel: INFO # 系统日志 - tcp: # tcp配置 - port: 1883 # mqtt端口号 - username: APPID # mqtt连接默认用户名 生产环境建议spi去注入PasswordAuthentication接口 - password: 123456789 # mqtt连接默认密码 生产环境建议spi去注入PasswordAuthentication接口 - wiretap: true # 二进制日志 前提是 smqtt.logLevel = DEBUG - bossThreadSize: 4 # boss线程 - workThreadSize: 8 # work线程 - lowWaterMark: 4000000 # 不建议配置 默认 32768 - highWaterMark: 80000000 # 不建议配置 默认 65536 - businessThreadSize: 16 # 业务线程数 默认=cpu核心数*10 - businessQueueSize: 100000 #业务队列 默认=100000 - # globalReadWriteSize: 10000000,100000000 全局读写大小限制 - # channelReadWriteSize: 10000000,100000000 单个channel读写大小限制 - ssl: # ssl配置 - enable: false # 开关 - key: /user/server.key # 指定ssl文件 默认系统生成 - crt: /user/server.crt # 指定ssl文件 默认系统生成 - http: # http相关配置 端口固定60000 - enable: false # 开关 - accessLog: true # http访问日志 - ssl: # ssl配置 - enable: false - admin: # 后台管理配置 - enable: true # 开关 - username: smqtt # 访问用户名 - password: smqtt # 访问密码 - ws: # websocket配置 - enable: false # 开关 - port: 8999 # 端口 - path: /mqtt # ws 的访问path mqtt.js请设置此选项 - cluster: # 集群配置 - enable: false # 集群开关 - url: 127.0.0.1:7771,127.0.0.1:7772 # 启动节点 - port: 7771 # 端口 - node: node-1 # 集群节点名称 唯一 - external: - host: localhost # 用于映射容器ip 请不要随意设置,如果不需要请移除此选项 - port: 7777 # 用于映射容器端口 请不要随意设置,如果不需要请移除此选项 - redis: # redis 请参考 https://doc.smqtt.cc/%E5%85%B6%E4%BB%96/1.store.html 【如果没有引入相关依赖请移除此配置】 - mode: sentinel - database: 7 - password: xuanwu-T3st*17 - timeout: 3000 - poolMinIdle: 8 - poolConnTimeout: 3000 - poolSize: 10 - single: - address: 172.16.1.16:6392 - cluster: - scanInterval: 1000 - nodes: 172.16.1.16:6390,172.16.1.16:6391,172.16.1.16:6392 - readMode: SLAVE - retryAttempts: 3 - slaveConnectionPoolSize: 64 - masterConnectionPoolSize: 64 - retryInterval: 1500 - sentinel: - master: mq_master - nodes: 172.16.1.16:26371,172.16.1.16:26372,172.16.1.16:26373 \ No newline at end of file + logLevel: INFO # 系统日志 + tcp: # tcp配置 + port: 1883 # mqtt端口号 + username: smqtt # mqtt连接默认用户名 生产环境建议spi去注入PasswordAuthentication接口 + password: smqtt # mqtt连接默认密码 生产环境建议spi去注入PasswordAuthentication接口 + wiretap: true # 二进制日志 前提是 smqtt.logLevel = DEBUG + bossThreadSize: 4 # boss线程 + workThreadSize: 8 # work线程 + lowWaterMark: 4000000 # 不建议配置 默认 32768 + highWaterMark: 80000000 # 不建议配置 默认 65536 + businessThreadSize: 16 # 业务线程数 默认=cpu核心数*10 + businessQueueSize: 100000 #业务队列 默认=100000 + # globalReadWriteSize: 10000000,100000000 全局读写大小限制 + # channelReadWriteSize: 10000000,100000000 单个channel读写大小限制 + ssl: # ssl配置 + enable: false # 开关 + key: /user/server.key # 指定ssl文件 默认系统生成 + crt: /user/server.crt # 指定ssl文件 默认系统生成 + http: # http相关配置 端口固定60000 + enable: true # 开关 + accessLog: false # http访问日志 + ssl: # ssl配置 + enable: false + admin: # 后台管理配置 + enable: true # 开关 + username: smqtt # 访问用户名 + password: smqtt # 访问密码 + ws: # websocket配置 + enable: false # 开关 + port: 8999 # 端口 + path: /mqtt # ws 的访问path mqtt.js请设置此选项 + cluster: # 集群配置 + enable: false # 集群开关 + url: 127.0.0.1:7771,127.0.0.1:7772 # 启动节点 + port: 7771 # 端口 + node: node-1 # 集群节点名称 唯一 + external: + host: localhost # 用于映射容器ip 请不要随意设置,如果不需要请移除此选项 + port: 7777 # 用于映射容器端口 请不要随意设置,如果不需要请移除此选项 + meter: + meterType: PROMETHEUS # INFLUXDB , PROMETHEUS + db: # 参数值配置参考https://github.com/brettwooldridge/HikariCP/wiki/MySQL-Configuration + jdbcUrl: jdbc:mysql://172.16.1.77:3306/smqtt + username: root + password: jx@xw!@#$~|{} + dataSourceCachePrepStmts: false + dataSourcePrepStmtCacheSize: 250 + dataSourcePrepStmtCacheSqlLimit: 2048 + dataSourceUseServerPrepStmts: true + dataSourceUseLocalSessionState: true + dataSourceRewriteBatchedStatements: true + dataSourceCacheResultSetMetadata: true + dataSourceCacheServerConfiguration: true + dataSourceElideSetAutoCommits: true + dataSourceMaintainTimeStats: false + redis: # redis 请参考 https://doc.smqtt.cc/%E5%85%B6%E4%BB%96/1.store.html 【如果没有引入相关依赖请移除此配置】 + mode: sentinel + database: 7 + password: xuanwu-T3st*17 + timeout: 3000 + poolMinIdle: 8 + poolConnTimeout: 3000 + poolSize: 10 + single: + address: 172.16.1.16:6392 + cluster: + scanInterval: 1000 + nodes: 172.16.1.16:6390,172.16.1.16:6391,172.16.1.16:6392 + readMode: SLAVE + retryAttempts: 3 + slaveConnectionPoolSize: 64 + masterConnectionPoolSize: 64 + retryInterval: 1500 + sentinel: + master: mq_master + nodes: 172.16.1.16:26371,172.16.1.16:26372,172.16.1.16:26373 \ No newline at end of file diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java index 9bc4f8ac..3a5d4a6f 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java @@ -52,7 +52,7 @@ public interface MessageRegistry extends StartUp { * * @param message */ - void saveOfflineMessage(RetainMessage message); + void saveOfflineMessage(OfflineMessage message); /** * 获取离线消息 @@ -60,5 +60,5 @@ public interface MessageRegistry extends StartUp { * @param topic * @return */ - List getOfflineMessage(String topic); + List getOfflineMessage(String topic); } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/OfflineMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/OfflineMessage.java new file mode 100644 index 00000000..9771c565 --- /dev/null +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/OfflineMessage.java @@ -0,0 +1,62 @@ +package io.github.quickmsg.common.message; + +import java.util.HashMap; +import java.util.Optional; + +import io.github.quickmsg.common.channel.MqttChannel; +import io.github.quickmsg.common.utils.JacksonUtil; +import io.github.quickmsg.common.utils.MessageUtils; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.handler.codec.mqtt.MqttProperties; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; +import io.netty.handler.codec.mqtt.MqttQoS; +import lombok.Builder; +import lombok.Data; + +/** + * @Author: Jingwu.Zhou + * @Date: 2022/1/24 + */ +@Data +@Builder +public class OfflineMessage { + private int qos; + + private String topic; + + private byte[] body; + + private String userProperties; + + public static OfflineMessage of(MqttPublishMessage mqttPublishMessage) { + MqttPublishVariableHeader publishVariableHeader = mqttPublishMessage.variableHeader(); + return OfflineMessage.builder() + .topic(publishVariableHeader.topicName()) + .qos(mqttPublishMessage.fixedHeader().qosLevel().value()) + .body(MessageUtils.copyByteBuf(mqttPublishMessage.payload())) + .userProperties(JacksonUtil.map2Json(Optional.ofNullable(publishVariableHeader + .properties() + .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value())) + .map(list -> { + HashMap propertiesMap = new HashMap<>(list.size()); + list.forEach(property -> { + MqttProperties.StringPair pair = (MqttProperties.StringPair) property.value(); + propertiesMap.put(pair.key, pair.value); + }); + return propertiesMap; + }).orElseGet(HashMap::new))) + .build(); + } + + public MqttPublishMessage toPublishMessage(MqttChannel mqttChannel) { + return MqttMessageBuilder.buildPub( + false, + MqttQoS.valueOf(this.qos), + qos > 0 ? mqttChannel.generateMessageId() : 0, + topic, + PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body), + JacksonUtil.json2Map(userProperties, String.class, String.class)); + } + +} diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java index 8f64c4cd..2d67c21e 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java @@ -35,19 +35,17 @@ public class RetainMessage { .topic(publishVariableHeader.topicName()) .qos(mqttPublishMessage.fixedHeader().qosLevel().value()) .body(MessageUtils.copyByteBuf(mqttPublishMessage.payload())) - .userProperties(JacksonUtil - .map2Json(Optional - .ofNullable(publishVariableHeader - .properties() - .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value())) - .map(list -> { - HashMap propertiesMap = new HashMap<>(); - list.forEach(property -> { - MqttProperties.StringPair pair = (MqttProperties.StringPair) property.value(); - propertiesMap.put(pair.key, pair.value); - }); - return propertiesMap; - }).get())) + .userProperties(JacksonUtil.map2Json(Optional.ofNullable(publishVariableHeader + .properties() + .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value())) + .map(list -> { + HashMap propertiesMap = new HashMap<>(list.size()); + list.forEach(property -> { + MqttProperties.StringPair pair = (MqttProperties.StringPair) property.value(); + propertiesMap.put(pair.key, pair.value); + }); + return propertiesMap; + }).orElseGet(HashMap::new))) .build(); } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java index 2337caeb..f5aecea5 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java @@ -42,19 +42,17 @@ public class SessionMessage { .qos(mqttPublishMessage.fixedHeader().qosLevel().value()) .retain(mqttPublishMessage.fixedHeader().isRetain()) .body(MessageUtils.copyByteBuf(mqttPublishMessage.payload())) - .userProperties(JacksonUtil - .map2Json(Optional - .ofNullable(publishVariableHeader - .properties() - .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value())) - .map(list -> { - HashMap propertiesMap = new HashMap<>(); - list.forEach(property -> { - MqttProperties.StringPair pair = (MqttProperties.StringPair) property.value(); - propertiesMap.put(pair.key, pair.value); - }); - return propertiesMap; - }).get())) + .userProperties(JacksonUtil.map2Json(Optional.ofNullable(publishVariableHeader + .properties() + .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value())) + .map(list -> { + HashMap propertiesMap = new HashMap<>(list.size()); + list.forEach(property -> { + MqttProperties.StringPair pair = (MqttProperties.StringPair) property.value(); + propertiesMap.put(pair.key, pair.value); + }); + return propertiesMap; + }).orElseGet(HashMap::new))) .build(); } diff --git a/smqtt-core/pom.xml b/smqtt-core/pom.xml index e4e557ca..be754e80 100644 --- a/smqtt-core/pom.xml +++ b/smqtt-core/pom.xml @@ -36,6 +36,11 @@ smqtt-persistent-redis 1.1.2 + + io.github.quickmsg + smqtt-persistent-db + 1.1.2 + \ No newline at end of file diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java index 864b8bb6..eadaa278 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java @@ -56,7 +56,7 @@ public class PublishProtocol implements Protocol { message.fixedHeader().qosLevel()); // 定制化处理,一对一发送的离线消息进行持久化,保证送达 if (mqttChannels.isEmpty() && variableHeader.topicName().startsWith(ONE2ONE_TOPIC_PREFIX)) { - messageRegistry.saveOfflineMessage(RetainMessage.of(message)); + messageRegistry.saveOfflineMessage(OfflineMessage.of(message)); } // http mock if (mqttChannel.getIsMock()) { diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java index 4cb98a0b..1f7914a5 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java @@ -81,8 +81,8 @@ public class SubscribeProtocol implements Protocol { */ private void loadOfflineMessage(MessageRegistry messageRegistry, MqttChannel mqttChannel, String topicName) { Optional.ofNullable(messageRegistry.getOfflineMessage(topicName)).ifPresent(msgList -> - msgList.forEach(retainMessage -> - mqttChannel.write(retainMessage.toPublishMessage(mqttChannel), retainMessage.getQos() > 0) + msgList.forEach(offlineMessage -> + mqttChannel.write(offlineMessage.toPublishMessage(mqttChannel), offlineMessage.getQos() > 0) .subscribe())); } diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java index b3f2919b..67e35485 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java @@ -1,6 +1,7 @@ package io.github.quickmsg.core.spi; import io.github.quickmsg.common.message.MessageRegistry; +import io.github.quickmsg.common.message.OfflineMessage; import io.github.quickmsg.common.message.RetainMessage; import io.github.quickmsg.common.message.SessionMessage; import io.github.quickmsg.common.utils.TopicRegexUtils; @@ -22,6 +23,8 @@ public class DefaultMessageRegistry implements MessageRegistry { private Map retainMessages = new ConcurrentHashMap<>(); + private Map> offlineMessages = new ConcurrentHashMap<>(); + @Override public List getSessionMessage(String clientIdentifier) { @@ -49,14 +52,14 @@ public class DefaultMessageRegistry implements MessageRegistry { } @Override - public List getOfflineMessage(String topic) { - // TODO impl - return Collections.EMPTY_LIST; + public List getOfflineMessage(String topic) { + return offlineMessages.remove(topic); } @Override - public void saveOfflineMessage(RetainMessage offlineMessage) { - // TODO impl + public void saveOfflineMessage(OfflineMessage offlineMessage) { + List offlineList = offlineMessages.computeIfAbsent(offlineMessage.getTopic(), key -> new CopyOnWriteArrayList<>()); + offlineList.add(offlineMessage); } } diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java index bde25550..20d9d996 100644 --- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java +++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java @@ -2,6 +2,7 @@ package io.github.quickmsg.persistent.registry; import io.github.quickmsg.common.config.BootstrapConfig; import io.github.quickmsg.common.message.MessageRegistry; +import io.github.quickmsg.common.message.OfflineMessage; import io.github.quickmsg.common.message.RetainMessage; import io.github.quickmsg.common.message.SessionMessage; import io.github.quickmsg.common.utils.TopicRegexUtils; @@ -86,6 +87,7 @@ public class DbMessageRegistry implements MessageRegistry { .qos(record.getQos()) .topic(record.getTopic()) .body(record.getBody().getBytes()) + .userProperties(record.getUserProperties()) .clientIdentifier(record.getClientId()) .retain(record.getRetain()) .build()) @@ -111,6 +113,7 @@ public class DbMessageRegistry implements MessageRegistry { int qos = sessionMessage.getQos(); boolean retain = sessionMessage.isRetain(); byte[] body = sessionMessage.getBody(); + String userProperties = sessionMessage.getUserProperties(); try (Connection connection = HikariCPConnectionProvider.singleTon().getConnection()) { DSLContext dslContext = DSL.using(connection); @@ -121,8 +124,9 @@ public class DbMessageRegistry implements MessageRegistry { Tables.SMQTT_SESSION.QOS, Tables.SMQTT_SESSION.RETAIN, Tables.SMQTT_SESSION.BODY, - Tables.SMQTT_SESSION.CREATE_TIME) - .values(topic, clientIdentifier, qos, retain, bodyMsg, LocalDateTime.now()) + Tables.SMQTT_SESSION.CREATE_TIME, + Tables.SMQTT_SESSION.USER_PROPERTIES) + .values(topic, clientIdentifier, qos, retain, bodyMsg, LocalDateTime.now(), userProperties) .execute(); } catch (Exception e) { log.error("sendSessionMessages error message: {}", clientIdentifier, e); @@ -144,24 +148,26 @@ public class DbMessageRegistry implements MessageRegistry { .from(Tables.SMQTT_RETAIN) .where(Tables.SMQTT_RETAIN.TOPIC.eq(topic)) .fetchAny(); + String bodyMsg = new String(retainMessage.getBody(), CharsetUtil.UTF_8); + String userProperties = retainMessage.getUserProperties(); if (integerRecord1 != null && integerRecord1.value1() != null && integerRecord1.value1() > 0) { // 更新记录 - String bodyMsg = new String(retainMessage.getBody(), CharsetUtil.UTF_8); dslContext.update(Tables.SMQTT_RETAIN) .set(Tables.SMQTT_RETAIN.QOS, qos) .set(Tables.SMQTT_RETAIN.BODY, bodyMsg) .set(Tables.SMQTT_RETAIN.UPDATE_TIME, LocalDateTime.now()) + .set(Tables.SMQTT_RETAIN.USER_PROPERTIES, userProperties) .where(Tables.SMQTT_RETAIN.TOPIC.eq(topic)) .execute(); } else { // 新增记录 - String bodyMsg = new String(retainMessage.getBody(), CharsetUtil.UTF_8); dslContext.insertInto(Tables.SMQTT_RETAIN) .columns(Tables.SMQTT_RETAIN.TOPIC, Tables.SMQTT_RETAIN.QOS, Tables.SMQTT_RETAIN.BODY, - Tables.SMQTT_RETAIN.CREATE_TIME) - .values(topic, qos, bodyMsg, LocalDateTime.now()) + Tables.SMQTT_RETAIN.CREATE_TIME, + Tables.SMQTT_RETAIN.USER_PROPERTIES) + .values(topic, qos, bodyMsg, LocalDateTime.now(), userProperties) .execute(); } } @@ -183,6 +189,7 @@ public class DbMessageRegistry implements MessageRegistry { .topic(record.getTopic()) .qos(record.getQos()) .body(getBody(record.getBody())) + .userProperties(record.getUserProperties()) .build() ) .collect(Collectors.toList()); @@ -199,14 +206,58 @@ public class DbMessageRegistry implements MessageRegistry { } @Override - public List getOfflineMessage(String topic) { - // TODO impl - return Collections.EMPTY_LIST; + public List getOfflineMessage(String topic) { + try (Connection connection = HikariCPConnectionProvider.singleTon().getConnection()) { + DSLContext dslContext = DSL.using(connection); + + List list = dslContext + .selectFrom(Tables.SMQTT_OFFLINE) + .where(Tables.SMQTT_OFFLINE.TOPIC.eq(topic)) + .fetch() + .stream() + .map(record -> + OfflineMessage.builder() + .qos(record.getQos()) + .topic(record.getTopic()) + .body(record.getBody().getBytes()) + .userProperties(record.getUserProperties()) + .build()) + .collect(Collectors.toList()); + + if (list.size() > 0) { + // 删除记录 + dslContext.deleteFrom(Tables.SMQTT_OFFLINE) + .where(Tables.SMQTT_OFFLINE.TOPIC.eq(topic)) + .execute(); + } + return list; + } catch (Exception e) { + log.error("getOfflineMessages error topic:{}", topic, e); + return Collections.emptyList(); + } } @Override - public void saveOfflineMessage(RetainMessage offlineMessage) { - // TODO impl + public void saveOfflineMessage(OfflineMessage offlineMessage) { + String topic = offlineMessage.getTopic(); + int qos = offlineMessage.getQos(); + byte[] body = offlineMessage.getBody(); + String userProperties = offlineMessage.getUserProperties(); + + try (Connection connection = HikariCPConnectionProvider.singleTon().getConnection()) { + DSLContext dslContext = DSL.using(connection); + String bodyMsg = new String(body, CharsetUtil.UTF_8); + dslContext.insertInto(Tables.SMQTT_OFFLINE) + .columns(Tables.SMQTT_OFFLINE.TOPIC, + Tables.SMQTT_OFFLINE.QOS, + Tables.SMQTT_OFFLINE.BODY, + Tables.SMQTT_OFFLINE.CREATE_TIME, + Tables.SMQTT_OFFLINE.USER_PROPERTIES) + .values(topic, qos, bodyMsg, LocalDateTime.now(), userProperties) + .execute(); + } catch (Exception e) { + log.error("saveOfflineMessages error message: {}", topic, e); + } } } diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Indexes.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Indexes.java index 376554f0..a09f8904 100644 --- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Indexes.java +++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Indexes.java @@ -4,6 +4,7 @@ package io.github.quickmsg.persistent.tables; +import io.github.quickmsg.persistent.tables.tables.SmqttOffline; import io.github.quickmsg.persistent.tables.tables.SmqttRetain; import org.jooq.Index; @@ -23,4 +24,6 @@ public class Indexes { // ------------------------------------------------------------------------- public static final Index SMQTT_RETAIN_INDEX_TOPIC = Internal.createIndex(DSL.name("index_topic"), SmqttRetain.SMQTT_RETAIN, new OrderField[] { SmqttRetain.SMQTT_RETAIN.TOPIC }, false); + + public static final Index SMQTT_OFFLINE_INDEX_TOPIC = Internal.createIndex(DSL.name("index_topic"), SmqttOffline.SMQTT_OFFLINE, new OrderField[] { SmqttOffline.SMQTT_OFFLINE.TOPIC }, false); } diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Smqtt.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Smqtt.java index 010bb68d..c2a5712d 100644 --- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Smqtt.java +++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Smqtt.java @@ -6,6 +6,7 @@ package io.github.quickmsg.persistent.tables; import io.github.quickmsg.persistent.tables.tables.Databasechangelog; import io.github.quickmsg.persistent.tables.tables.Databasechangeloglock; +import io.github.quickmsg.persistent.tables.tables.SmqttOffline; import io.github.quickmsg.persistent.tables.tables.SmqttRetain; import io.github.quickmsg.persistent.tables.tables.SmqttSession; @@ -40,6 +41,9 @@ public class Smqtt extends SchemaImpl { public final SmqttSession SMQTT_SESSION = SmqttSession.SMQTT_SESSION; + public static final SmqttOffline SMQTT_OFFLINE = SmqttOffline.SMQTT_OFFLINE; + + private Smqtt() { super("smqtt", null); } @@ -53,9 +57,10 @@ public class Smqtt extends SchemaImpl { @Override public final List> getTables() { return Arrays.>asList( - Databasechangelog.DATABASECHANGELOG, - Databasechangeloglock.DATABASECHANGELOGLOCK, - SmqttRetain.SMQTT_RETAIN, - SmqttSession.SMQTT_SESSION); + Databasechangelog.DATABASECHANGELOG, + Databasechangeloglock.DATABASECHANGELOGLOCK, + SmqttRetain.SMQTT_RETAIN, + SmqttSession.SMQTT_SESSION, + SmqttOffline.SMQTT_OFFLINE); } } diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Tables.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Tables.java index dc24425d..d1bdbc0d 100644 --- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Tables.java +++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Tables.java @@ -6,6 +6,7 @@ package io.github.quickmsg.persistent.tables; import io.github.quickmsg.persistent.tables.tables.Databasechangelog; import io.github.quickmsg.persistent.tables.tables.Databasechangeloglock; +import io.github.quickmsg.persistent.tables.tables.SmqttOffline; import io.github.quickmsg.persistent.tables.tables.SmqttRetain; import io.github.quickmsg.persistent.tables.tables.SmqttSession; @@ -27,4 +28,7 @@ public class Tables { public static final SmqttSession SMQTT_SESSION = SmqttSession.SMQTT_SESSION; + + + public static final SmqttOffline SMQTT_OFFLINE = SmqttOffline.SMQTT_OFFLINE; } diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttOffline.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttOffline.java new file mode 100644 index 00000000..d6cd317e --- /dev/null +++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttOffline.java @@ -0,0 +1,123 @@ +package io.github.quickmsg.persistent.tables.tables; + +import org.jooq.Field; +import org.jooq.ForeignKey; +import org.jooq.Index; +import org.jooq.Name; +import org.jooq.Record; +import org.jooq.Row5; +import org.jooq.Schema; +import org.jooq.Table; +import org.jooq.TableField; +import org.jooq.TableOptions; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; +import org.jooq.impl.TableImpl; + +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.List; + +import io.github.quickmsg.persistent.tables.Indexes; +import io.github.quickmsg.persistent.tables.Smqtt; +import io.github.quickmsg.persistent.tables.tables.records.SmqttOfflineRecord; + +/** + * @Author: Jingwu.Zhou + * @Date: 2022/1/24 + */ +public class SmqttOffline extends TableImpl { + private static final long serialVersionUID = 1L; + + + public static final SmqttOffline SMQTT_OFFLINE = new SmqttOffline(); + + + @Override + public Class getRecordType() { + return SmqttOfflineRecord.class; + } + + public final TableField TOPIC = createField(DSL.name("topic"), SQLDataType.VARCHAR(255).defaultValue(DSL.inline("NULL", SQLDataType.VARCHAR)), this, "话题"); + + + public final TableField QOS = createField(DSL.name("qos"), SQLDataType.INTEGER.defaultValue(DSL.inline("NULL", SQLDataType.INTEGER)), this, "qos"); + + + public final TableField BODY = createField(DSL.name("body"), SQLDataType.VARCHAR(255).defaultValue(DSL.inline("NULL", SQLDataType.VARCHAR)), this, "消息内容"); + + + public final TableField CREATE_TIME = createField(DSL.name("create_time"), SQLDataType.LOCALDATETIME(0).defaultValue(DSL.inline("NULL", SQLDataType.LOCALDATETIME)), this, "记录保存时间"); + + + public final TableField USER_PROPERTIES = createField(DSL.name("user_properties"), SQLDataType.VARCHAR(1000).defaultValue(DSL.inline("NULL", SQLDataType.VARCHAR)), this, "可变头userProperties"); + + + private SmqttOffline(Name alias, Table aliased) { + this(alias, aliased, null); + } + + private SmqttOffline(Name alias, Table aliased, Field[] parameters) { + super(alias, null, aliased, parameters, DSL.comment(""), TableOptions.table()); + } + + + public SmqttOffline(String alias) { + this(DSL.name(alias), SMQTT_OFFLINE); + } + + + public SmqttOffline(Name alias) { + this(alias, SMQTT_OFFLINE); + } + + + public SmqttOffline() { + this(DSL.name("smqtt_offline"), null); + } + + public SmqttOffline(Table child, ForeignKey key) { + super(child, key, SMQTT_OFFLINE); + } + + @Override + public Schema getSchema() { + return Smqtt.SMQTT; + } + + @Override + public List getIndexes() { + return Arrays.asList(Indexes.SMQTT_OFFLINE_INDEX_TOPIC); + } + + @Override + public SmqttOffline as(String alias) { + return new SmqttOffline(DSL.name(alias), this); + } + + @Override + public SmqttOffline as(Name alias) { + return new SmqttOffline(alias, this); + } + + + @Override + public SmqttOffline rename(String name) { + return new SmqttOffline(DSL.name(name), null); + } + + + @Override + public SmqttOffline rename(Name name) { + return new SmqttOffline(name, null); + } + + // ------------------------------------------------------------------------- + // Row5 type methods + // ------------------------------------------------------------------------- + + @Override + public Row5 fieldsRow() { + return (Row5) super.fieldsRow(); + } +} diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttRetain.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttRetain.java index 11edf2ce..5fb01bcb 100644 --- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttRetain.java +++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttRetain.java @@ -17,7 +17,7 @@ import org.jooq.ForeignKey; import org.jooq.Index; import org.jooq.Name; import org.jooq.Record; -import org.jooq.Row5; +import org.jooq.Row6; import org.jooq.Schema; import org.jooq.Table; import org.jooq.TableField; @@ -57,6 +57,8 @@ public class SmqttRetain extends TableImpl { public final TableField UPDATE_TIME = createField(DSL.name("update_time"), SQLDataType.LOCALDATETIME(0).defaultValue(DSL.inline("NULL", SQLDataType.LOCALDATETIME)), this, "记录更新时间"); + public final TableField USER_PROPERTIES = createField(DSL.name("user_properties"), SQLDataType.VARCHAR(1000).defaultValue(DSL.inline("NULL", SQLDataType.VARCHAR)), this, "可变头userProperties"); + private SmqttRetain(Name alias, Table aliased) { this(alias, aliased, null); } @@ -121,7 +123,7 @@ public class SmqttRetain extends TableImpl { // ------------------------------------------------------------------------- @Override - public Row5 fieldsRow() { - return (Row5) super.fieldsRow(); + public Row6 fieldsRow() { + return (Row6) super.fieldsRow(); } } diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttSession.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttSession.java index 8e46bde6..cebf7fb2 100644 --- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttSession.java +++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttSession.java @@ -13,7 +13,7 @@ import org.jooq.Field; import org.jooq.ForeignKey; import org.jooq.Name; import org.jooq.Record; -import org.jooq.Row6; +import org.jooq.Row7; import org.jooq.Schema; import org.jooq.Table; import org.jooq.TableField; @@ -62,6 +62,8 @@ public class SmqttSession extends TableImpl { public final TableField CREATE_TIME = createField(DSL.name("create_time"), SQLDataType.LOCALDATETIME(0).defaultValue(DSL.inline("NULL", SQLDataType.LOCALDATETIME)), this, "记录保存时间"); + public final TableField USER_PROPERTIES = createField(DSL.name("user_properties"), SQLDataType.VARCHAR(1000).defaultValue(DSL.inline("NULL", SQLDataType.VARCHAR)), this, "可变头userProperties"); + private SmqttSession(Name alias, Table aliased) { this(alias, aliased, null); } @@ -121,7 +123,7 @@ public class SmqttSession extends TableImpl { // ------------------------------------------------------------------------- @Override - public Row6 fieldsRow() { - return (Row6) super.fieldsRow(); + public Row7 fieldsRow() { + return (Row7) super.fieldsRow(); } } diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttOfflineRecord.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttOfflineRecord.java new file mode 100644 index 00000000..5408bd50 --- /dev/null +++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttOfflineRecord.java @@ -0,0 +1,214 @@ +package io.github.quickmsg.persistent.tables.tables.records; + +import org.jooq.Field; +import org.jooq.Record5; +import org.jooq.Row5; +import org.jooq.impl.TableRecordImpl; + +import java.time.LocalDateTime; + +import io.github.quickmsg.persistent.tables.tables.SmqttOffline; + +/** + * @Author: Jingwu.Zhou + * @Date: 2022/1/24 + */ +public class SmqttOfflineRecord extends TableRecordImpl implements Record5 { + private static final long serialVersionUID = 1L; + + public void setTopic(String value) { + set(0, value); + } + + + public String getTopic() { + return (String) get(0); + } + + + public void setQos(Integer value) { + set(1, value); + } + + + public Integer getQos() { + return (Integer) get(1); + } + + + public void setBody(String value) { + set(2, value); + } + + + public String getBody() { + return (String) get(2); + } + + public void setCreateTime(LocalDateTime value) { + set(3, value); + } + + public LocalDateTime getCreateTime() { + return (LocalDateTime) get(3); + } + + + public void setUserProperties(String value) { + set(4, value); + } + + + public String getUserProperties() { + return (String) get(4); + } + + // ------------------------------------------------------------------------- + // Record5 type implementation + // ------------------------------------------------------------------------- + + @Override + public Row5 fieldsRow() { + return (Row5) super.fieldsRow(); + } + + @Override + public Row5 valuesRow() { + return (Row5) super.valuesRow(); + } + + @Override + public Field field1() { + return SmqttOffline.SMQTT_OFFLINE.TOPIC; + } + + @Override + public Field field2() { + return SmqttOffline.SMQTT_OFFLINE.QOS; + } + + @Override + public Field field3() { + return SmqttOffline.SMQTT_OFFLINE.BODY; + } + + @Override + public Field field4() { + return SmqttOffline.SMQTT_OFFLINE.CREATE_TIME; + } + + @Override + public Field field5() { + return SmqttOffline.SMQTT_OFFLINE.USER_PROPERTIES; + } + + @Override + public String component1() { + return getTopic(); + } + + @Override + public Integer component2() { + return getQos(); + } + + @Override + public String component3() { + return getBody(); + } + + @Override + public LocalDateTime component4() { + return getCreateTime(); + } + + @Override + public String component5() { + return getUserProperties(); + } + + @Override + public String value1() { + return getTopic(); + } + + @Override + public Integer value2() { + return getQos(); + } + + @Override + public String value3() { + return getBody(); + } + + @Override + public LocalDateTime value4() { + return getCreateTime(); + } + + @Override + public String value5() { + return getUserProperties(); + } + + @Override + public SmqttOfflineRecord value1(String value) { + setTopic(value); + return this; + } + + @Override + public SmqttOfflineRecord value2(Integer value) { + setQos(value); + return this; + } + + @Override + public SmqttOfflineRecord value3(String value) { + setBody(value); + return this; + } + + @Override + public SmqttOfflineRecord value4(LocalDateTime value) { + setCreateTime(value); + return this; + } + + @Override + public SmqttOfflineRecord value5(String value) { + setUserProperties(value); + return this; + } + + @Override + public SmqttOfflineRecord values(String value1, Integer value2, String value3, LocalDateTime value4, String value5) { + value1(value1); + value2(value2); + value3(value3); + value4(value4); + value5(value5); + return this; + } + + // ------------------------------------------------------------------------- + // Constructors + // ------------------------------------------------------------------------- + + + public SmqttOfflineRecord() { + super(SmqttOffline.SMQTT_OFFLINE); + } + + + public SmqttOfflineRecord(String topic, Integer qos, String body, LocalDateTime createTime, LocalDateTime updateTime, String userProperties) { + super(SmqttOffline.SMQTT_OFFLINE); + + setTopic(topic); + setQos(qos); + setBody(body); + setCreateTime(createTime); + setUserProperties(userProperties); + } +} diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttRetainRecord.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttRetainRecord.java index df93c1d9..119e9376 100644 --- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttRetainRecord.java +++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttRetainRecord.java @@ -9,8 +9,8 @@ import io.github.quickmsg.persistent.tables.tables.SmqttRetain; import java.time.LocalDateTime; import org.jooq.Field; -import org.jooq.Record5; -import org.jooq.Row5; +import org.jooq.Record6; +import org.jooq.Row6; import org.jooq.impl.TableRecordImpl; @@ -18,7 +18,7 @@ import org.jooq.impl.TableRecordImpl; * This class is generated by jOOQ. */ @SuppressWarnings({ "all", "unchecked", "rawtypes" }) -public class SmqttRetainRecord extends TableRecordImpl implements Record5 { +public class SmqttRetainRecord extends TableRecordImpl implements Record6 { private static final long serialVersionUID = 1L; @@ -69,18 +69,27 @@ public class SmqttRetainRecord extends TableRecordImpl implem return (LocalDateTime) get(4); } + public void setUserProperties(String value) { + set(5, value); + } + + + public String getUserProperties() { + return (String) get(5); + } + // ------------------------------------------------------------------------- // Record5 type implementation // ------------------------------------------------------------------------- @Override - public Row5 fieldsRow() { - return (Row5) super.fieldsRow(); + public Row6 fieldsRow() { + return (Row6) super.fieldsRow(); } @Override - public Row5 valuesRow() { - return (Row5) super.valuesRow(); + public Row6 valuesRow() { + return (Row6) super.valuesRow(); } @Override @@ -108,6 +117,11 @@ public class SmqttRetainRecord extends TableRecordImpl implem return SmqttRetain.SMQTT_RETAIN.UPDATE_TIME; } + @Override + public Field field6() { + return SmqttRetain.SMQTT_RETAIN.USER_PROPERTIES; + } + @Override public String component1() { return getTopic(); @@ -133,6 +147,11 @@ public class SmqttRetainRecord extends TableRecordImpl implem return getUpdateTime(); } + @Override + public String component6() { + return getUserProperties(); + } + @Override public String value1() { return getTopic(); @@ -158,6 +177,11 @@ public class SmqttRetainRecord extends TableRecordImpl implem return getUpdateTime(); } + @Override + public String value6() { + return getUserProperties(); + } + @Override public SmqttRetainRecord value1(String value) { setTopic(value); @@ -189,12 +213,19 @@ public class SmqttRetainRecord extends TableRecordImpl implem } @Override - public SmqttRetainRecord values(String value1, Integer value2, String value3, LocalDateTime value4, LocalDateTime value5) { + public SmqttRetainRecord value6(String value) { + setUserProperties(value); + return this; + } + + @Override + public SmqttRetainRecord values(String value1, Integer value2, String value3, LocalDateTime value4, LocalDateTime value5, String value6) { value1(value1); value2(value2); value3(value3); value4(value4); value5(value5); + value6(value6); return this; } @@ -208,7 +239,7 @@ public class SmqttRetainRecord extends TableRecordImpl implem } - public SmqttRetainRecord(String topic, Integer qos, String body, LocalDateTime createTime, LocalDateTime updateTime) { + public SmqttRetainRecord(String topic, Integer qos, String body, LocalDateTime createTime, LocalDateTime updateTime, String userProperties) { super(SmqttRetain.SMQTT_RETAIN); setTopic(topic); @@ -216,5 +247,6 @@ public class SmqttRetainRecord extends TableRecordImpl implem setBody(body); setCreateTime(createTime); setUpdateTime(updateTime); + setUserProperties(userProperties); } } diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttSessionRecord.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttSessionRecord.java index 8904fe46..f7e84502 100644 --- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttSessionRecord.java +++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttSessionRecord.java @@ -9,8 +9,8 @@ import io.github.quickmsg.persistent.tables.tables.SmqttSession; import java.time.LocalDateTime; import org.jooq.Field; -import org.jooq.Record6; -import org.jooq.Row6; +import org.jooq.Record7; +import org.jooq.Row7; import org.jooq.impl.TableRecordImpl; @@ -18,7 +18,7 @@ import org.jooq.impl.TableRecordImpl; * This class is generated by jOOQ. */ @SuppressWarnings({ "all", "unchecked", "rawtypes" }) -public class SmqttSessionRecord extends TableRecordImpl implements Record6 { +public class SmqttSessionRecord extends TableRecordImpl implements Record7 { private static final long serialVersionUID = 1L; @@ -72,6 +72,15 @@ public class SmqttSessionRecord extends TableRecordImpl impl return (String) get(4); } + public void setUserProperties(String value) { + set(6, value); + } + + + public String getUserProperties() { + return (String) get(6); + } + public void setCreateTime(LocalDateTime value) { set(5, value); @@ -87,13 +96,13 @@ public class SmqttSessionRecord extends TableRecordImpl impl // ------------------------------------------------------------------------- @Override - public Row6 fieldsRow() { - return (Row6) super.fieldsRow(); + public Row7 fieldsRow() { + return (Row7) super.fieldsRow(); } @Override - public Row6 valuesRow() { - return (Row6) super.valuesRow(); + public Row7 valuesRow() { + return (Row7) super.valuesRow(); } @Override @@ -126,6 +135,11 @@ public class SmqttSessionRecord extends TableRecordImpl impl return SmqttSession.SMQTT_SESSION.CREATE_TIME; } + @Override + public Field field7() { + return SmqttSession.SMQTT_SESSION.USER_PROPERTIES; + } + @Override public String component1() { return getTopic(); @@ -156,6 +170,11 @@ public class SmqttSessionRecord extends TableRecordImpl impl return getCreateTime(); } + @Override + public String component7() { + return getUserProperties(); + } + @Override public String value1() { return getTopic(); @@ -186,6 +205,11 @@ public class SmqttSessionRecord extends TableRecordImpl impl return getCreateTime(); } + @Override + public String value7() { + return getUserProperties(); + } + @Override public SmqttSessionRecord value1(String value) { setTopic(value); @@ -223,13 +247,20 @@ public class SmqttSessionRecord extends TableRecordImpl impl } @Override - public SmqttSessionRecord values(String value1, String value2, Integer value3, Boolean value4, String value5, LocalDateTime value6) { + public SmqttSessionRecord value7(String value) { + setUserProperties(value); + return this; + } + + @Override + public SmqttSessionRecord values(String value1, String value2, Integer value3, Boolean value4, String value5, LocalDateTime value6, String value7) { value1(value1); value2(value2); value3(value3); value4(value4); value5(value5); value6(value6); + value7(value7); return this; } @@ -243,7 +274,7 @@ public class SmqttSessionRecord extends TableRecordImpl impl } - public SmqttSessionRecord(String topic, String clientId, Integer qos, Boolean retain, String body, LocalDateTime createTime) { + public SmqttSessionRecord(String topic, String clientId, Integer qos, Boolean retain, String body, LocalDateTime createTime, String userProperties) { super(SmqttSession.SMQTT_SESSION); setTopic(topic); @@ -252,5 +283,6 @@ public class SmqttSessionRecord extends TableRecordImpl impl setRetain(retain); setBody(body); setCreateTime(createTime); + setUserProperties(userProperties); } } diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/resources/liquibase/smqtt_db.xml b/smqtt-persistent/smqtt-persistent-db/src/main/resources/liquibase/smqtt_db.xml index 8c598562..06cb5bfe 100644 --- a/smqtt-persistent/smqtt-persistent-db/src/main/resources/liquibase/smqtt_db.xml +++ b/smqtt-persistent/smqtt-persistent-db/src/main/resources/liquibase/smqtt_db.xml @@ -14,6 +14,7 @@ + @@ -30,10 +31,26 @@ + + + + create smqtt_offline table + + + + + + + + + + + + \ No newline at end of file diff --git a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/OfflineMessageEntity.java b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/OfflineMessageEntity.java new file mode 100644 index 00000000..44f33d79 --- /dev/null +++ b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/OfflineMessageEntity.java @@ -0,0 +1,30 @@ +package io.github.quickmsg.persistent.message; + +import com.fasterxml.jackson.annotation.JsonFormat; + +import java.io.Serializable; +import java.util.Date; + +import lombok.Builder; +import lombok.Data; + +/** + * @Author: Jingwu.Zhou + * @Date: 2022/1/24 + */ +@Data +@Builder +public class OfflineMessageEntity implements Serializable { + private static final long serialVersionUID = 1095608914696359395L; + + private String topic; + + private Integer qos; + + private byte[] body; + + private String userProperties; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date createTime; +} diff --git a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java index a31e2df9..9abbbd67 100644 --- a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java +++ b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java @@ -3,10 +3,12 @@ package io.github.quickmsg.persistent.registry; import io.github.quickmsg.common.bootstrap.BootstrapKey; import io.github.quickmsg.common.config.BootstrapConfig; import io.github.quickmsg.common.message.MessageRegistry; +import io.github.quickmsg.common.message.OfflineMessage; import io.github.quickmsg.common.message.RetainMessage; import io.github.quickmsg.common.message.SessionMessage; import io.github.quickmsg.common.utils.TopicRegexUtils; import io.github.quickmsg.persistent.factory.ClientFactory; +import io.github.quickmsg.persistent.message.OfflineMessageEntity; import io.github.quickmsg.persistent.message.RetainMessageEntity; import io.github.quickmsg.persistent.message.SessionMessageEntity; import io.github.quickmsg.persistent.strategy.ClientStrategy; @@ -154,11 +156,11 @@ public class RedisMessageRegistry implements MessageRegistry { } @Override - public List getOfflineMessage(String topic) { + public List getOfflineMessage(String topic) { // 一对一发送,只需要精确匹配 try { - RSetCache set = redissonClient.getSetCache(BootstrapKey.Redis.REDIS_OFFLINE_MESSAGE_PREFIX_KEY + topic); - List resList = set.stream().map(record -> RetainMessage.builder() + RSetCache set = redissonClient.getSetCache(BootstrapKey.Redis.REDIS_OFFLINE_MESSAGE_PREFIX_KEY + topic); + List resList = set.stream().map(record -> OfflineMessage.builder() .topic(record.getTopic()) .qos(record.getQos()) .body(record.getBody()) @@ -177,13 +179,13 @@ public class RedisMessageRegistry implements MessageRegistry { } @Override - public void saveOfflineMessage(RetainMessage offlineMessage) { + public void saveOfflineMessage(OfflineMessage offlineMessage) { String topic = offlineMessage.getTopic(); int qos = offlineMessage.getQos(); byte[] body = offlineMessage.getBody(); String userProperties = offlineMessage.getUserProperties(); try { - RetainMessageEntity offlineMessageEntity = RetainMessageEntity.builder() + OfflineMessageEntity offlineMessageEntity = OfflineMessageEntity.builder() .topic(topic) .qos(qos) .body(body) @@ -191,8 +193,8 @@ public class RedisMessageRegistry implements MessageRegistry { .createTime(new Date()) .build(); - RSetCache set = redissonClient.getSetCache(BootstrapKey.Redis.REDIS_OFFLINE_MESSAGE_PREFIX_KEY + topic); - set.add(offlineMessageEntity, 1, TimeUnit.DAYS); + RSetCache set = redissonClient.getSetCache(BootstrapKey.Redis.REDIS_OFFLINE_MESSAGE_PREFIX_KEY + topic); + set.add(offlineMessageEntity, 7, TimeUnit.DAYS); } catch (Exception e) { log.error("saveOfflineMessage error message: {}", topic, e); } -- Gitee From 6cb44c5170050a153adbf86d6805e2d857c00d08 Mon Sep 17 00:00:00 2001 From: zhoujingwu Date: Tue, 25 Jan 2022 17:10:18 +0800 Subject: [PATCH 8/9] Adjust: delete local path config --- .../src/main/java/io/github/quickmsg/AbstractStarter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java b/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java index b41176df..a41fc41d 100644 --- a/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java +++ b/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java @@ -23,7 +23,6 @@ public abstract class AbstractStarter { public static void start(String path) { - path = "D:\\xuanwu\\IDEA\\mqtt-cluster\\smqtt-bootstrap\\src\\main\\resources\\application.yml"; BootstrapConfig config = null; if (path != null) { if (path.endsWith(FileExtension.PROPERTIES_SYMBOL)) { -- Gitee From 01b82be3267f9b30b1f2a4711a87567515c013f8 Mon Sep 17 00:00:00 2001 From: zhoujingwu Date: Tue, 25 Jan 2022 18:15:36 +0800 Subject: [PATCH 9/9] =?UTF-8?q?Adjust:=20=E6=8C=81=E4=B9=85=E5=8C=96?= =?UTF-8?q?=E6=96=B9=E5=BC=8F=E6=94=AF=E6=8C=81=E5=8F=AF=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/io/github/quickmsg/AbstractStarter.java | 1 + .../src/main/resources/application.yml | 2 ++ .../quickmsg/common/config/BootstrapConfig.java | 5 +++++ .../quickmsg/common/config/Configuration.java | 8 ++++++-- .../java/io/github/quickmsg/core/Bootstrap.java | 3 +++ .../quickmsg/core/http/HttpConfiguration.java | 2 ++ .../core/mqtt/AbstractReceiveContext.java | 15 ++++++++++++--- .../quickmsg/core/mqtt/MqttConfiguration.java | 2 ++ .../quickmsg/starter/AutoMqttConfiguration.java | 1 + .../quickmsg/starter/SpringBootstrapConfig.java | 4 ++++ 10 files changed, 38 insertions(+), 5 deletions(-) diff --git a/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java b/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java index a41fc41d..a26dffde 100644 --- a/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java +++ b/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java @@ -53,6 +53,7 @@ public abstract class AbstractStarter { .httpConfig(config.getSmqttConfig().getHttpConfig()) .websocketConfig(config.getSmqttConfig().getWebsocketConfig()) .clusterConfig(config.getSmqttConfig().getClusterConfig()) + .persistenceType(config.getPersistenceType()) .redisConfig(config.getSmqttConfig().getRedisConfig()) .databaseConfig(config.getSmqttConfig().getDatabaseConfig()) .meterConfig(config.getSmqttConfig().getMeterConfig()) diff --git a/smqtt-bootstrap/src/main/resources/application.yml b/smqtt-bootstrap/src/main/resources/application.yml index 4e3dad5f..62e3393d 100644 --- a/smqtt-bootstrap/src/main/resources/application.yml +++ b/smqtt-bootstrap/src/main/resources/application.yml @@ -40,6 +40,8 @@ port: 7777 # 用于映射容器端口 请不要随意设置,如果不需要请移除此选项 meter: meterType: PROMETHEUS # INFLUXDB , PROMETHEUS + + persistenceType: 1 # 持久化方式,0:内存(默认),1:Redis,2:DB db: # 参数值配置参考https://github.com/brettwooldridge/HikariCP/wiki/MySQL-Configuration jdbcUrl: jdbc:mysql://172.16.1.77:3306/smqtt username: root diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java index 6776a4da..c563a3d5 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java @@ -279,6 +279,11 @@ public class BootstrapConfig { private ClusterExternal external; } + /** + * 持久化方式,0:内存(默认),1:Redis,2:DB + */ + private Integer persistenceType; + @Data @Builder @NoArgsConstructor diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java index ca994d2b..de77bc9a 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java @@ -122,8 +122,12 @@ public interface Configuration { */ BootstrapConfig.MeterConfig getMeterConfig(); - - + /** + * 获取持久化方式 + * + * @return + */ + Integer getPersistenceType(); diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java b/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java index 2ab1843f..407ed515 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java @@ -43,6 +43,8 @@ public class Bootstrap { private BootstrapConfig.ClusterConfig clusterConfig; + private Integer persistenceType; + private BootstrapConfig.RedisConfig redisConfig; private BootstrapConfig.DatabaseConfig databaseConfig; @@ -94,6 +96,7 @@ public class Bootstrap { Optional.ofNullable(tcpConfig.getMessageMaxSize()).ifPresent(mqttConfiguration::setMessageMaxSize); Optional.ofNullable(clusterConfig).ifPresent(mqttConfiguration::setClusterConfig); Optional.ofNullable(meterConfig).ifPresent(mqttConfiguration::setMeterConfig); + Optional.ofNullable(persistenceType).ifPresent(mqttConfiguration::setPersistenceType); if (websocketConfig != null && websocketConfig.isEnable()) { mqttConfiguration.setWebSocketPort(websocketConfig.getPort()); diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java b/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java index 12fc2b00..8c9a6161 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java @@ -37,6 +37,8 @@ public class HttpConfiguration implements Configuration { private BootstrapConfig.MeterConfig meterConfig; + private Integer persistenceType = 0; + @Override public ConnectModel getConnectModel() { diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java index 288e898e..2b78f347 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java @@ -30,6 +30,8 @@ import io.github.quickmsg.core.spi.DefaultTopicRegistry; import io.github.quickmsg.dsl.RuleDslParser; import io.github.quickmsg.metric.InfluxDbMetricFactory; import io.github.quickmsg.metric.PrometheusMetricFactory; +import io.github.quickmsg.persistent.registry.DbMessageRegistry; +import io.github.quickmsg.persistent.registry.RedisMessageRegistry; import io.github.quickmsg.rule.source.SourceManager; import io.netty.handler.traffic.GlobalChannelTrafficShapingHandler; import io.netty.handler.traffic.GlobalTrafficShapingHandler; @@ -91,7 +93,7 @@ public abstract class AbstractReceiveContext implements this.topicRegistry = topicRegistry(); this.loopResources = LoopResources.create("smqtt-cluster-io", configuration.getBossThreadSize(), configuration.getWorkThreadSize(), true); this.trafficHandlerLoader = trafficHandlerLoader(); - this.messageRegistry = messageRegistry(); + this.messageRegistry = messageRegistry(abstractConfiguration.getPersistenceType()); this.clusterRegistry = clusterRegistry(); this.passwordAuthentication = basicAuthentication(); this.channelRegistry.startUp(abstractConfiguration.getEnvironmentMap()); @@ -122,8 +124,15 @@ public abstract class AbstractReceiveContext implements return Event::sender; } - private MessageRegistry messageRegistry() { - return Optional.ofNullable(MessageRegistry.INSTANCE).orElseGet(DefaultMessageRegistry::new); + private MessageRegistry messageRegistry(int persistenceType) { + switch (persistenceType) { + case 1: + return new RedisMessageRegistry(); + case 2: + return new DbMessageRegistry(); + default: + return new DefaultMessageRegistry(); + } } private PasswordAuthentication basicAuthentication() { diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java index 03bda94c..e3444aaf 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java @@ -69,4 +69,6 @@ public class MqttConfiguration extends AbstractSslHandler implements AbstractCon private Integer messageMaxSize = 4194304; + private Integer persistenceType = 0; + } diff --git a/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java b/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java index 60132949..ba7bcae6 100644 --- a/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java +++ b/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java @@ -35,6 +35,7 @@ public class AutoMqttConfiguration { .httpConfig(springBootstrapConfig.getHttp()) .websocketConfig(springBootstrapConfig.getWs()) .clusterConfig(springBootstrapConfig.getCluster()) + .persistenceType(springBootstrapConfig.getPersistenceType()) .redisConfig(springBootstrapConfig.getRedis()) .databaseConfig(springBootstrapConfig.getDb()) .ruleChainDefinitions(springBootstrapConfig.getRules()) diff --git a/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/SpringBootstrapConfig.java b/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/SpringBootstrapConfig.java index 7bf84d6c..b49ab112 100644 --- a/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/SpringBootstrapConfig.java +++ b/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/SpringBootstrapConfig.java @@ -63,6 +63,10 @@ public class SpringBootstrapConfig { private BootstrapConfig.MeterConfig meter; + /** + * 持久化方式,0:内存(默认),1:Redis,2:DB + */ + private Integer persistenceType; /** * 数据库配置 */ -- Gitee