diff --git a/smqtt-bootstrap/pom.xml b/smqtt-bootstrap/pom.xml
index 1f67f6d19a35971acb604dc67d441429e65b3469..dddb998cd53713e5caade0066e83ac9340ecdc64 100644
--- a/smqtt-bootstrap/pom.xml
+++ b/smqtt-bootstrap/pom.xml
@@ -57,6 +57,21 @@
io.github.quickmsg
1.1.2
+
+ org.springframework.boot
+ spring-boot
+ 2.2.1.RELEASE
+
+
+ org.springframework.boot
+ spring-boot-autoconfigure
+ 2.2.1.RELEASE
+
+
+ io.github.quickmsg
+ smqtt-spring-boot-starter
+ 1.1.2
+
diff --git a/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java b/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java
index a41fc41d5b650df8ceebad70243c64562245edf9..a26dffdedd08dc9744eae506a30266df3f8d1068 100644
--- a/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java
+++ b/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java
@@ -53,6 +53,7 @@ public abstract class AbstractStarter {
.httpConfig(config.getSmqttConfig().getHttpConfig())
.websocketConfig(config.getSmqttConfig().getWebsocketConfig())
.clusterConfig(config.getSmqttConfig().getClusterConfig())
+ .persistenceType(config.getPersistenceType())
.redisConfig(config.getSmqttConfig().getRedisConfig())
.databaseConfig(config.getSmqttConfig().getDatabaseConfig())
.meterConfig(config.getSmqttConfig().getMeterConfig())
diff --git a/smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java b/smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java
new file mode 100644
index 0000000000000000000000000000000000000000..4bb5e3e1c12b9e3b0650351aa778f6362768f22f
--- /dev/null
+++ b/smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java
@@ -0,0 +1,20 @@
+package io.github.quickmsg.springboot;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+
+import io.github.quickmsg.starter.EnableMqttServer;
+
+/**
+ * @Author: Jingwu.Zhou
+ * @Date: 2022/1/24
+ */
+@SpringBootApplication
+@ComponentScan(basePackages = {"io.github.quickmsg"})
+@EnableMqttServer
+public class SpringBootStarter {
+ public static void main(String[] args) {
+ SpringApplication.run(SpringBootStarter.class, args);
+ }
+}
diff --git a/smqtt-bootstrap/src/main/resources/application.yml b/smqtt-bootstrap/src/main/resources/application.yml
new file mode 100644
index 0000000000000000000000000000000000000000..62e3393d6f623aedd6a61331831ef23651f91763
--- /dev/null
+++ b/smqtt-bootstrap/src/main/resources/application.yml
@@ -0,0 +1,79 @@
+ smqtt:
+ logLevel: INFO # 系统日志
+ tcp: # tcp配置
+ port: 1883 # mqtt端口号
+ username: smqtt # mqtt连接默认用户名 生产环境建议spi去注入PasswordAuthentication接口
+ password: smqtt # mqtt连接默认密码 生产环境建议spi去注入PasswordAuthentication接口
+ wiretap: true # 二进制日志 前提是 smqtt.logLevel = DEBUG
+ bossThreadSize: 4 # boss线程
+ workThreadSize: 8 # work线程
+ lowWaterMark: 4000000 # 不建议配置 默认 32768
+ highWaterMark: 80000000 # 不建议配置 默认 65536
+ businessThreadSize: 16 # 业务线程数 默认=cpu核心数*10
+ businessQueueSize: 100000 #业务队列 默认=100000
+ # globalReadWriteSize: 10000000,100000000 全局读写大小限制
+ # channelReadWriteSize: 10000000,100000000 单个channel读写大小限制
+ ssl: # ssl配置
+ enable: false # 开关
+ key: /user/server.key # 指定ssl文件 默认系统生成
+ crt: /user/server.crt # 指定ssl文件 默认系统生成
+ http: # http相关配置 端口固定60000
+ enable: true # 开关
+ accessLog: false # http访问日志
+ ssl: # ssl配置
+ enable: false
+ admin: # 后台管理配置
+ enable: true # 开关
+ username: smqtt # 访问用户名
+ password: smqtt # 访问密码
+ ws: # websocket配置
+ enable: false # 开关
+ port: 8999 # 端口
+ path: /mqtt # ws 的访问path mqtt.js请设置此选项
+ cluster: # 集群配置
+ enable: false # 集群开关
+ url: 127.0.0.1:7771,127.0.0.1:7772 # 启动节点
+ port: 7771 # 端口
+ node: node-1 # 集群节点名称 唯一
+ external:
+ host: localhost # 用于映射容器ip 请不要随意设置,如果不需要请移除此选项
+ port: 7777 # 用于映射容器端口 请不要随意设置,如果不需要请移除此选项
+ meter:
+ meterType: PROMETHEUS # INFLUXDB , PROMETHEUS
+
+ persistenceType: 1 # 持久化方式,0:内存(默认),1:Redis,2:DB
+ db: # 参数值配置参考https://github.com/brettwooldridge/HikariCP/wiki/MySQL-Configuration
+ jdbcUrl: jdbc:mysql://172.16.1.77:3306/smqtt
+ username: root
+ password: jx@xw!@#$~|{}
+ dataSourceCachePrepStmts: false
+ dataSourcePrepStmtCacheSize: 250
+ dataSourcePrepStmtCacheSqlLimit: 2048
+ dataSourceUseServerPrepStmts: true
+ dataSourceUseLocalSessionState: true
+ dataSourceRewriteBatchedStatements: true
+ dataSourceCacheResultSetMetadata: true
+ dataSourceCacheServerConfiguration: true
+ dataSourceElideSetAutoCommits: true
+ dataSourceMaintainTimeStats: false
+ redis: # redis 请参考 https://doc.smqtt.cc/%E5%85%B6%E4%BB%96/1.store.html 【如果没有引入相关依赖请移除此配置】
+ mode: sentinel
+ database: 7
+ password: xuanwu-T3st*17
+ timeout: 3000
+ poolMinIdle: 8
+ poolConnTimeout: 3000
+ poolSize: 10
+ single:
+ address: 172.16.1.16:6392
+ cluster:
+ scanInterval: 1000
+ nodes: 172.16.1.16:6390,172.16.1.16:6391,172.16.1.16:6392
+ readMode: SLAVE
+ retryAttempts: 3
+ slaveConnectionPoolSize: 64
+ masterConnectionPoolSize: 64
+ retryInterval: 1500
+ sentinel:
+ master: mq_master
+ nodes: 172.16.1.16:26371,172.16.1.16:26372,172.16.1.16:26373
\ No newline at end of file
diff --git a/smqtt-bootstrap/src/main/resources/test.yaml b/smqtt-bootstrap/src/main/resources/test.yaml
deleted file mode 100644
index e4203080c7a10e0811258fd56cb1cf8b0f78465e..0000000000000000000000000000000000000000
--- a/smqtt-bootstrap/src/main/resources/test.yaml
+++ /dev/null
@@ -1,3 +0,0 @@
-smqtt:
- tcp:
- port: 7000
\ No newline at end of file
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java b/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java
index ad5a890440d307136f1e9d9571441c9df4f33736..bd7da1b0cf77f7b9c6426c927b41ca9b3459ae3f 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java
@@ -14,6 +14,9 @@ public class BootstrapKey {
/*redis前缀key*/
public static final String REDIS_SESSION_MESSAGE_PREFIX_KEY = "smqtt:session:message:";
+ /*redis前缀key 离线消息*/
+ public static final String REDIS_OFFLINE_MESSAGE_PREFIX_KEY = "smqtt:offline:message:";
+
/*模式*/
public static final String REDIS_MODE = "redis.mode";
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java
index 6776a4da13c2a270c997bd50798e6862b300247e..c563a3d5bdfb9ca8739681ab2706393016e3c58e 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java
@@ -279,6 +279,11 @@ public class BootstrapConfig {
private ClusterExternal external;
}
+ /**
+ * 持久化方式,0:内存(默认),1:Redis,2:DB
+ */
+ private Integer persistenceType;
+
@Data
@Builder
@NoArgsConstructor
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java
index ca994d2b793e07445cb35b8d9d21f7d34250ec6e..de77bc9affc551ac4748f0fe4fc369732b2daea0 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java
@@ -122,8 +122,12 @@ public interface Configuration {
*/
BootstrapConfig.MeterConfig getMeterConfig();
-
-
+ /**
+ * 获取持久化方式
+ *
+ * @return
+ */
+ Integer getPersistenceType();
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java b/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java
index cb6d17b619f128b6376236f54806ea4c049095b9..e34f7c8ab2697a988d3869a33468ae79f114b8da 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java
@@ -78,6 +78,7 @@ public class MessageProxy {
.topic(header.topicName())
.retain(fixedHeader.isRetain())
.qos(fixedHeader.qosLevel().value())
+ .properties(header.properties())
.build();
}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java
index 19c340dac9389abc45642e94393119623571ad09..2ab22342784775c9b8d3cc01102251df3380543e 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java
@@ -1,6 +1,7 @@
package io.github.quickmsg.common.message;
import io.github.quickmsg.common.utils.JacksonUtil;
+import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.util.internal.StringUtil;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -31,6 +32,8 @@ public class HeapMqttMessage {
private byte[] message;
+ private MqttProperties properties;
+
public Map getKeyMap() {
Map keys = new HashMap<>(5);
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java
index c21887dba432d7aa5498b958707ec25a005e632f..3a5d4a6f8e2bd62f37a3fcfbdfce88a7992e79a5 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java
@@ -47,4 +47,18 @@ public interface MessageRegistry extends StartUp {
List getRetainMessage(String topic);
+ /**
+ * 持久化离线消息
+ *
+ * @param message
+ */
+ void saveOfflineMessage(OfflineMessage message);
+
+ /**
+ * 获取离线消息
+ *
+ * @param topic
+ * @return
+ */
+ List getOfflineMessage(String topic);
}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java
index d6a16539cc6c0dfaa5f0e63141210ed658e9ea7f..5346a17693eb9bbb3f86e9ab851216c3a343735c 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java
@@ -4,6 +4,13 @@ import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.*;
import java.util.List;
+import java.util.Map;
+
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD;
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID;
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED_5;
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE_5;
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION;
/**
@@ -11,6 +18,32 @@ import java.util.List;
*/
public class MqttMessageBuilder {
+ private static MqttProperties genMqttProperties(Map userPropertiesMap) {
+ MqttProperties mqttProperties = null;
+ if (userPropertiesMap != null) {
+ mqttProperties = new MqttProperties();
+ MqttProperties.UserProperties userProperties = new MqttProperties.UserProperties();
+ for (Map.Entry entry : userPropertiesMap.entrySet()) {
+ userProperties.add(entry.getKey(), entry.getValue());
+ }
+ mqttProperties.add(userProperties);
+ }
+ return mqttProperties;
+ }
+
+ public static MqttPublishMessage buildPub(boolean isDup, MqttQoS qoS, int messageId, String topic, ByteBuf message, MqttProperties properties) {
+ MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qoS, false, 0);
+ MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(topic, messageId, properties);
+ MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, message);
+ return mqttPublishMessage;
+ }
+
+ public static MqttPublishMessage buildPub(boolean isDup, MqttQoS qoS, int messageId, String topic, ByteBuf message, Map userPropertiesMap) {
+ MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qoS, false, 0);
+ MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(topic, messageId, genMqttProperties(userPropertiesMap));
+ MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, message);
+ return mqttPublishMessage;
+ }
public static MqttPublishMessage buildPub(boolean isDup, MqttQoS qoS, int messageId, String topic, ByteBuf message) {
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qoS, false, 0);
@@ -69,8 +102,33 @@ public class MqttMessageBuilder {
return new MqttUnsubAckMessage(mqttFixedHeader, variableHeader);
}
- public static MqttConnAckMessage buildConnectAck(MqttConnectReturnCode connectReturnCode) {
- MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(connectReturnCode, false);
+ public static MqttConnAckMessage buildConnectAck(MqttConnectReturnCode connectReturnCode, int version) {
+ MqttProperties properties = MqttProperties.NO_PROPERTIES;
+ if (MqttVersion.MQTT_5.protocolLevel() == (byte) version) {
+ properties = new MqttProperties();
+ properties.add(new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.RETAIN_AVAILABLE.value(), 1));
+ // don't support shared subscription
+ properties.add(new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.SHARED_SUBSCRIPTION_AVAILABLE.value(), 0));
+ switch (connectReturnCode) {
+ case CONNECTION_REFUSED_IDENTIFIER_REJECTED:
+ connectReturnCode = CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID;
+ break;
+ case CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION:
+ connectReturnCode = CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION;
+ break;
+ case CONNECTION_REFUSED_SERVER_UNAVAILABLE:
+ connectReturnCode = CONNECTION_REFUSED_SERVER_UNAVAILABLE_5;
+ break;
+ case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD:
+ connectReturnCode = CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD;
+ break;
+ case CONNECTION_REFUSED_NOT_AUTHORIZED:
+ connectReturnCode = CONNECTION_REFUSED_NOT_AUTHORIZED_5;
+ break;
+
+ }
+ }
+ MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(connectReturnCode, false, properties);
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(
MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0X02);
return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/OfflineMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/OfflineMessage.java
new file mode 100644
index 0000000000000000000000000000000000000000..9771c5657bf2410287f92d32279b32d1fcb3f9f1
--- /dev/null
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/OfflineMessage.java
@@ -0,0 +1,62 @@
+package io.github.quickmsg.common.message;
+
+import java.util.HashMap;
+import java.util.Optional;
+
+import io.github.quickmsg.common.channel.MqttChannel;
+import io.github.quickmsg.common.utils.JacksonUtil;
+import io.github.quickmsg.common.utils.MessageUtils;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.handler.codec.mqtt.MqttProperties;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * @Author: Jingwu.Zhou
+ * @Date: 2022/1/24
+ */
+@Data
+@Builder
+public class OfflineMessage {
+ private int qos;
+
+ private String topic;
+
+ private byte[] body;
+
+ private String userProperties;
+
+ public static OfflineMessage of(MqttPublishMessage mqttPublishMessage) {
+ MqttPublishVariableHeader publishVariableHeader = mqttPublishMessage.variableHeader();
+ return OfflineMessage.builder()
+ .topic(publishVariableHeader.topicName())
+ .qos(mqttPublishMessage.fixedHeader().qosLevel().value())
+ .body(MessageUtils.copyByteBuf(mqttPublishMessage.payload()))
+ .userProperties(JacksonUtil.map2Json(Optional.ofNullable(publishVariableHeader
+ .properties()
+ .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value()))
+ .map(list -> {
+ HashMap propertiesMap = new HashMap<>(list.size());
+ list.forEach(property -> {
+ MqttProperties.StringPair pair = (MqttProperties.StringPair) property.value();
+ propertiesMap.put(pair.key, pair.value);
+ });
+ return propertiesMap;
+ }).orElseGet(HashMap::new)))
+ .build();
+ }
+
+ public MqttPublishMessage toPublishMessage(MqttChannel mqttChannel) {
+ return MqttMessageBuilder.buildPub(
+ false,
+ MqttQoS.valueOf(this.qos),
+ qos > 0 ? mqttChannel.generateMessageId() : 0,
+ topic,
+ PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body),
+ JacksonUtil.json2Map(userProperties, String.class, String.class));
+ }
+
+}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java
index fbc623529cdcbc7e0f9fb110feb2f8899ebc30e4..2d67c21e50281155a26fec9f839a6bfa3c8929a7 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java
@@ -1,8 +1,13 @@
package io.github.quickmsg.common.message;
+import java.util.HashMap;
+import java.util.Optional;
+
import io.github.quickmsg.common.channel.MqttChannel;
+import io.github.quickmsg.common.utils.JacksonUtil;
import io.github.quickmsg.common.utils.MessageUtils;
import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
@@ -22,12 +27,25 @@ public class RetainMessage {
private byte[] body;
+ private String userProperties;
+
public static RetainMessage of(MqttPublishMessage mqttPublishMessage) {
MqttPublishVariableHeader publishVariableHeader = mqttPublishMessage.variableHeader();
return RetainMessage.builder()
.topic(publishVariableHeader.topicName())
.qos(mqttPublishMessage.fixedHeader().qosLevel().value())
.body(MessageUtils.copyByteBuf(mqttPublishMessage.payload()))
+ .userProperties(JacksonUtil.map2Json(Optional.ofNullable(publishVariableHeader
+ .properties()
+ .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value()))
+ .map(list -> {
+ HashMap propertiesMap = new HashMap<>(list.size());
+ list.forEach(property -> {
+ MqttProperties.StringPair pair = (MqttProperties.StringPair) property.value();
+ propertiesMap.put(pair.key, pair.value);
+ });
+ return propertiesMap;
+ }).orElseGet(HashMap::new)))
.build();
}
@@ -37,7 +55,8 @@ public class RetainMessage {
MqttQoS.valueOf(this.qos),
qos > 0 ? mqttChannel.generateMessageId() : 0,
topic,
- PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body));
+ PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body),
+ JacksonUtil.json2Map(userProperties, String.class, String.class));
}
}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java
index a3596af3aa8e4686cf1786d39598480c42efc2d8..f5aecea5ff339c029a868517f2e1d7fb8c243de8 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java
@@ -1,8 +1,13 @@
package io.github.quickmsg.common.message;
+import java.util.HashMap;
+import java.util.Optional;
+
import io.github.quickmsg.common.channel.MqttChannel;
+import io.github.quickmsg.common.utils.JacksonUtil;
import io.github.quickmsg.common.utils.MessageUtils;
import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
@@ -27,6 +32,8 @@ public class SessionMessage {
private boolean retain;
+ private String userProperties;
+
public static SessionMessage of(String clientIdentifier, MqttPublishMessage mqttPublishMessage) {
MqttPublishVariableHeader publishVariableHeader = mqttPublishMessage.variableHeader();
return SessionMessage.builder()
@@ -35,6 +42,17 @@ public class SessionMessage {
.qos(mqttPublishMessage.fixedHeader().qosLevel().value())
.retain(mqttPublishMessage.fixedHeader().isRetain())
.body(MessageUtils.copyByteBuf(mqttPublishMessage.payload()))
+ .userProperties(JacksonUtil.map2Json(Optional.ofNullable(publishVariableHeader
+ .properties()
+ .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value()))
+ .map(list -> {
+ HashMap propertiesMap = new HashMap<>(list.size());
+ list.forEach(property -> {
+ MqttProperties.StringPair pair = (MqttProperties.StringPair) property.value();
+ propertiesMap.put(pair.key, pair.value);
+ });
+ return propertiesMap;
+ }).orElseGet(HashMap::new)))
.build();
}
@@ -44,7 +62,8 @@ public class SessionMessage {
MqttQoS.valueOf(this.qos),
qos > 0 ? mqttChannel.generateMessageId() : 0,
topic,
- PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body));
+ PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body),
+ JacksonUtil.json2Map(userProperties, String.class, String.class));
}
}
diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java
index fa197573b91107d755751afe9c9cf6c6e2877aa1..1b949a1639f554c09cee0e025b6eee41690182fa 100644
--- a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java
+++ b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java
@@ -67,7 +67,8 @@ public class MessageUtils {
MqttPublishVariableHeader mqttPublishVariableHeader = message.variableHeader();
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttFixedHeader newFixedHeader = new MqttFixedHeader(mqttFixedHeader.messageType(), false, mqttQoS, false, mqttFixedHeader.remainingLength());
- MqttPublishVariableHeader newHeader = new MqttPublishVariableHeader(mqttPublishVariableHeader.topicName(), messageId);
+ // mqtt 5 support properties
+ MqttPublishVariableHeader newHeader = new MqttPublishVariableHeader(mqttPublishVariableHeader.topicName(), messageId, mqttPublishVariableHeader.properties());
return new MqttPublishMessage(newFixedHeader, newHeader, message.payload().copy());
}
diff --git a/smqtt-core/pom.xml b/smqtt-core/pom.xml
index dbb26bfb59fcfad27bf26045f1afee2353a59415..be754e809a6fec72ef710ec6d16043eb366b682c 100644
--- a/smqtt-core/pom.xml
+++ b/smqtt-core/pom.xml
@@ -31,6 +31,16 @@
smqtt-metric-prometheus
1.1.2
+
+ io.github.quickmsg
+ smqtt-persistent-redis
+ 1.1.2
+
+
+ io.github.quickmsg
+ smqtt-persistent-db
+ 1.1.2
+
\ No newline at end of file
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java b/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java
index 2ab1843fde8d307151c4c7001508d22c63f2d7d7..407ed515b3c822ab82464c8bfa5c50e40032f730 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java
@@ -43,6 +43,8 @@ public class Bootstrap {
private BootstrapConfig.ClusterConfig clusterConfig;
+ private Integer persistenceType;
+
private BootstrapConfig.RedisConfig redisConfig;
private BootstrapConfig.DatabaseConfig databaseConfig;
@@ -94,6 +96,7 @@ public class Bootstrap {
Optional.ofNullable(tcpConfig.getMessageMaxSize()).ifPresent(mqttConfiguration::setMessageMaxSize);
Optional.ofNullable(clusterConfig).ifPresent(mqttConfiguration::setClusterConfig);
Optional.ofNullable(meterConfig).ifPresent(mqttConfiguration::setMeterConfig);
+ Optional.ofNullable(persistenceType).ifPresent(mqttConfiguration::setPersistenceType);
if (websocketConfig != null && websocketConfig.isEnable()) {
mqttConfiguration.setWebSocketPort(websocketConfig.getPort());
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java b/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java
index d751f33ba3eb3b42e7f01106fdb31dc4cc4e4ec3..62d5f3a39dbf0be92f69cd5092cd3c3c2211d79f 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java
@@ -56,7 +56,7 @@ public class ClusterReceiver {
MqttQoS.valueOf(heapMqttMessage.getQos()),
0,
heapMqttMessage.getTopic(),
- PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage())), System.currentTimeMillis(), Boolean.TRUE);
+ PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage()), heapMqttMessage.getProperties()), System.currentTimeMillis(), Boolean.TRUE);
}
}
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java b/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java
index 12fc2b003a703d0bb4a7593d0ae6078fcd258284..8c9a6161e1a7c903a6626e090e5f86732b039540 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java
@@ -37,6 +37,8 @@ public class HttpConfiguration implements Configuration {
private BootstrapConfig.MeterConfig meterConfig;
+ private Integer persistenceType = 0;
+
@Override
public ConnectModel getConnectModel() {
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java
index 288e898e0ed8a0db2bbbafb88711cd2882db4864..2b78f347bcd47d769ee63d4eb2a3707d293710be 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java
@@ -30,6 +30,8 @@ import io.github.quickmsg.core.spi.DefaultTopicRegistry;
import io.github.quickmsg.dsl.RuleDslParser;
import io.github.quickmsg.metric.InfluxDbMetricFactory;
import io.github.quickmsg.metric.PrometheusMetricFactory;
+import io.github.quickmsg.persistent.registry.DbMessageRegistry;
+import io.github.quickmsg.persistent.registry.RedisMessageRegistry;
import io.github.quickmsg.rule.source.SourceManager;
import io.netty.handler.traffic.GlobalChannelTrafficShapingHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
@@ -91,7 +93,7 @@ public abstract class AbstractReceiveContext implements
this.topicRegistry = topicRegistry();
this.loopResources = LoopResources.create("smqtt-cluster-io", configuration.getBossThreadSize(), configuration.getWorkThreadSize(), true);
this.trafficHandlerLoader = trafficHandlerLoader();
- this.messageRegistry = messageRegistry();
+ this.messageRegistry = messageRegistry(abstractConfiguration.getPersistenceType());
this.clusterRegistry = clusterRegistry();
this.passwordAuthentication = basicAuthentication();
this.channelRegistry.startUp(abstractConfiguration.getEnvironmentMap());
@@ -122,8 +124,15 @@ public abstract class AbstractReceiveContext implements
return Event::sender;
}
- private MessageRegistry messageRegistry() {
- return Optional.ofNullable(MessageRegistry.INSTANCE).orElseGet(DefaultMessageRegistry::new);
+ private MessageRegistry messageRegistry(int persistenceType) {
+ switch (persistenceType) {
+ case 1:
+ return new RedisMessageRegistry();
+ case 2:
+ return new DbMessageRegistry();
+ default:
+ return new DefaultMessageRegistry();
+ }
}
private PasswordAuthentication basicAuthentication() {
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java
index 03bda94cb65e1bc261e3fb8cc9c0e1deaf84bf53..e3444aafe77a0bdeb9cdb9c5a2a7d30046c8631a 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java
@@ -69,4 +69,6 @@ public class MqttConfiguration extends AbstractSslHandler implements AbstractCon
private Integer messageMaxSize = 4194304;
+ private Integer persistenceType = 0;
+
}
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java
index 6e6bd47a320e2d820938b5e5f7a4f306e5c15a18..e90042add3e51b4fc70d8ec7486c70ab5102a435 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java
@@ -68,7 +68,7 @@ public class ConnectProtocol implements Protocol {
if (mqttReceiveContext.getConfiguration().getConnectModel() == ConnectModel.UNIQUE) {
if (channelRegistry.exists(clientIdentifier)) {
return mqttChannel.write(
- MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED),
+ MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, mqttConnectVariableHeader.version()),
false).then(mqttChannel.close());
}
} else {
@@ -78,16 +78,16 @@ public class ConnectProtocol implements Protocol {
existMqttChannel.close().subscribe();
} else {
return mqttChannel.write(
- MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED),
+ MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, mqttConnectVariableHeader.version()),
false).then(mqttChannel.close());
}
}
}
/*protocol version support*/
if (MqttVersion.MQTT_3_1_1.protocolLevel() != (byte) mqttConnectVariableHeader.version()
- && MqttVersion.MQTT_3_1.protocolLevel() != (byte) mqttConnectVariableHeader.version()) {
+ && MqttVersion.MQTT_5.protocolLevel() != (byte) mqttConnectVariableHeader.version()) {
return mqttChannel.write(
- MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION),
+ MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, mqttConnectVariableHeader.version()),
false).then(mqttChannel.close());
}
/*password check*/
@@ -147,11 +147,11 @@ public class ConnectProtocol implements Protocol {
eventRegistry.registry(Event.CONNECT, mqttChannel, message, mqttReceiveContext);
- return mqttChannel.write(MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_ACCEPTED), false)
+ return mqttChannel.write(MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeader.version()), false)
.then(Mono.fromRunnable(() -> sendOfflineMessage(mqttReceiveContext.getMessageRegistry(), mqttChannel)));
} else {
return mqttChannel.write(
- MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD),
+ MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, mqttConnectVariableHeader.version()),
false).then(mqttChannel.close());
}
} catch (Exception e) {
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java
index 1a405e0799b2a5f9ce21db92daa533e2fa29afbe..eadaa2784e4c36ae45a41fdfc15aea0ed0432245 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java
@@ -30,6 +30,8 @@ public class PublishProtocol implements Protocol {
private static List MESSAGE_TYPE_LIST = new ArrayList<>();
+ private static String ONE2ONE_TOPIC_PREFIX = "ipush/mt/msg/";
+
static {
MESSAGE_TYPE_LIST.add(MqttMessageType.PUBLISH);
}
@@ -52,6 +54,10 @@ public class PublishProtocol implements Protocol {
MessageRegistry messageRegistry = receiveContext.getMessageRegistry();
Set mqttChannels = topicRegistry.getSubscribesByTopic(variableHeader.topicName(),
message.fixedHeader().qosLevel());
+ // 定制化处理,一对一发送的离线消息进行持久化,保证送达
+ if (mqttChannels.isEmpty() && variableHeader.topicName().startsWith(ONE2ONE_TOPIC_PREFIX)) {
+ messageRegistry.saveOfflineMessage(OfflineMessage.of(message));
+ }
// http mock
if (mqttChannel.getIsMock()) {
return send(mqttChannels, message, messageRegistry, filterRetainMessage(message, messageRegistry));
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java
index 6a1c2f4d6409c53364b74d21a25338d99eb66d72..1f7914a54a2902849b6f796a9d938bcf9268b8ed 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java
@@ -11,13 +11,13 @@ import io.github.quickmsg.common.protocol.Protocol;
import io.github.quickmsg.common.topic.SubscribeTopic;
import io.github.quickmsg.common.topic.TopicRegistry;
import io.netty.handler.codec.mqtt.MqttMessageType;
-import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -40,7 +40,10 @@ public class SubscribeProtocol implements Protocol {
Set mqttTopicSubscriptions =
message.payload().topicSubscriptions()
.stream()
- .peek(mqttTopicSubscription -> this.loadRetainMessage(messageRegistry, mqttChannel, mqttTopicSubscription.topicName()))
+ .peek(mqttTopicSubscription -> {
+ this.loadRetainMessage(messageRegistry, mqttChannel, mqttTopicSubscription.topicName());
+ this.loadOfflineMessage(messageRegistry, mqttChannel, mqttTopicSubscription.topicName());
+ })
.map(mqttTopicSubscription ->
new SubscribeTopic(mqttTopicSubscription.topicName(), mqttTopicSubscription.qualityOfService(), mqttChannel))
.collect(Collectors.toSet());
@@ -57,6 +60,12 @@ public class SubscribeProtocol implements Protocol {
.collect(Collectors.toList())), false));
}
+ /**
+ * 获取保留消息
+ * @param messageRegistry
+ * @param mqttChannel
+ * @param topicName
+ */
private void loadRetainMessage(MessageRegistry messageRegistry, MqttChannel mqttChannel, String topicName) {
messageRegistry.getRetainMessage(topicName)
.forEach(retainMessage ->
@@ -64,6 +73,19 @@ public class SubscribeProtocol implements Protocol {
.subscribe());
}
+ /**
+ * 获取离线消息
+ * @param messageRegistry
+ * @param mqttChannel
+ * @param topicName
+ */
+ private void loadOfflineMessage(MessageRegistry messageRegistry, MqttChannel mqttChannel, String topicName) {
+ Optional.ofNullable(messageRegistry.getOfflineMessage(topicName)).ifPresent(msgList ->
+ msgList.forEach(offlineMessage ->
+ mqttChannel.write(offlineMessage.toPublishMessage(mqttChannel), offlineMessage.getQos() > 0)
+ .subscribe()));
+ }
+
@Override
public List getMqttMessageTypes() {
return MESSAGE_TYPE_LIST;
diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java
index 256d8170f42227019a8c1ec680095a9ecc04b99f..67e35485b15b48e108a2ac4a50f2010d2faaf537 100644
--- a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java
+++ b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java
@@ -1,10 +1,12 @@
package io.github.quickmsg.core.spi;
import io.github.quickmsg.common.message.MessageRegistry;
+import io.github.quickmsg.common.message.OfflineMessage;
import io.github.quickmsg.common.message.RetainMessage;
import io.github.quickmsg.common.message.SessionMessage;
import io.github.quickmsg.common.utils.TopicRegexUtils;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -21,6 +23,8 @@ public class DefaultMessageRegistry implements MessageRegistry {
private Map retainMessages = new ConcurrentHashMap<>();
+ private Map> offlineMessages = new ConcurrentHashMap<>();
+
@Override
public List getSessionMessage(String clientIdentifier) {
@@ -47,4 +51,15 @@ public class DefaultMessageRegistry implements MessageRegistry {
.collect(Collectors.toList());
}
+ @Override
+ public List getOfflineMessage(String topic) {
+ return offlineMessages.remove(topic);
+ }
+
+ @Override
+ public void saveOfflineMessage(OfflineMessage offlineMessage) {
+ List offlineList = offlineMessages.computeIfAbsent(offlineMessage.getTopic(), key -> new CopyOnWriteArrayList<>());
+ offlineList.add(offlineMessage);
+ }
+
}
diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java
index 92992c29dee58d8e71718b6cf1bbaf86452e9a35..20d9d996672b7fc62a6aaf31736cc9c413c76583 100644
--- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java
+++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java
@@ -2,6 +2,7 @@ package io.github.quickmsg.persistent.registry;
import io.github.quickmsg.common.config.BootstrapConfig;
import io.github.quickmsg.common.message.MessageRegistry;
+import io.github.quickmsg.common.message.OfflineMessage;
import io.github.quickmsg.common.message.RetainMessage;
import io.github.quickmsg.common.message.SessionMessage;
import io.github.quickmsg.common.utils.TopicRegexUtils;
@@ -86,6 +87,7 @@ public class DbMessageRegistry implements MessageRegistry {
.qos(record.getQos())
.topic(record.getTopic())
.body(record.getBody().getBytes())
+ .userProperties(record.getUserProperties())
.clientIdentifier(record.getClientId())
.retain(record.getRetain())
.build())
@@ -111,6 +113,7 @@ public class DbMessageRegistry implements MessageRegistry {
int qos = sessionMessage.getQos();
boolean retain = sessionMessage.isRetain();
byte[] body = sessionMessage.getBody();
+ String userProperties = sessionMessage.getUserProperties();
try (Connection connection = HikariCPConnectionProvider.singleTon().getConnection()) {
DSLContext dslContext = DSL.using(connection);
@@ -121,8 +124,9 @@ public class DbMessageRegistry implements MessageRegistry {
Tables.SMQTT_SESSION.QOS,
Tables.SMQTT_SESSION.RETAIN,
Tables.SMQTT_SESSION.BODY,
- Tables.SMQTT_SESSION.CREATE_TIME)
- .values(topic, clientIdentifier, qos, retain, bodyMsg, LocalDateTime.now())
+ Tables.SMQTT_SESSION.CREATE_TIME,
+ Tables.SMQTT_SESSION.USER_PROPERTIES)
+ .values(topic, clientIdentifier, qos, retain, bodyMsg, LocalDateTime.now(), userProperties)
.execute();
} catch (Exception e) {
log.error("sendSessionMessages error message: {}", clientIdentifier, e);
@@ -144,24 +148,26 @@ public class DbMessageRegistry implements MessageRegistry {
.from(Tables.SMQTT_RETAIN)
.where(Tables.SMQTT_RETAIN.TOPIC.eq(topic))
.fetchAny();
+ String bodyMsg = new String(retainMessage.getBody(), CharsetUtil.UTF_8);
+ String userProperties = retainMessage.getUserProperties();
if (integerRecord1 != null && integerRecord1.value1() != null && integerRecord1.value1() > 0) {
// 更新记录
- String bodyMsg = new String(retainMessage.getBody(), CharsetUtil.UTF_8);
dslContext.update(Tables.SMQTT_RETAIN)
.set(Tables.SMQTT_RETAIN.QOS, qos)
.set(Tables.SMQTT_RETAIN.BODY, bodyMsg)
.set(Tables.SMQTT_RETAIN.UPDATE_TIME, LocalDateTime.now())
+ .set(Tables.SMQTT_RETAIN.USER_PROPERTIES, userProperties)
.where(Tables.SMQTT_RETAIN.TOPIC.eq(topic))
.execute();
} else {
// 新增记录
- String bodyMsg = new String(retainMessage.getBody(), CharsetUtil.UTF_8);
dslContext.insertInto(Tables.SMQTT_RETAIN)
.columns(Tables.SMQTT_RETAIN.TOPIC,
Tables.SMQTT_RETAIN.QOS,
Tables.SMQTT_RETAIN.BODY,
- Tables.SMQTT_RETAIN.CREATE_TIME)
- .values(topic, qos, bodyMsg, LocalDateTime.now())
+ Tables.SMQTT_RETAIN.CREATE_TIME,
+ Tables.SMQTT_RETAIN.USER_PROPERTIES)
+ .values(topic, qos, bodyMsg, LocalDateTime.now(), userProperties)
.execute();
}
}
@@ -183,6 +189,7 @@ public class DbMessageRegistry implements MessageRegistry {
.topic(record.getTopic())
.qos(record.getQos())
.body(getBody(record.getBody()))
+ .userProperties(record.getUserProperties())
.build()
)
.collect(Collectors.toList());
@@ -198,4 +205,59 @@ public class DbMessageRegistry implements MessageRegistry {
return StringUtils.isBlank(body) ? null : body.getBytes(CharsetUtil.UTF_8);
}
+ @Override
+ public List getOfflineMessage(String topic) {
+ try (Connection connection = HikariCPConnectionProvider.singleTon().getConnection()) {
+ DSLContext dslContext = DSL.using(connection);
+
+ List list = dslContext
+ .selectFrom(Tables.SMQTT_OFFLINE)
+ .where(Tables.SMQTT_OFFLINE.TOPIC.eq(topic))
+ .fetch()
+ .stream()
+ .map(record ->
+ OfflineMessage.builder()
+ .qos(record.getQos())
+ .topic(record.getTopic())
+ .body(record.getBody().getBytes())
+ .userProperties(record.getUserProperties())
+ .build())
+ .collect(Collectors.toList());
+
+ if (list.size() > 0) {
+ // 删除记录
+ dslContext.deleteFrom(Tables.SMQTT_OFFLINE)
+ .where(Tables.SMQTT_OFFLINE.TOPIC.eq(topic))
+ .execute();
+ }
+ return list;
+ } catch (Exception e) {
+ log.error("getOfflineMessages error topic:{}", topic, e);
+ return Collections.emptyList();
+ }
+ }
+
+ @Override
+ public void saveOfflineMessage(OfflineMessage offlineMessage) {
+ String topic = offlineMessage.getTopic();
+ int qos = offlineMessage.getQos();
+ byte[] body = offlineMessage.getBody();
+ String userProperties = offlineMessage.getUserProperties();
+
+ try (Connection connection = HikariCPConnectionProvider.singleTon().getConnection()) {
+ DSLContext dslContext = DSL.using(connection);
+ String bodyMsg = new String(body, CharsetUtil.UTF_8);
+ dslContext.insertInto(Tables.SMQTT_OFFLINE)
+ .columns(Tables.SMQTT_OFFLINE.TOPIC,
+ Tables.SMQTT_OFFLINE.QOS,
+ Tables.SMQTT_OFFLINE.BODY,
+ Tables.SMQTT_OFFLINE.CREATE_TIME,
+ Tables.SMQTT_OFFLINE.USER_PROPERTIES)
+ .values(topic, qos, bodyMsg, LocalDateTime.now(), userProperties)
+ .execute();
+ } catch (Exception e) {
+ log.error("saveOfflineMessages error message: {}", topic, e);
+ }
+ }
+
}
diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Indexes.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Indexes.java
index 376554f021b6152a7896eca06061af1461978a23..a09f89043af7f8298f73af2211b023d41804c644 100644
--- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Indexes.java
+++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Indexes.java
@@ -4,6 +4,7 @@
package io.github.quickmsg.persistent.tables;
+import io.github.quickmsg.persistent.tables.tables.SmqttOffline;
import io.github.quickmsg.persistent.tables.tables.SmqttRetain;
import org.jooq.Index;
@@ -23,4 +24,6 @@ public class Indexes {
// -------------------------------------------------------------------------
public static final Index SMQTT_RETAIN_INDEX_TOPIC = Internal.createIndex(DSL.name("index_topic"), SmqttRetain.SMQTT_RETAIN, new OrderField[] { SmqttRetain.SMQTT_RETAIN.TOPIC }, false);
+
+ public static final Index SMQTT_OFFLINE_INDEX_TOPIC = Internal.createIndex(DSL.name("index_topic"), SmqttOffline.SMQTT_OFFLINE, new OrderField[] { SmqttOffline.SMQTT_OFFLINE.TOPIC }, false);
}
diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Smqtt.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Smqtt.java
index 010bb68d5d33942b02dee6b72aac2224097e81f5..c2a5712d0aaafc38cc224f7a8e6ea0a32442bb1a 100644
--- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Smqtt.java
+++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Smqtt.java
@@ -6,6 +6,7 @@ package io.github.quickmsg.persistent.tables;
import io.github.quickmsg.persistent.tables.tables.Databasechangelog;
import io.github.quickmsg.persistent.tables.tables.Databasechangeloglock;
+import io.github.quickmsg.persistent.tables.tables.SmqttOffline;
import io.github.quickmsg.persistent.tables.tables.SmqttRetain;
import io.github.quickmsg.persistent.tables.tables.SmqttSession;
@@ -40,6 +41,9 @@ public class Smqtt extends SchemaImpl {
public final SmqttSession SMQTT_SESSION = SmqttSession.SMQTT_SESSION;
+ public static final SmqttOffline SMQTT_OFFLINE = SmqttOffline.SMQTT_OFFLINE;
+
+
private Smqtt() {
super("smqtt", null);
}
@@ -53,9 +57,10 @@ public class Smqtt extends SchemaImpl {
@Override
public final List> getTables() {
return Arrays.>asList(
- Databasechangelog.DATABASECHANGELOG,
- Databasechangeloglock.DATABASECHANGELOGLOCK,
- SmqttRetain.SMQTT_RETAIN,
- SmqttSession.SMQTT_SESSION);
+ Databasechangelog.DATABASECHANGELOG,
+ Databasechangeloglock.DATABASECHANGELOGLOCK,
+ SmqttRetain.SMQTT_RETAIN,
+ SmqttSession.SMQTT_SESSION,
+ SmqttOffline.SMQTT_OFFLINE);
}
}
diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Tables.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Tables.java
index dc24425ddce2ec62c6cb233277e00200935576e4..d1bdbc0d5b2547863b6e068430ec280d04528590 100644
--- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Tables.java
+++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Tables.java
@@ -6,6 +6,7 @@ package io.github.quickmsg.persistent.tables;
import io.github.quickmsg.persistent.tables.tables.Databasechangelog;
import io.github.quickmsg.persistent.tables.tables.Databasechangeloglock;
+import io.github.quickmsg.persistent.tables.tables.SmqttOffline;
import io.github.quickmsg.persistent.tables.tables.SmqttRetain;
import io.github.quickmsg.persistent.tables.tables.SmqttSession;
@@ -27,4 +28,7 @@ public class Tables {
public static final SmqttSession SMQTT_SESSION = SmqttSession.SMQTT_SESSION;
+
+
+ public static final SmqttOffline SMQTT_OFFLINE = SmqttOffline.SMQTT_OFFLINE;
}
diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttOffline.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttOffline.java
new file mode 100644
index 0000000000000000000000000000000000000000..d6cd317ed18db9fb62c0f93e39763a1a1dc2b8f6
--- /dev/null
+++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttOffline.java
@@ -0,0 +1,123 @@
+package io.github.quickmsg.persistent.tables.tables;
+
+import org.jooq.Field;
+import org.jooq.ForeignKey;
+import org.jooq.Index;
+import org.jooq.Name;
+import org.jooq.Record;
+import org.jooq.Row5;
+import org.jooq.Schema;
+import org.jooq.Table;
+import org.jooq.TableField;
+import org.jooq.TableOptions;
+import org.jooq.impl.DSL;
+import org.jooq.impl.SQLDataType;
+import org.jooq.impl.TableImpl;
+
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.List;
+
+import io.github.quickmsg.persistent.tables.Indexes;
+import io.github.quickmsg.persistent.tables.Smqtt;
+import io.github.quickmsg.persistent.tables.tables.records.SmqttOfflineRecord;
+
+/**
+ * @Author: Jingwu.Zhou
+ * @Date: 2022/1/24
+ */
+public class SmqttOffline extends TableImpl {
+ private static final long serialVersionUID = 1L;
+
+
+ public static final SmqttOffline SMQTT_OFFLINE = new SmqttOffline();
+
+
+ @Override
+ public Class getRecordType() {
+ return SmqttOfflineRecord.class;
+ }
+
+ public final TableField TOPIC = createField(DSL.name("topic"), SQLDataType.VARCHAR(255).defaultValue(DSL.inline("NULL", SQLDataType.VARCHAR)), this, "话题");
+
+
+ public final TableField QOS = createField(DSL.name("qos"), SQLDataType.INTEGER.defaultValue(DSL.inline("NULL", SQLDataType.INTEGER)), this, "qos");
+
+
+ public final TableField BODY = createField(DSL.name("body"), SQLDataType.VARCHAR(255).defaultValue(DSL.inline("NULL", SQLDataType.VARCHAR)), this, "消息内容");
+
+
+ public final TableField CREATE_TIME = createField(DSL.name("create_time"), SQLDataType.LOCALDATETIME(0).defaultValue(DSL.inline("NULL", SQLDataType.LOCALDATETIME)), this, "记录保存时间");
+
+
+ public final TableField USER_PROPERTIES = createField(DSL.name("user_properties"), SQLDataType.VARCHAR(1000).defaultValue(DSL.inline("NULL", SQLDataType.VARCHAR)), this, "可变头userProperties");
+
+
+ private SmqttOffline(Name alias, Table aliased) {
+ this(alias, aliased, null);
+ }
+
+ private SmqttOffline(Name alias, Table aliased, Field>[] parameters) {
+ super(alias, null, aliased, parameters, DSL.comment(""), TableOptions.table());
+ }
+
+
+ public SmqttOffline(String alias) {
+ this(DSL.name(alias), SMQTT_OFFLINE);
+ }
+
+
+ public SmqttOffline(Name alias) {
+ this(alias, SMQTT_OFFLINE);
+ }
+
+
+ public SmqttOffline() {
+ this(DSL.name("smqtt_offline"), null);
+ }
+
+ public SmqttOffline(Table child, ForeignKey key) {
+ super(child, key, SMQTT_OFFLINE);
+ }
+
+ @Override
+ public Schema getSchema() {
+ return Smqtt.SMQTT;
+ }
+
+ @Override
+ public List getIndexes() {
+ return Arrays.asList(Indexes.SMQTT_OFFLINE_INDEX_TOPIC);
+ }
+
+ @Override
+ public SmqttOffline as(String alias) {
+ return new SmqttOffline(DSL.name(alias), this);
+ }
+
+ @Override
+ public SmqttOffline as(Name alias) {
+ return new SmqttOffline(alias, this);
+ }
+
+
+ @Override
+ public SmqttOffline rename(String name) {
+ return new SmqttOffline(DSL.name(name), null);
+ }
+
+
+ @Override
+ public SmqttOffline rename(Name name) {
+ return new SmqttOffline(name, null);
+ }
+
+ // -------------------------------------------------------------------------
+ // Row5 type methods
+ // -------------------------------------------------------------------------
+
+ @Override
+ public Row5 fieldsRow() {
+ return (Row5) super.fieldsRow();
+ }
+}
diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttRetain.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttRetain.java
index 11edf2cea374a2f38602a9f63415939ebbf81b10..5fb01bcbfd93008983f5b1b874551d9a4cd261b2 100644
--- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttRetain.java
+++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttRetain.java
@@ -17,7 +17,7 @@ import org.jooq.ForeignKey;
import org.jooq.Index;
import org.jooq.Name;
import org.jooq.Record;
-import org.jooq.Row5;
+import org.jooq.Row6;
import org.jooq.Schema;
import org.jooq.Table;
import org.jooq.TableField;
@@ -57,6 +57,8 @@ public class SmqttRetain extends TableImpl {
public final TableField UPDATE_TIME = createField(DSL.name("update_time"), SQLDataType.LOCALDATETIME(0).defaultValue(DSL.inline("NULL", SQLDataType.LOCALDATETIME)), this, "记录更新时间");
+ public final TableField USER_PROPERTIES = createField(DSL.name("user_properties"), SQLDataType.VARCHAR(1000).defaultValue(DSL.inline("NULL", SQLDataType.VARCHAR)), this, "可变头userProperties");
+
private SmqttRetain(Name alias, Table aliased) {
this(alias, aliased, null);
}
@@ -121,7 +123,7 @@ public class SmqttRetain extends TableImpl {
// -------------------------------------------------------------------------
@Override
- public Row5 fieldsRow() {
- return (Row5) super.fieldsRow();
+ public Row6 fieldsRow() {
+ return (Row6) super.fieldsRow();
}
}
diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttSession.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttSession.java
index 8e46bde625398231e4431754155509ddfb98050c..cebf7fb296940e672ae26511dfdb18ee54568fce 100644
--- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttSession.java
+++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttSession.java
@@ -13,7 +13,7 @@ import org.jooq.Field;
import org.jooq.ForeignKey;
import org.jooq.Name;
import org.jooq.Record;
-import org.jooq.Row6;
+import org.jooq.Row7;
import org.jooq.Schema;
import org.jooq.Table;
import org.jooq.TableField;
@@ -62,6 +62,8 @@ public class SmqttSession extends TableImpl {
public final TableField CREATE_TIME = createField(DSL.name("create_time"), SQLDataType.LOCALDATETIME(0).defaultValue(DSL.inline("NULL", SQLDataType.LOCALDATETIME)), this, "记录保存时间");
+ public final TableField USER_PROPERTIES = createField(DSL.name("user_properties"), SQLDataType.VARCHAR(1000).defaultValue(DSL.inline("NULL", SQLDataType.VARCHAR)), this, "可变头userProperties");
+
private SmqttSession(Name alias, Table aliased) {
this(alias, aliased, null);
}
@@ -121,7 +123,7 @@ public class SmqttSession extends TableImpl {
// -------------------------------------------------------------------------
@Override
- public Row6 fieldsRow() {
- return (Row6) super.fieldsRow();
+ public Row7 fieldsRow() {
+ return (Row7) super.fieldsRow();
}
}
diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttOfflineRecord.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttOfflineRecord.java
new file mode 100644
index 0000000000000000000000000000000000000000..5408bd50a6d2f28ea374db7a79a269103598e19f
--- /dev/null
+++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttOfflineRecord.java
@@ -0,0 +1,214 @@
+package io.github.quickmsg.persistent.tables.tables.records;
+
+import org.jooq.Field;
+import org.jooq.Record5;
+import org.jooq.Row5;
+import org.jooq.impl.TableRecordImpl;
+
+import java.time.LocalDateTime;
+
+import io.github.quickmsg.persistent.tables.tables.SmqttOffline;
+
+/**
+ * @Author: Jingwu.Zhou
+ * @Date: 2022/1/24
+ */
+public class SmqttOfflineRecord extends TableRecordImpl implements Record5 {
+ private static final long serialVersionUID = 1L;
+
+ public void setTopic(String value) {
+ set(0, value);
+ }
+
+
+ public String getTopic() {
+ return (String) get(0);
+ }
+
+
+ public void setQos(Integer value) {
+ set(1, value);
+ }
+
+
+ public Integer getQos() {
+ return (Integer) get(1);
+ }
+
+
+ public void setBody(String value) {
+ set(2, value);
+ }
+
+
+ public String getBody() {
+ return (String) get(2);
+ }
+
+ public void setCreateTime(LocalDateTime value) {
+ set(3, value);
+ }
+
+ public LocalDateTime getCreateTime() {
+ return (LocalDateTime) get(3);
+ }
+
+
+ public void setUserProperties(String value) {
+ set(4, value);
+ }
+
+
+ public String getUserProperties() {
+ return (String) get(4);
+ }
+
+ // -------------------------------------------------------------------------
+ // Record5 type implementation
+ // -------------------------------------------------------------------------
+
+ @Override
+ public Row5 fieldsRow() {
+ return (Row5) super.fieldsRow();
+ }
+
+ @Override
+ public Row5 valuesRow() {
+ return (Row5) super.valuesRow();
+ }
+
+ @Override
+ public Field field1() {
+ return SmqttOffline.SMQTT_OFFLINE.TOPIC;
+ }
+
+ @Override
+ public Field field2() {
+ return SmqttOffline.SMQTT_OFFLINE.QOS;
+ }
+
+ @Override
+ public Field field3() {
+ return SmqttOffline.SMQTT_OFFLINE.BODY;
+ }
+
+ @Override
+ public Field field4() {
+ return SmqttOffline.SMQTT_OFFLINE.CREATE_TIME;
+ }
+
+ @Override
+ public Field field5() {
+ return SmqttOffline.SMQTT_OFFLINE.USER_PROPERTIES;
+ }
+
+ @Override
+ public String component1() {
+ return getTopic();
+ }
+
+ @Override
+ public Integer component2() {
+ return getQos();
+ }
+
+ @Override
+ public String component3() {
+ return getBody();
+ }
+
+ @Override
+ public LocalDateTime component4() {
+ return getCreateTime();
+ }
+
+ @Override
+ public String component5() {
+ return getUserProperties();
+ }
+
+ @Override
+ public String value1() {
+ return getTopic();
+ }
+
+ @Override
+ public Integer value2() {
+ return getQos();
+ }
+
+ @Override
+ public String value3() {
+ return getBody();
+ }
+
+ @Override
+ public LocalDateTime value4() {
+ return getCreateTime();
+ }
+
+ @Override
+ public String value5() {
+ return getUserProperties();
+ }
+
+ @Override
+ public SmqttOfflineRecord value1(String value) {
+ setTopic(value);
+ return this;
+ }
+
+ @Override
+ public SmqttOfflineRecord value2(Integer value) {
+ setQos(value);
+ return this;
+ }
+
+ @Override
+ public SmqttOfflineRecord value3(String value) {
+ setBody(value);
+ return this;
+ }
+
+ @Override
+ public SmqttOfflineRecord value4(LocalDateTime value) {
+ setCreateTime(value);
+ return this;
+ }
+
+ @Override
+ public SmqttOfflineRecord value5(String value) {
+ setUserProperties(value);
+ return this;
+ }
+
+ @Override
+ public SmqttOfflineRecord values(String value1, Integer value2, String value3, LocalDateTime value4, String value5) {
+ value1(value1);
+ value2(value2);
+ value3(value3);
+ value4(value4);
+ value5(value5);
+ return this;
+ }
+
+ // -------------------------------------------------------------------------
+ // Constructors
+ // -------------------------------------------------------------------------
+
+
+ public SmqttOfflineRecord() {
+ super(SmqttOffline.SMQTT_OFFLINE);
+ }
+
+
+ public SmqttOfflineRecord(String topic, Integer qos, String body, LocalDateTime createTime, LocalDateTime updateTime, String userProperties) {
+ super(SmqttOffline.SMQTT_OFFLINE);
+
+ setTopic(topic);
+ setQos(qos);
+ setBody(body);
+ setCreateTime(createTime);
+ setUserProperties(userProperties);
+ }
+}
diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttRetainRecord.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttRetainRecord.java
index df93c1d921501df7f94de8e9236708c23c724851..119e93764720473e5c51cbc6c5c504f6f650c270 100644
--- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttRetainRecord.java
+++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttRetainRecord.java
@@ -9,8 +9,8 @@ import io.github.quickmsg.persistent.tables.tables.SmqttRetain;
import java.time.LocalDateTime;
import org.jooq.Field;
-import org.jooq.Record5;
-import org.jooq.Row5;
+import org.jooq.Record6;
+import org.jooq.Row6;
import org.jooq.impl.TableRecordImpl;
@@ -18,7 +18,7 @@ import org.jooq.impl.TableRecordImpl;
* This class is generated by jOOQ.
*/
@SuppressWarnings({ "all", "unchecked", "rawtypes" })
-public class SmqttRetainRecord extends TableRecordImpl implements Record5 {
+public class SmqttRetainRecord extends TableRecordImpl implements Record6 {
private static final long serialVersionUID = 1L;
@@ -69,18 +69,27 @@ public class SmqttRetainRecord extends TableRecordImpl implem
return (LocalDateTime) get(4);
}
+ public void setUserProperties(String value) {
+ set(5, value);
+ }
+
+
+ public String getUserProperties() {
+ return (String) get(5);
+ }
+
// -------------------------------------------------------------------------
// Record5 type implementation
// -------------------------------------------------------------------------
@Override
- public Row5 fieldsRow() {
- return (Row5) super.fieldsRow();
+ public Row6 fieldsRow() {
+ return (Row6) super.fieldsRow();
}
@Override
- public Row5 valuesRow() {
- return (Row5) super.valuesRow();
+ public Row6 valuesRow() {
+ return (Row6) super.valuesRow();
}
@Override
@@ -108,6 +117,11 @@ public class SmqttRetainRecord extends TableRecordImpl implem
return SmqttRetain.SMQTT_RETAIN.UPDATE_TIME;
}
+ @Override
+ public Field field6() {
+ return SmqttRetain.SMQTT_RETAIN.USER_PROPERTIES;
+ }
+
@Override
public String component1() {
return getTopic();
@@ -133,6 +147,11 @@ public class SmqttRetainRecord extends TableRecordImpl implem
return getUpdateTime();
}
+ @Override
+ public String component6() {
+ return getUserProperties();
+ }
+
@Override
public String value1() {
return getTopic();
@@ -158,6 +177,11 @@ public class SmqttRetainRecord extends TableRecordImpl implem
return getUpdateTime();
}
+ @Override
+ public String value6() {
+ return getUserProperties();
+ }
+
@Override
public SmqttRetainRecord value1(String value) {
setTopic(value);
@@ -189,12 +213,19 @@ public class SmqttRetainRecord extends TableRecordImpl implem
}
@Override
- public SmqttRetainRecord values(String value1, Integer value2, String value3, LocalDateTime value4, LocalDateTime value5) {
+ public SmqttRetainRecord value6(String value) {
+ setUserProperties(value);
+ return this;
+ }
+
+ @Override
+ public SmqttRetainRecord values(String value1, Integer value2, String value3, LocalDateTime value4, LocalDateTime value5, String value6) {
value1(value1);
value2(value2);
value3(value3);
value4(value4);
value5(value5);
+ value6(value6);
return this;
}
@@ -208,7 +239,7 @@ public class SmqttRetainRecord extends TableRecordImpl implem
}
- public SmqttRetainRecord(String topic, Integer qos, String body, LocalDateTime createTime, LocalDateTime updateTime) {
+ public SmqttRetainRecord(String topic, Integer qos, String body, LocalDateTime createTime, LocalDateTime updateTime, String userProperties) {
super(SmqttRetain.SMQTT_RETAIN);
setTopic(topic);
@@ -216,5 +247,6 @@ public class SmqttRetainRecord extends TableRecordImpl implem
setBody(body);
setCreateTime(createTime);
setUpdateTime(updateTime);
+ setUserProperties(userProperties);
}
}
diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttSessionRecord.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttSessionRecord.java
index 8904fe4687cad8881d4a1d647d069072dd8682e8..f7e845026d8b1f400f44007d83a04cf6adc635bb 100644
--- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttSessionRecord.java
+++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttSessionRecord.java
@@ -9,8 +9,8 @@ import io.github.quickmsg.persistent.tables.tables.SmqttSession;
import java.time.LocalDateTime;
import org.jooq.Field;
-import org.jooq.Record6;
-import org.jooq.Row6;
+import org.jooq.Record7;
+import org.jooq.Row7;
import org.jooq.impl.TableRecordImpl;
@@ -18,7 +18,7 @@ import org.jooq.impl.TableRecordImpl;
* This class is generated by jOOQ.
*/
@SuppressWarnings({ "all", "unchecked", "rawtypes" })
-public class SmqttSessionRecord extends TableRecordImpl implements Record6 {
+public class SmqttSessionRecord extends TableRecordImpl implements Record7 {
private static final long serialVersionUID = 1L;
@@ -72,6 +72,15 @@ public class SmqttSessionRecord extends TableRecordImpl impl
return (String) get(4);
}
+ public void setUserProperties(String value) {
+ set(6, value);
+ }
+
+
+ public String getUserProperties() {
+ return (String) get(6);
+ }
+
public void setCreateTime(LocalDateTime value) {
set(5, value);
@@ -87,13 +96,13 @@ public class SmqttSessionRecord extends TableRecordImpl impl
// -------------------------------------------------------------------------
@Override
- public Row6 fieldsRow() {
- return (Row6) super.fieldsRow();
+ public Row7 fieldsRow() {
+ return (Row7) super.fieldsRow();
}
@Override
- public Row6 valuesRow() {
- return (Row6) super.valuesRow();
+ public Row7 valuesRow() {
+ return (Row7) super.valuesRow();
}
@Override
@@ -126,6 +135,11 @@ public class SmqttSessionRecord extends TableRecordImpl impl
return SmqttSession.SMQTT_SESSION.CREATE_TIME;
}
+ @Override
+ public Field field7() {
+ return SmqttSession.SMQTT_SESSION.USER_PROPERTIES;
+ }
+
@Override
public String component1() {
return getTopic();
@@ -156,6 +170,11 @@ public class SmqttSessionRecord extends TableRecordImpl impl
return getCreateTime();
}
+ @Override
+ public String component7() {
+ return getUserProperties();
+ }
+
@Override
public String value1() {
return getTopic();
@@ -186,6 +205,11 @@ public class SmqttSessionRecord extends TableRecordImpl impl
return getCreateTime();
}
+ @Override
+ public String value7() {
+ return getUserProperties();
+ }
+
@Override
public SmqttSessionRecord value1(String value) {
setTopic(value);
@@ -223,13 +247,20 @@ public class SmqttSessionRecord extends TableRecordImpl impl
}
@Override
- public SmqttSessionRecord values(String value1, String value2, Integer value3, Boolean value4, String value5, LocalDateTime value6) {
+ public SmqttSessionRecord value7(String value) {
+ setUserProperties(value);
+ return this;
+ }
+
+ @Override
+ public SmqttSessionRecord values(String value1, String value2, Integer value3, Boolean value4, String value5, LocalDateTime value6, String value7) {
value1(value1);
value2(value2);
value3(value3);
value4(value4);
value5(value5);
value6(value6);
+ value7(value7);
return this;
}
@@ -243,7 +274,7 @@ public class SmqttSessionRecord extends TableRecordImpl impl
}
- public SmqttSessionRecord(String topic, String clientId, Integer qos, Boolean retain, String body, LocalDateTime createTime) {
+ public SmqttSessionRecord(String topic, String clientId, Integer qos, Boolean retain, String body, LocalDateTime createTime, String userProperties) {
super(SmqttSession.SMQTT_SESSION);
setTopic(topic);
@@ -252,5 +283,6 @@ public class SmqttSessionRecord extends TableRecordImpl impl
setRetain(retain);
setBody(body);
setCreateTime(createTime);
+ setUserProperties(userProperties);
}
}
diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/resources/liquibase/smqtt_db.xml b/smqtt-persistent/smqtt-persistent-db/src/main/resources/liquibase/smqtt_db.xml
index 8c59856239669a22f6501d2c4328a4575aae327e..06cb5bfe67fac0710efc9f7f2da2f5206bb0c451 100644
--- a/smqtt-persistent/smqtt-persistent-db/src/main/resources/liquibase/smqtt_db.xml
+++ b/smqtt-persistent/smqtt-persistent-db/src/main/resources/liquibase/smqtt_db.xml
@@ -14,6 +14,7 @@
+
@@ -30,10 +31,26 @@
+
+
+
+ create smqtt_offline table
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/OfflineMessageEntity.java b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/OfflineMessageEntity.java
new file mode 100644
index 0000000000000000000000000000000000000000..44f33d79f2a238ef406d38535ad69d495aaeb957
--- /dev/null
+++ b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/OfflineMessageEntity.java
@@ -0,0 +1,30 @@
+package io.github.quickmsg.persistent.message;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+
+import java.io.Serializable;
+import java.util.Date;
+
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * @Author: Jingwu.Zhou
+ * @Date: 2022/1/24
+ */
+@Data
+@Builder
+public class OfflineMessageEntity implements Serializable {
+ private static final long serialVersionUID = 1095608914696359395L;
+
+ private String topic;
+
+ private Integer qos;
+
+ private byte[] body;
+
+ private String userProperties;
+
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date createTime;
+}
diff --git a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java
index f30dc0208e235749b77df0392bfca0e6fdbb4c5b..086c987049d3960ec93cd0f42d93a0d164b7b89d 100644
--- a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java
+++ b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java
@@ -24,6 +24,8 @@ public class RetainMessageEntity implements Serializable {
private byte[] body;
+ private String userProperties;
+
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date createTime;
diff --git a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java
index 2966a12546e27774fa61ae8c630f5a11c8bbcd81..418d356b3e9f8539f0d90a4a41ca1b424898e5c5 100644
--- a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java
+++ b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java
@@ -25,6 +25,8 @@ public class SessionMessageEntity implements Serializable {
private Boolean retain;
+ private String userProperties;
+
private byte[] body;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date createTime;
diff --git a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java
index a24f5a978ac3192ed52b45a166feac363b035337..9abbbd67629c18af2b5fd5c2c8b092e326c54942 100644
--- a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java
+++ b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java
@@ -2,12 +2,13 @@ package io.github.quickmsg.persistent.registry;
import io.github.quickmsg.common.bootstrap.BootstrapKey;
import io.github.quickmsg.common.config.BootstrapConfig;
-import io.github.quickmsg.common.environment.EnvContext;
import io.github.quickmsg.common.message.MessageRegistry;
+import io.github.quickmsg.common.message.OfflineMessage;
import io.github.quickmsg.common.message.RetainMessage;
import io.github.quickmsg.common.message.SessionMessage;
import io.github.quickmsg.common.utils.TopicRegexUtils;
import io.github.quickmsg.persistent.factory.ClientFactory;
+import io.github.quickmsg.persistent.message.OfflineMessageEntity;
import io.github.quickmsg.persistent.message.RetainMessageEntity;
import io.github.quickmsg.persistent.message.SessionMessageEntity;
import io.github.quickmsg.persistent.strategy.ClientStrategy;
@@ -15,10 +16,11 @@ import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBucket;
import org.redisson.api.RKeys;
import org.redisson.api.RList;
+import org.redisson.api.RSetCache;
import org.redisson.api.RedissonClient;
-import java.sql.Connection;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -59,6 +61,7 @@ public class RedisMessageRegistry implements MessageRegistry {
.qos(record.getQos())
.retain(record.getRetain())
.body(record.getBody())
+ .userProperties(record.getUserProperties())
.build()
).collect(Collectors.toList());
@@ -79,6 +82,7 @@ public class RedisMessageRegistry implements MessageRegistry {
int qos = sessionMessage.getQos();
boolean retain = sessionMessage.isRetain();
byte[] body = sessionMessage.getBody();
+ String userProperty = sessionMessage.getUserProperties();
try {
SessionMessageEntity sessionMessageEntity = SessionMessageEntity.builder()
@@ -87,6 +91,7 @@ public class RedisMessageRegistry implements MessageRegistry {
.qos(qos)
.body(body)
.retain(retain)
+ .userProperties(userProperty)
.createTime(new Date()).build();
RList list = redissonClient.getList(BootstrapKey.Redis.REDIS_SESSION_MESSAGE_PREFIX_KEY + clientIdentifier);
@@ -110,6 +115,7 @@ public class RedisMessageRegistry implements MessageRegistry {
.topic(topic)
.qos(qos)
.body(retainMessage.getBody())
+ .userProperties(retainMessage.getUserProperties())
.createTime(date)
.updateTime(date).build();
@@ -137,6 +143,7 @@ public class RedisMessageRegistry implements MessageRegistry {
.topic(item.getTopic())
.qos(item.getQos())
.body(item.getBody())
+ .userProperties(item.getUserProperties())
.build()
).orElse(null);
})
@@ -148,4 +155,49 @@ public class RedisMessageRegistry implements MessageRegistry {
return Collections.emptyList();
}
+ @Override
+ public List getOfflineMessage(String topic) {
+ // 一对一发送,只需要精确匹配
+ try {
+ RSetCache set = redissonClient.getSetCache(BootstrapKey.Redis.REDIS_OFFLINE_MESSAGE_PREFIX_KEY + topic);
+ List resList = set.stream().map(record -> OfflineMessage.builder()
+ .topic(record.getTopic())
+ .qos(record.getQos())
+ .body(record.getBody())
+ .userProperties(record.getUserProperties())
+ .build()
+ ).collect(Collectors.toList());
+
+ if (set.size() > 0) {
+ redissonClient.getBucket(BootstrapKey.Redis.REDIS_OFFLINE_MESSAGE_PREFIX_KEY + topic).delete();
+ }
+ return resList;
+ } catch (Exception e) {
+ log.error("getOfflineMessage error clientIdentifier:{}", topic, e);
+ return Collections.emptyList();
+ }
+ }
+
+ @Override
+ public void saveOfflineMessage(OfflineMessage offlineMessage) {
+ String topic = offlineMessage.getTopic();
+ int qos = offlineMessage.getQos();
+ byte[] body = offlineMessage.getBody();
+ String userProperties = offlineMessage.getUserProperties();
+ try {
+ OfflineMessageEntity offlineMessageEntity = OfflineMessageEntity.builder()
+ .topic(topic)
+ .qos(qos)
+ .body(body)
+ .userProperties(userProperties)
+ .createTime(new Date())
+ .build();
+
+ RSetCache set = redissonClient.getSetCache(BootstrapKey.Redis.REDIS_OFFLINE_MESSAGE_PREFIX_KEY + topic);
+ set.add(offlineMessageEntity, 7, TimeUnit.DAYS);
+ } catch (Exception e) {
+ log.error("saveOfflineMessage error message: {}", topic, e);
+ }
+ }
+
}
diff --git a/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java b/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java
index 2b28c1e7300118dbe560a73bdef6a1f23472eddc..f320d965bc38d88abde60ed0c62d20f9386c815e 100644
--- a/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java
+++ b/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java
@@ -47,7 +47,7 @@ public class TopicRuleNode implements RuleNode {
log.info("rule engine TopicRuleNode request {}", heapMqttMessage);
ProtocolAdaptor protocolAdaptor = receiveContext.getProtocolAdaptor();
protocolAdaptor.chooseProtocol(MockMqttChannel.wrapClientIdentifier(heapMqttMessage.getClientIdentifier()),
- new SmqttMessage<>(getMqttMessage(heapMqttMessage),heapMqttMessage.getTimestamp(),Boolean.TRUE), receiveContext);
+ new SmqttMessage<>(getMqttMessage(heapMqttMessage), heapMqttMessage.getTimestamp(), Boolean.TRUE), receiveContext);
executeNext(contextView);
}
@@ -58,7 +58,8 @@ public class TopicRuleNode implements RuleNode {
MqttQoS.valueOf(heapMqttMessage.getQos()),
0,
this.topic,
- PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage()));
+ PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage()),
+ heapMqttMessage.getProperties());
}
diff --git a/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java b/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java
index 60132949c90165647e253c05f617e0dabb137996..ba7bcae6b35b4a997ef83d0face3c354619b85e6 100644
--- a/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java
+++ b/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java
@@ -35,6 +35,7 @@ public class AutoMqttConfiguration {
.httpConfig(springBootstrapConfig.getHttp())
.websocketConfig(springBootstrapConfig.getWs())
.clusterConfig(springBootstrapConfig.getCluster())
+ .persistenceType(springBootstrapConfig.getPersistenceType())
.redisConfig(springBootstrapConfig.getRedis())
.databaseConfig(springBootstrapConfig.getDb())
.ruleChainDefinitions(springBootstrapConfig.getRules())
diff --git a/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/SpringBootstrapConfig.java b/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/SpringBootstrapConfig.java
index 7bf84d6c16b50882d8fccfff0c286a929f916705..b49ab112c1015ea6dd7efe4f3e972c7bb9d219ae 100644
--- a/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/SpringBootstrapConfig.java
+++ b/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/SpringBootstrapConfig.java
@@ -63,6 +63,10 @@ public class SpringBootstrapConfig {
private BootstrapConfig.MeterConfig meter;
+ /**
+ * 持久化方式,0:内存(默认),1:Redis,2:DB
+ */
+ private Integer persistenceType;
/**
* 数据库配置
*/