diff --git a/smqtt-bootstrap/pom.xml b/smqtt-bootstrap/pom.xml index 1f67f6d19a35971acb604dc67d441429e65b3469..dddb998cd53713e5caade0066e83ac9340ecdc64 100644 --- a/smqtt-bootstrap/pom.xml +++ b/smqtt-bootstrap/pom.xml @@ -57,6 +57,21 @@ io.github.quickmsg 1.1.2 + + org.springframework.boot + spring-boot + 2.2.1.RELEASE + + + org.springframework.boot + spring-boot-autoconfigure + 2.2.1.RELEASE + + + io.github.quickmsg + smqtt-spring-boot-starter + 1.1.2 + diff --git a/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java b/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java index a41fc41d5b650df8ceebad70243c64562245edf9..a26dffdedd08dc9744eae506a30266df3f8d1068 100644 --- a/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java +++ b/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java @@ -53,6 +53,7 @@ public abstract class AbstractStarter { .httpConfig(config.getSmqttConfig().getHttpConfig()) .websocketConfig(config.getSmqttConfig().getWebsocketConfig()) .clusterConfig(config.getSmqttConfig().getClusterConfig()) + .persistenceType(config.getPersistenceType()) .redisConfig(config.getSmqttConfig().getRedisConfig()) .databaseConfig(config.getSmqttConfig().getDatabaseConfig()) .meterConfig(config.getSmqttConfig().getMeterConfig()) diff --git a/smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java b/smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java new file mode 100644 index 0000000000000000000000000000000000000000..4bb5e3e1c12b9e3b0650351aa778f6362768f22f --- /dev/null +++ b/smqtt-bootstrap/src/main/java/io/github/quickmsg/springboot/SpringBootStarter.java @@ -0,0 +1,20 @@ +package io.github.quickmsg.springboot; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.ComponentScan; + +import io.github.quickmsg.starter.EnableMqttServer; + +/** + * @Author: Jingwu.Zhou + * @Date: 2022/1/24 + */ +@SpringBootApplication +@ComponentScan(basePackages = {"io.github.quickmsg"}) +@EnableMqttServer +public class SpringBootStarter { + public static void main(String[] args) { + SpringApplication.run(SpringBootStarter.class, args); + } +} diff --git a/smqtt-bootstrap/src/main/resources/application.yml b/smqtt-bootstrap/src/main/resources/application.yml new file mode 100644 index 0000000000000000000000000000000000000000..62e3393d6f623aedd6a61331831ef23651f91763 --- /dev/null +++ b/smqtt-bootstrap/src/main/resources/application.yml @@ -0,0 +1,79 @@ + smqtt: + logLevel: INFO # 系统日志 + tcp: # tcp配置 + port: 1883 # mqtt端口号 + username: smqtt # mqtt连接默认用户名 生产环境建议spi去注入PasswordAuthentication接口 + password: smqtt # mqtt连接默认密码 生产环境建议spi去注入PasswordAuthentication接口 + wiretap: true # 二进制日志 前提是 smqtt.logLevel = DEBUG + bossThreadSize: 4 # boss线程 + workThreadSize: 8 # work线程 + lowWaterMark: 4000000 # 不建议配置 默认 32768 + highWaterMark: 80000000 # 不建议配置 默认 65536 + businessThreadSize: 16 # 业务线程数 默认=cpu核心数*10 + businessQueueSize: 100000 #业务队列 默认=100000 + # globalReadWriteSize: 10000000,100000000 全局读写大小限制 + # channelReadWriteSize: 10000000,100000000 单个channel读写大小限制 + ssl: # ssl配置 + enable: false # 开关 + key: /user/server.key # 指定ssl文件 默认系统生成 + crt: /user/server.crt # 指定ssl文件 默认系统生成 + http: # http相关配置 端口固定60000 + enable: true # 开关 + accessLog: false # http访问日志 + ssl: # ssl配置 + enable: false + admin: # 后台管理配置 + enable: true # 开关 + username: smqtt # 访问用户名 + password: smqtt # 访问密码 + ws: # websocket配置 + enable: false # 开关 + port: 8999 # 端口 + path: /mqtt # ws 的访问path mqtt.js请设置此选项 + cluster: # 集群配置 + enable: false # 集群开关 + url: 127.0.0.1:7771,127.0.0.1:7772 # 启动节点 + port: 7771 # 端口 + node: node-1 # 集群节点名称 唯一 + external: + host: localhost # 用于映射容器ip 请不要随意设置,如果不需要请移除此选项 + port: 7777 # 用于映射容器端口 请不要随意设置,如果不需要请移除此选项 + meter: + meterType: PROMETHEUS # INFLUXDB , PROMETHEUS + + persistenceType: 1 # 持久化方式,0:内存(默认),1:Redis,2:DB + db: # 参数值配置参考https://github.com/brettwooldridge/HikariCP/wiki/MySQL-Configuration + jdbcUrl: jdbc:mysql://172.16.1.77:3306/smqtt + username: root + password: jx@xw!@#$~|{} + dataSourceCachePrepStmts: false + dataSourcePrepStmtCacheSize: 250 + dataSourcePrepStmtCacheSqlLimit: 2048 + dataSourceUseServerPrepStmts: true + dataSourceUseLocalSessionState: true + dataSourceRewriteBatchedStatements: true + dataSourceCacheResultSetMetadata: true + dataSourceCacheServerConfiguration: true + dataSourceElideSetAutoCommits: true + dataSourceMaintainTimeStats: false + redis: # redis 请参考 https://doc.smqtt.cc/%E5%85%B6%E4%BB%96/1.store.html 【如果没有引入相关依赖请移除此配置】 + mode: sentinel + database: 7 + password: xuanwu-T3st*17 + timeout: 3000 + poolMinIdle: 8 + poolConnTimeout: 3000 + poolSize: 10 + single: + address: 172.16.1.16:6392 + cluster: + scanInterval: 1000 + nodes: 172.16.1.16:6390,172.16.1.16:6391,172.16.1.16:6392 + readMode: SLAVE + retryAttempts: 3 + slaveConnectionPoolSize: 64 + masterConnectionPoolSize: 64 + retryInterval: 1500 + sentinel: + master: mq_master + nodes: 172.16.1.16:26371,172.16.1.16:26372,172.16.1.16:26373 \ No newline at end of file diff --git a/smqtt-bootstrap/src/main/resources/test.yaml b/smqtt-bootstrap/src/main/resources/test.yaml deleted file mode 100644 index e4203080c7a10e0811258fd56cb1cf8b0f78465e..0000000000000000000000000000000000000000 --- a/smqtt-bootstrap/src/main/resources/test.yaml +++ /dev/null @@ -1,3 +0,0 @@ -smqtt: - tcp: - port: 7000 \ No newline at end of file diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java b/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java index ad5a890440d307136f1e9d9571441c9df4f33736..bd7da1b0cf77f7b9c6426c927b41ca9b3459ae3f 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/bootstrap/BootstrapKey.java @@ -14,6 +14,9 @@ public class BootstrapKey { /*redis前缀key*/ public static final String REDIS_SESSION_MESSAGE_PREFIX_KEY = "smqtt:session:message:"; + /*redis前缀key 离线消息*/ + public static final String REDIS_OFFLINE_MESSAGE_PREFIX_KEY = "smqtt:offline:message:"; + /*模式*/ public static final String REDIS_MODE = "redis.mode"; diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java index 6776a4da13c2a270c997bd50798e6862b300247e..c563a3d5bdfb9ca8739681ab2706393016e3c58e 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java @@ -279,6 +279,11 @@ public class BootstrapConfig { private ClusterExternal external; } + /** + * 持久化方式,0:内存(默认),1:Redis,2:DB + */ + private Integer persistenceType; + @Data @Builder @NoArgsConstructor diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java index ca994d2b793e07445cb35b8d9d21f7d34250ec6e..de77bc9affc551ac4748f0fe4fc369732b2daea0 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/Configuration.java @@ -122,8 +122,12 @@ public interface Configuration { */ BootstrapConfig.MeterConfig getMeterConfig(); - - + /** + * 获取持久化方式 + * + * @return + */ + Integer getPersistenceType(); diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java b/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java index cb6d17b619f128b6376236f54806ea4c049095b9..e34f7c8ab2697a988d3869a33468ae79f114b8da 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java @@ -78,6 +78,7 @@ public class MessageProxy { .topic(header.topicName()) .retain(fixedHeader.isRetain()) .qos(fixedHeader.qosLevel().value()) + .properties(header.properties()) .build(); } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java index 19c340dac9389abc45642e94393119623571ad09..2ab22342784775c9b8d3cc01102251df3380543e 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java @@ -1,6 +1,7 @@ package io.github.quickmsg.common.message; import io.github.quickmsg.common.utils.JacksonUtil; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.util.internal.StringUtil; import lombok.AllArgsConstructor; import lombok.Builder; @@ -31,6 +32,8 @@ public class HeapMqttMessage { private byte[] message; + private MqttProperties properties; + public Map getKeyMap() { Map keys = new HashMap<>(5); diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java index c21887dba432d7aa5498b958707ec25a005e632f..3a5d4a6f8e2bd62f37a3fcfbdfce88a7992e79a5 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MessageRegistry.java @@ -47,4 +47,18 @@ public interface MessageRegistry extends StartUp { List getRetainMessage(String topic); + /** + * 持久化离线消息 + * + * @param message + */ + void saveOfflineMessage(OfflineMessage message); + + /** + * 获取离线消息 + * + * @param topic + * @return + */ + List getOfflineMessage(String topic); } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java index d6a16539cc6c0dfaa5f0e63141210ed658e9ea7f..5346a17693eb9bbb3f86e9ab851216c3a343735c 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java @@ -4,6 +4,13 @@ import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.*; import java.util.List; +import java.util.Map; + +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED_5; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE_5; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION; /** @@ -11,6 +18,32 @@ import java.util.List; */ public class MqttMessageBuilder { + private static MqttProperties genMqttProperties(Map userPropertiesMap) { + MqttProperties mqttProperties = null; + if (userPropertiesMap != null) { + mqttProperties = new MqttProperties(); + MqttProperties.UserProperties userProperties = new MqttProperties.UserProperties(); + for (Map.Entry entry : userPropertiesMap.entrySet()) { + userProperties.add(entry.getKey(), entry.getValue()); + } + mqttProperties.add(userProperties); + } + return mqttProperties; + } + + public static MqttPublishMessage buildPub(boolean isDup, MqttQoS qoS, int messageId, String topic, ByteBuf message, MqttProperties properties) { + MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qoS, false, 0); + MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(topic, messageId, properties); + MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, message); + return mqttPublishMessage; + } + + public static MqttPublishMessage buildPub(boolean isDup, MqttQoS qoS, int messageId, String topic, ByteBuf message, Map userPropertiesMap) { + MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qoS, false, 0); + MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(topic, messageId, genMqttProperties(userPropertiesMap)); + MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, message); + return mqttPublishMessage; + } public static MqttPublishMessage buildPub(boolean isDup, MqttQoS qoS, int messageId, String topic, ByteBuf message) { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qoS, false, 0); @@ -69,8 +102,33 @@ public class MqttMessageBuilder { return new MqttUnsubAckMessage(mqttFixedHeader, variableHeader); } - public static MqttConnAckMessage buildConnectAck(MqttConnectReturnCode connectReturnCode) { - MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(connectReturnCode, false); + public static MqttConnAckMessage buildConnectAck(MqttConnectReturnCode connectReturnCode, int version) { + MqttProperties properties = MqttProperties.NO_PROPERTIES; + if (MqttVersion.MQTT_5.protocolLevel() == (byte) version) { + properties = new MqttProperties(); + properties.add(new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.RETAIN_AVAILABLE.value(), 1)); + // don't support shared subscription + properties.add(new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.SHARED_SUBSCRIPTION_AVAILABLE.value(), 0)); + switch (connectReturnCode) { + case CONNECTION_REFUSED_IDENTIFIER_REJECTED: + connectReturnCode = CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID; + break; + case CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION: + connectReturnCode = CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION; + break; + case CONNECTION_REFUSED_SERVER_UNAVAILABLE: + connectReturnCode = CONNECTION_REFUSED_SERVER_UNAVAILABLE_5; + break; + case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD: + connectReturnCode = CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD; + break; + case CONNECTION_REFUSED_NOT_AUTHORIZED: + connectReturnCode = CONNECTION_REFUSED_NOT_AUTHORIZED_5; + break; + + } + } + MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(connectReturnCode, false, properties); MqttFixedHeader mqttFixedHeader = new MqttFixedHeader( MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0X02); return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader); diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/OfflineMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/OfflineMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..9771c5657bf2410287f92d32279b32d1fcb3f9f1 --- /dev/null +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/OfflineMessage.java @@ -0,0 +1,62 @@ +package io.github.quickmsg.common.message; + +import java.util.HashMap; +import java.util.Optional; + +import io.github.quickmsg.common.channel.MqttChannel; +import io.github.quickmsg.common.utils.JacksonUtil; +import io.github.quickmsg.common.utils.MessageUtils; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.handler.codec.mqtt.MqttProperties; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; +import io.netty.handler.codec.mqtt.MqttQoS; +import lombok.Builder; +import lombok.Data; + +/** + * @Author: Jingwu.Zhou + * @Date: 2022/1/24 + */ +@Data +@Builder +public class OfflineMessage { + private int qos; + + private String topic; + + private byte[] body; + + private String userProperties; + + public static OfflineMessage of(MqttPublishMessage mqttPublishMessage) { + MqttPublishVariableHeader publishVariableHeader = mqttPublishMessage.variableHeader(); + return OfflineMessage.builder() + .topic(publishVariableHeader.topicName()) + .qos(mqttPublishMessage.fixedHeader().qosLevel().value()) + .body(MessageUtils.copyByteBuf(mqttPublishMessage.payload())) + .userProperties(JacksonUtil.map2Json(Optional.ofNullable(publishVariableHeader + .properties() + .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value())) + .map(list -> { + HashMap propertiesMap = new HashMap<>(list.size()); + list.forEach(property -> { + MqttProperties.StringPair pair = (MqttProperties.StringPair) property.value(); + propertiesMap.put(pair.key, pair.value); + }); + return propertiesMap; + }).orElseGet(HashMap::new))) + .build(); + } + + public MqttPublishMessage toPublishMessage(MqttChannel mqttChannel) { + return MqttMessageBuilder.buildPub( + false, + MqttQoS.valueOf(this.qos), + qos > 0 ? mqttChannel.generateMessageId() : 0, + topic, + PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body), + JacksonUtil.json2Map(userProperties, String.class, String.class)); + } + +} diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java index fbc623529cdcbc7e0f9fb110feb2f8899ebc30e4..2d67c21e50281155a26fec9f839a6bfa3c8929a7 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java @@ -1,8 +1,13 @@ package io.github.quickmsg.common.message; +import java.util.HashMap; +import java.util.Optional; + import io.github.quickmsg.common.channel.MqttChannel; +import io.github.quickmsg.common.utils.JacksonUtil; import io.github.quickmsg.common.utils.MessageUtils; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; @@ -22,12 +27,25 @@ public class RetainMessage { private byte[] body; + private String userProperties; + public static RetainMessage of(MqttPublishMessage mqttPublishMessage) { MqttPublishVariableHeader publishVariableHeader = mqttPublishMessage.variableHeader(); return RetainMessage.builder() .topic(publishVariableHeader.topicName()) .qos(mqttPublishMessage.fixedHeader().qosLevel().value()) .body(MessageUtils.copyByteBuf(mqttPublishMessage.payload())) + .userProperties(JacksonUtil.map2Json(Optional.ofNullable(publishVariableHeader + .properties() + .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value())) + .map(list -> { + HashMap propertiesMap = new HashMap<>(list.size()); + list.forEach(property -> { + MqttProperties.StringPair pair = (MqttProperties.StringPair) property.value(); + propertiesMap.put(pair.key, pair.value); + }); + return propertiesMap; + }).orElseGet(HashMap::new))) .build(); } @@ -37,7 +55,8 @@ public class RetainMessage { MqttQoS.valueOf(this.qos), qos > 0 ? mqttChannel.generateMessageId() : 0, topic, - PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body)); + PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body), + JacksonUtil.json2Map(userProperties, String.class, String.class)); } } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java index a3596af3aa8e4686cf1786d39598480c42efc2d8..f5aecea5ff339c029a868517f2e1d7fb8c243de8 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java @@ -1,8 +1,13 @@ package io.github.quickmsg.common.message; +import java.util.HashMap; +import java.util.Optional; + import io.github.quickmsg.common.channel.MqttChannel; +import io.github.quickmsg.common.utils.JacksonUtil; import io.github.quickmsg.common.utils.MessageUtils; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; @@ -27,6 +32,8 @@ public class SessionMessage { private boolean retain; + private String userProperties; + public static SessionMessage of(String clientIdentifier, MqttPublishMessage mqttPublishMessage) { MqttPublishVariableHeader publishVariableHeader = mqttPublishMessage.variableHeader(); return SessionMessage.builder() @@ -35,6 +42,17 @@ public class SessionMessage { .qos(mqttPublishMessage.fixedHeader().qosLevel().value()) .retain(mqttPublishMessage.fixedHeader().isRetain()) .body(MessageUtils.copyByteBuf(mqttPublishMessage.payload())) + .userProperties(JacksonUtil.map2Json(Optional.ofNullable(publishVariableHeader + .properties() + .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value())) + .map(list -> { + HashMap propertiesMap = new HashMap<>(list.size()); + list.forEach(property -> { + MqttProperties.StringPair pair = (MqttProperties.StringPair) property.value(); + propertiesMap.put(pair.key, pair.value); + }); + return propertiesMap; + }).orElseGet(HashMap::new))) .build(); } @@ -44,7 +62,8 @@ public class SessionMessage { MqttQoS.valueOf(this.qos), qos > 0 ? mqttChannel.generateMessageId() : 0, topic, - PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body)); + PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body), + JacksonUtil.json2Map(userProperties, String.class, String.class)); } } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java index fa197573b91107d755751afe9c9cf6c6e2877aa1..1b949a1639f554c09cee0e025b6eee41690182fa 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java @@ -67,7 +67,8 @@ public class MessageUtils { MqttPublishVariableHeader mqttPublishVariableHeader = message.variableHeader(); MqttFixedHeader mqttFixedHeader = message.fixedHeader(); MqttFixedHeader newFixedHeader = new MqttFixedHeader(mqttFixedHeader.messageType(), false, mqttQoS, false, mqttFixedHeader.remainingLength()); - MqttPublishVariableHeader newHeader = new MqttPublishVariableHeader(mqttPublishVariableHeader.topicName(), messageId); + // mqtt 5 support properties + MqttPublishVariableHeader newHeader = new MqttPublishVariableHeader(mqttPublishVariableHeader.topicName(), messageId, mqttPublishVariableHeader.properties()); return new MqttPublishMessage(newFixedHeader, newHeader, message.payload().copy()); } diff --git a/smqtt-core/pom.xml b/smqtt-core/pom.xml index dbb26bfb59fcfad27bf26045f1afee2353a59415..be754e809a6fec72ef710ec6d16043eb366b682c 100644 --- a/smqtt-core/pom.xml +++ b/smqtt-core/pom.xml @@ -31,6 +31,16 @@ smqtt-metric-prometheus 1.1.2 + + io.github.quickmsg + smqtt-persistent-redis + 1.1.2 + + + io.github.quickmsg + smqtt-persistent-db + 1.1.2 + \ No newline at end of file diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java b/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java index 2ab1843fde8d307151c4c7001508d22c63f2d7d7..407ed515b3c822ab82464c8bfa5c50e40032f730 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java @@ -43,6 +43,8 @@ public class Bootstrap { private BootstrapConfig.ClusterConfig clusterConfig; + private Integer persistenceType; + private BootstrapConfig.RedisConfig redisConfig; private BootstrapConfig.DatabaseConfig databaseConfig; @@ -94,6 +96,7 @@ public class Bootstrap { Optional.ofNullable(tcpConfig.getMessageMaxSize()).ifPresent(mqttConfiguration::setMessageMaxSize); Optional.ofNullable(clusterConfig).ifPresent(mqttConfiguration::setClusterConfig); Optional.ofNullable(meterConfig).ifPresent(mqttConfiguration::setMeterConfig); + Optional.ofNullable(persistenceType).ifPresent(mqttConfiguration::setPersistenceType); if (websocketConfig != null && websocketConfig.isEnable()) { mqttConfiguration.setWebSocketPort(websocketConfig.getPort()); diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java b/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java index d751f33ba3eb3b42e7f01106fdb31dc4cc4e4ec3..62d5f3a39dbf0be92f69cd5092cd3c3c2211d79f 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java @@ -56,7 +56,7 @@ public class ClusterReceiver { MqttQoS.valueOf(heapMqttMessage.getQos()), 0, heapMqttMessage.getTopic(), - PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage())), System.currentTimeMillis(), Boolean.TRUE); + PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage()), heapMqttMessage.getProperties()), System.currentTimeMillis(), Boolean.TRUE); } } diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java b/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java index 12fc2b003a703d0bb4a7593d0ae6078fcd258284..8c9a6161e1a7c903a6626e090e5f86732b039540 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/http/HttpConfiguration.java @@ -37,6 +37,8 @@ public class HttpConfiguration implements Configuration { private BootstrapConfig.MeterConfig meterConfig; + private Integer persistenceType = 0; + @Override public ConnectModel getConnectModel() { diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java index 288e898e0ed8a0db2bbbafb88711cd2882db4864..2b78f347bcd47d769ee63d4eb2a3707d293710be 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java @@ -30,6 +30,8 @@ import io.github.quickmsg.core.spi.DefaultTopicRegistry; import io.github.quickmsg.dsl.RuleDslParser; import io.github.quickmsg.metric.InfluxDbMetricFactory; import io.github.quickmsg.metric.PrometheusMetricFactory; +import io.github.quickmsg.persistent.registry.DbMessageRegistry; +import io.github.quickmsg.persistent.registry.RedisMessageRegistry; import io.github.quickmsg.rule.source.SourceManager; import io.netty.handler.traffic.GlobalChannelTrafficShapingHandler; import io.netty.handler.traffic.GlobalTrafficShapingHandler; @@ -91,7 +93,7 @@ public abstract class AbstractReceiveContext implements this.topicRegistry = topicRegistry(); this.loopResources = LoopResources.create("smqtt-cluster-io", configuration.getBossThreadSize(), configuration.getWorkThreadSize(), true); this.trafficHandlerLoader = trafficHandlerLoader(); - this.messageRegistry = messageRegistry(); + this.messageRegistry = messageRegistry(abstractConfiguration.getPersistenceType()); this.clusterRegistry = clusterRegistry(); this.passwordAuthentication = basicAuthentication(); this.channelRegistry.startUp(abstractConfiguration.getEnvironmentMap()); @@ -122,8 +124,15 @@ public abstract class AbstractReceiveContext implements return Event::sender; } - private MessageRegistry messageRegistry() { - return Optional.ofNullable(MessageRegistry.INSTANCE).orElseGet(DefaultMessageRegistry::new); + private MessageRegistry messageRegistry(int persistenceType) { + switch (persistenceType) { + case 1: + return new RedisMessageRegistry(); + case 2: + return new DbMessageRegistry(); + default: + return new DefaultMessageRegistry(); + } } private PasswordAuthentication basicAuthentication() { diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java index 03bda94cb65e1bc261e3fb8cc9c0e1deaf84bf53..e3444aafe77a0bdeb9cdb9c5a2a7d30046c8631a 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java @@ -69,4 +69,6 @@ public class MqttConfiguration extends AbstractSslHandler implements AbstractCon private Integer messageMaxSize = 4194304; + private Integer persistenceType = 0; + } diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java index 6e6bd47a320e2d820938b5e5f7a4f306e5c15a18..e90042add3e51b4fc70d8ec7486c70ab5102a435 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java @@ -68,7 +68,7 @@ public class ConnectProtocol implements Protocol { if (mqttReceiveContext.getConfiguration().getConnectModel() == ConnectModel.UNIQUE) { if (channelRegistry.exists(clientIdentifier)) { return mqttChannel.write( - MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED), + MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, mqttConnectVariableHeader.version()), false).then(mqttChannel.close()); } } else { @@ -78,16 +78,16 @@ public class ConnectProtocol implements Protocol { existMqttChannel.close().subscribe(); } else { return mqttChannel.write( - MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED), + MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, mqttConnectVariableHeader.version()), false).then(mqttChannel.close()); } } } /*protocol version support*/ if (MqttVersion.MQTT_3_1_1.protocolLevel() != (byte) mqttConnectVariableHeader.version() - && MqttVersion.MQTT_3_1.protocolLevel() != (byte) mqttConnectVariableHeader.version()) { + && MqttVersion.MQTT_5.protocolLevel() != (byte) mqttConnectVariableHeader.version()) { return mqttChannel.write( - MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION), + MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, mqttConnectVariableHeader.version()), false).then(mqttChannel.close()); } /*password check*/ @@ -147,11 +147,11 @@ public class ConnectProtocol implements Protocol { eventRegistry.registry(Event.CONNECT, mqttChannel, message, mqttReceiveContext); - return mqttChannel.write(MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_ACCEPTED), false) + return mqttChannel.write(MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeader.version()), false) .then(Mono.fromRunnable(() -> sendOfflineMessage(mqttReceiveContext.getMessageRegistry(), mqttChannel))); } else { return mqttChannel.write( - MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD), + MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, mqttConnectVariableHeader.version()), false).then(mqttChannel.close()); } } catch (Exception e) { diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java index 1a405e0799b2a5f9ce21db92daa533e2fa29afbe..eadaa2784e4c36ae45a41fdfc15aea0ed0432245 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java @@ -30,6 +30,8 @@ public class PublishProtocol implements Protocol { private static List MESSAGE_TYPE_LIST = new ArrayList<>(); + private static String ONE2ONE_TOPIC_PREFIX = "ipush/mt/msg/"; + static { MESSAGE_TYPE_LIST.add(MqttMessageType.PUBLISH); } @@ -52,6 +54,10 @@ public class PublishProtocol implements Protocol { MessageRegistry messageRegistry = receiveContext.getMessageRegistry(); Set mqttChannels = topicRegistry.getSubscribesByTopic(variableHeader.topicName(), message.fixedHeader().qosLevel()); + // 定制化处理,一对一发送的离线消息进行持久化,保证送达 + if (mqttChannels.isEmpty() && variableHeader.topicName().startsWith(ONE2ONE_TOPIC_PREFIX)) { + messageRegistry.saveOfflineMessage(OfflineMessage.of(message)); + } // http mock if (mqttChannel.getIsMock()) { return send(mqttChannels, message, messageRegistry, filterRetainMessage(message, messageRegistry)); diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java index 6a1c2f4d6409c53364b74d21a25338d99eb66d72..1f7914a54a2902849b6f796a9d938bcf9268b8ed 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java @@ -11,13 +11,13 @@ import io.github.quickmsg.common.protocol.Protocol; import io.github.quickmsg.common.topic.SubscribeTopic; import io.github.quickmsg.common.topic.TopicRegistry; import io.netty.handler.codec.mqtt.MqttMessageType; -import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import reactor.core.publisher.Mono; import reactor.util.context.ContextView; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -40,7 +40,10 @@ public class SubscribeProtocol implements Protocol { Set mqttTopicSubscriptions = message.payload().topicSubscriptions() .stream() - .peek(mqttTopicSubscription -> this.loadRetainMessage(messageRegistry, mqttChannel, mqttTopicSubscription.topicName())) + .peek(mqttTopicSubscription -> { + this.loadRetainMessage(messageRegistry, mqttChannel, mqttTopicSubscription.topicName()); + this.loadOfflineMessage(messageRegistry, mqttChannel, mqttTopicSubscription.topicName()); + }) .map(mqttTopicSubscription -> new SubscribeTopic(mqttTopicSubscription.topicName(), mqttTopicSubscription.qualityOfService(), mqttChannel)) .collect(Collectors.toSet()); @@ -57,6 +60,12 @@ public class SubscribeProtocol implements Protocol { .collect(Collectors.toList())), false)); } + /** + * 获取保留消息 + * @param messageRegistry + * @param mqttChannel + * @param topicName + */ private void loadRetainMessage(MessageRegistry messageRegistry, MqttChannel mqttChannel, String topicName) { messageRegistry.getRetainMessage(topicName) .forEach(retainMessage -> @@ -64,6 +73,19 @@ public class SubscribeProtocol implements Protocol { .subscribe()); } + /** + * 获取离线消息 + * @param messageRegistry + * @param mqttChannel + * @param topicName + */ + private void loadOfflineMessage(MessageRegistry messageRegistry, MqttChannel mqttChannel, String topicName) { + Optional.ofNullable(messageRegistry.getOfflineMessage(topicName)).ifPresent(msgList -> + msgList.forEach(offlineMessage -> + mqttChannel.write(offlineMessage.toPublishMessage(mqttChannel), offlineMessage.getQos() > 0) + .subscribe())); + } + @Override public List getMqttMessageTypes() { return MESSAGE_TYPE_LIST; diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java index 256d8170f42227019a8c1ec680095a9ecc04b99f..67e35485b15b48e108a2ac4a50f2010d2faaf537 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultMessageRegistry.java @@ -1,10 +1,12 @@ package io.github.quickmsg.core.spi; import io.github.quickmsg.common.message.MessageRegistry; +import io.github.quickmsg.common.message.OfflineMessage; import io.github.quickmsg.common.message.RetainMessage; import io.github.quickmsg.common.message.SessionMessage; import io.github.quickmsg.common.utils.TopicRegexUtils; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -21,6 +23,8 @@ public class DefaultMessageRegistry implements MessageRegistry { private Map retainMessages = new ConcurrentHashMap<>(); + private Map> offlineMessages = new ConcurrentHashMap<>(); + @Override public List getSessionMessage(String clientIdentifier) { @@ -47,4 +51,15 @@ public class DefaultMessageRegistry implements MessageRegistry { .collect(Collectors.toList()); } + @Override + public List getOfflineMessage(String topic) { + return offlineMessages.remove(topic); + } + + @Override + public void saveOfflineMessage(OfflineMessage offlineMessage) { + List offlineList = offlineMessages.computeIfAbsent(offlineMessage.getTopic(), key -> new CopyOnWriteArrayList<>()); + offlineList.add(offlineMessage); + } + } diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java index 92992c29dee58d8e71718b6cf1bbaf86452e9a35..20d9d996672b7fc62a6aaf31736cc9c413c76583 100644 --- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java +++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/registry/DbMessageRegistry.java @@ -2,6 +2,7 @@ package io.github.quickmsg.persistent.registry; import io.github.quickmsg.common.config.BootstrapConfig; import io.github.quickmsg.common.message.MessageRegistry; +import io.github.quickmsg.common.message.OfflineMessage; import io.github.quickmsg.common.message.RetainMessage; import io.github.quickmsg.common.message.SessionMessage; import io.github.quickmsg.common.utils.TopicRegexUtils; @@ -86,6 +87,7 @@ public class DbMessageRegistry implements MessageRegistry { .qos(record.getQos()) .topic(record.getTopic()) .body(record.getBody().getBytes()) + .userProperties(record.getUserProperties()) .clientIdentifier(record.getClientId()) .retain(record.getRetain()) .build()) @@ -111,6 +113,7 @@ public class DbMessageRegistry implements MessageRegistry { int qos = sessionMessage.getQos(); boolean retain = sessionMessage.isRetain(); byte[] body = sessionMessage.getBody(); + String userProperties = sessionMessage.getUserProperties(); try (Connection connection = HikariCPConnectionProvider.singleTon().getConnection()) { DSLContext dslContext = DSL.using(connection); @@ -121,8 +124,9 @@ public class DbMessageRegistry implements MessageRegistry { Tables.SMQTT_SESSION.QOS, Tables.SMQTT_SESSION.RETAIN, Tables.SMQTT_SESSION.BODY, - Tables.SMQTT_SESSION.CREATE_TIME) - .values(topic, clientIdentifier, qos, retain, bodyMsg, LocalDateTime.now()) + Tables.SMQTT_SESSION.CREATE_TIME, + Tables.SMQTT_SESSION.USER_PROPERTIES) + .values(topic, clientIdentifier, qos, retain, bodyMsg, LocalDateTime.now(), userProperties) .execute(); } catch (Exception e) { log.error("sendSessionMessages error message: {}", clientIdentifier, e); @@ -144,24 +148,26 @@ public class DbMessageRegistry implements MessageRegistry { .from(Tables.SMQTT_RETAIN) .where(Tables.SMQTT_RETAIN.TOPIC.eq(topic)) .fetchAny(); + String bodyMsg = new String(retainMessage.getBody(), CharsetUtil.UTF_8); + String userProperties = retainMessage.getUserProperties(); if (integerRecord1 != null && integerRecord1.value1() != null && integerRecord1.value1() > 0) { // 更新记录 - String bodyMsg = new String(retainMessage.getBody(), CharsetUtil.UTF_8); dslContext.update(Tables.SMQTT_RETAIN) .set(Tables.SMQTT_RETAIN.QOS, qos) .set(Tables.SMQTT_RETAIN.BODY, bodyMsg) .set(Tables.SMQTT_RETAIN.UPDATE_TIME, LocalDateTime.now()) + .set(Tables.SMQTT_RETAIN.USER_PROPERTIES, userProperties) .where(Tables.SMQTT_RETAIN.TOPIC.eq(topic)) .execute(); } else { // 新增记录 - String bodyMsg = new String(retainMessage.getBody(), CharsetUtil.UTF_8); dslContext.insertInto(Tables.SMQTT_RETAIN) .columns(Tables.SMQTT_RETAIN.TOPIC, Tables.SMQTT_RETAIN.QOS, Tables.SMQTT_RETAIN.BODY, - Tables.SMQTT_RETAIN.CREATE_TIME) - .values(topic, qos, bodyMsg, LocalDateTime.now()) + Tables.SMQTT_RETAIN.CREATE_TIME, + Tables.SMQTT_RETAIN.USER_PROPERTIES) + .values(topic, qos, bodyMsg, LocalDateTime.now(), userProperties) .execute(); } } @@ -183,6 +189,7 @@ public class DbMessageRegistry implements MessageRegistry { .topic(record.getTopic()) .qos(record.getQos()) .body(getBody(record.getBody())) + .userProperties(record.getUserProperties()) .build() ) .collect(Collectors.toList()); @@ -198,4 +205,59 @@ public class DbMessageRegistry implements MessageRegistry { return StringUtils.isBlank(body) ? null : body.getBytes(CharsetUtil.UTF_8); } + @Override + public List getOfflineMessage(String topic) { + try (Connection connection = HikariCPConnectionProvider.singleTon().getConnection()) { + DSLContext dslContext = DSL.using(connection); + + List list = dslContext + .selectFrom(Tables.SMQTT_OFFLINE) + .where(Tables.SMQTT_OFFLINE.TOPIC.eq(topic)) + .fetch() + .stream() + .map(record -> + OfflineMessage.builder() + .qos(record.getQos()) + .topic(record.getTopic()) + .body(record.getBody().getBytes()) + .userProperties(record.getUserProperties()) + .build()) + .collect(Collectors.toList()); + + if (list.size() > 0) { + // 删除记录 + dslContext.deleteFrom(Tables.SMQTT_OFFLINE) + .where(Tables.SMQTT_OFFLINE.TOPIC.eq(topic)) + .execute(); + } + return list; + } catch (Exception e) { + log.error("getOfflineMessages error topic:{}", topic, e); + return Collections.emptyList(); + } + } + + @Override + public void saveOfflineMessage(OfflineMessage offlineMessage) { + String topic = offlineMessage.getTopic(); + int qos = offlineMessage.getQos(); + byte[] body = offlineMessage.getBody(); + String userProperties = offlineMessage.getUserProperties(); + + try (Connection connection = HikariCPConnectionProvider.singleTon().getConnection()) { + DSLContext dslContext = DSL.using(connection); + String bodyMsg = new String(body, CharsetUtil.UTF_8); + dslContext.insertInto(Tables.SMQTT_OFFLINE) + .columns(Tables.SMQTT_OFFLINE.TOPIC, + Tables.SMQTT_OFFLINE.QOS, + Tables.SMQTT_OFFLINE.BODY, + Tables.SMQTT_OFFLINE.CREATE_TIME, + Tables.SMQTT_OFFLINE.USER_PROPERTIES) + .values(topic, qos, bodyMsg, LocalDateTime.now(), userProperties) + .execute(); + } catch (Exception e) { + log.error("saveOfflineMessages error message: {}", topic, e); + } + } + } diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Indexes.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Indexes.java index 376554f021b6152a7896eca06061af1461978a23..a09f89043af7f8298f73af2211b023d41804c644 100644 --- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Indexes.java +++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Indexes.java @@ -4,6 +4,7 @@ package io.github.quickmsg.persistent.tables; +import io.github.quickmsg.persistent.tables.tables.SmqttOffline; import io.github.quickmsg.persistent.tables.tables.SmqttRetain; import org.jooq.Index; @@ -23,4 +24,6 @@ public class Indexes { // ------------------------------------------------------------------------- public static final Index SMQTT_RETAIN_INDEX_TOPIC = Internal.createIndex(DSL.name("index_topic"), SmqttRetain.SMQTT_RETAIN, new OrderField[] { SmqttRetain.SMQTT_RETAIN.TOPIC }, false); + + public static final Index SMQTT_OFFLINE_INDEX_TOPIC = Internal.createIndex(DSL.name("index_topic"), SmqttOffline.SMQTT_OFFLINE, new OrderField[] { SmqttOffline.SMQTT_OFFLINE.TOPIC }, false); } diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Smqtt.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Smqtt.java index 010bb68d5d33942b02dee6b72aac2224097e81f5..c2a5712d0aaafc38cc224f7a8e6ea0a32442bb1a 100644 --- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Smqtt.java +++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Smqtt.java @@ -6,6 +6,7 @@ package io.github.quickmsg.persistent.tables; import io.github.quickmsg.persistent.tables.tables.Databasechangelog; import io.github.quickmsg.persistent.tables.tables.Databasechangeloglock; +import io.github.quickmsg.persistent.tables.tables.SmqttOffline; import io.github.quickmsg.persistent.tables.tables.SmqttRetain; import io.github.quickmsg.persistent.tables.tables.SmqttSession; @@ -40,6 +41,9 @@ public class Smqtt extends SchemaImpl { public final SmqttSession SMQTT_SESSION = SmqttSession.SMQTT_SESSION; + public static final SmqttOffline SMQTT_OFFLINE = SmqttOffline.SMQTT_OFFLINE; + + private Smqtt() { super("smqtt", null); } @@ -53,9 +57,10 @@ public class Smqtt extends SchemaImpl { @Override public final List> getTables() { return Arrays.>asList( - Databasechangelog.DATABASECHANGELOG, - Databasechangeloglock.DATABASECHANGELOGLOCK, - SmqttRetain.SMQTT_RETAIN, - SmqttSession.SMQTT_SESSION); + Databasechangelog.DATABASECHANGELOG, + Databasechangeloglock.DATABASECHANGELOGLOCK, + SmqttRetain.SMQTT_RETAIN, + SmqttSession.SMQTT_SESSION, + SmqttOffline.SMQTT_OFFLINE); } } diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Tables.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Tables.java index dc24425ddce2ec62c6cb233277e00200935576e4..d1bdbc0d5b2547863b6e068430ec280d04528590 100644 --- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Tables.java +++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/Tables.java @@ -6,6 +6,7 @@ package io.github.quickmsg.persistent.tables; import io.github.quickmsg.persistent.tables.tables.Databasechangelog; import io.github.quickmsg.persistent.tables.tables.Databasechangeloglock; +import io.github.quickmsg.persistent.tables.tables.SmqttOffline; import io.github.quickmsg.persistent.tables.tables.SmqttRetain; import io.github.quickmsg.persistent.tables.tables.SmqttSession; @@ -27,4 +28,7 @@ public class Tables { public static final SmqttSession SMQTT_SESSION = SmqttSession.SMQTT_SESSION; + + + public static final SmqttOffline SMQTT_OFFLINE = SmqttOffline.SMQTT_OFFLINE; } diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttOffline.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttOffline.java new file mode 100644 index 0000000000000000000000000000000000000000..d6cd317ed18db9fb62c0f93e39763a1a1dc2b8f6 --- /dev/null +++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttOffline.java @@ -0,0 +1,123 @@ +package io.github.quickmsg.persistent.tables.tables; + +import org.jooq.Field; +import org.jooq.ForeignKey; +import org.jooq.Index; +import org.jooq.Name; +import org.jooq.Record; +import org.jooq.Row5; +import org.jooq.Schema; +import org.jooq.Table; +import org.jooq.TableField; +import org.jooq.TableOptions; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; +import org.jooq.impl.TableImpl; + +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.List; + +import io.github.quickmsg.persistent.tables.Indexes; +import io.github.quickmsg.persistent.tables.Smqtt; +import io.github.quickmsg.persistent.tables.tables.records.SmqttOfflineRecord; + +/** + * @Author: Jingwu.Zhou + * @Date: 2022/1/24 + */ +public class SmqttOffline extends TableImpl { + private static final long serialVersionUID = 1L; + + + public static final SmqttOffline SMQTT_OFFLINE = new SmqttOffline(); + + + @Override + public Class getRecordType() { + return SmqttOfflineRecord.class; + } + + public final TableField TOPIC = createField(DSL.name("topic"), SQLDataType.VARCHAR(255).defaultValue(DSL.inline("NULL", SQLDataType.VARCHAR)), this, "话题"); + + + public final TableField QOS = createField(DSL.name("qos"), SQLDataType.INTEGER.defaultValue(DSL.inline("NULL", SQLDataType.INTEGER)), this, "qos"); + + + public final TableField BODY = createField(DSL.name("body"), SQLDataType.VARCHAR(255).defaultValue(DSL.inline("NULL", SQLDataType.VARCHAR)), this, "消息内容"); + + + public final TableField CREATE_TIME = createField(DSL.name("create_time"), SQLDataType.LOCALDATETIME(0).defaultValue(DSL.inline("NULL", SQLDataType.LOCALDATETIME)), this, "记录保存时间"); + + + public final TableField USER_PROPERTIES = createField(DSL.name("user_properties"), SQLDataType.VARCHAR(1000).defaultValue(DSL.inline("NULL", SQLDataType.VARCHAR)), this, "可变头userProperties"); + + + private SmqttOffline(Name alias, Table aliased) { + this(alias, aliased, null); + } + + private SmqttOffline(Name alias, Table aliased, Field[] parameters) { + super(alias, null, aliased, parameters, DSL.comment(""), TableOptions.table()); + } + + + public SmqttOffline(String alias) { + this(DSL.name(alias), SMQTT_OFFLINE); + } + + + public SmqttOffline(Name alias) { + this(alias, SMQTT_OFFLINE); + } + + + public SmqttOffline() { + this(DSL.name("smqtt_offline"), null); + } + + public SmqttOffline(Table child, ForeignKey key) { + super(child, key, SMQTT_OFFLINE); + } + + @Override + public Schema getSchema() { + return Smqtt.SMQTT; + } + + @Override + public List getIndexes() { + return Arrays.asList(Indexes.SMQTT_OFFLINE_INDEX_TOPIC); + } + + @Override + public SmqttOffline as(String alias) { + return new SmqttOffline(DSL.name(alias), this); + } + + @Override + public SmqttOffline as(Name alias) { + return new SmqttOffline(alias, this); + } + + + @Override + public SmqttOffline rename(String name) { + return new SmqttOffline(DSL.name(name), null); + } + + + @Override + public SmqttOffline rename(Name name) { + return new SmqttOffline(name, null); + } + + // ------------------------------------------------------------------------- + // Row5 type methods + // ------------------------------------------------------------------------- + + @Override + public Row5 fieldsRow() { + return (Row5) super.fieldsRow(); + } +} diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttRetain.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttRetain.java index 11edf2cea374a2f38602a9f63415939ebbf81b10..5fb01bcbfd93008983f5b1b874551d9a4cd261b2 100644 --- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttRetain.java +++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttRetain.java @@ -17,7 +17,7 @@ import org.jooq.ForeignKey; import org.jooq.Index; import org.jooq.Name; import org.jooq.Record; -import org.jooq.Row5; +import org.jooq.Row6; import org.jooq.Schema; import org.jooq.Table; import org.jooq.TableField; @@ -57,6 +57,8 @@ public class SmqttRetain extends TableImpl { public final TableField UPDATE_TIME = createField(DSL.name("update_time"), SQLDataType.LOCALDATETIME(0).defaultValue(DSL.inline("NULL", SQLDataType.LOCALDATETIME)), this, "记录更新时间"); + public final TableField USER_PROPERTIES = createField(DSL.name("user_properties"), SQLDataType.VARCHAR(1000).defaultValue(DSL.inline("NULL", SQLDataType.VARCHAR)), this, "可变头userProperties"); + private SmqttRetain(Name alias, Table aliased) { this(alias, aliased, null); } @@ -121,7 +123,7 @@ public class SmqttRetain extends TableImpl { // ------------------------------------------------------------------------- @Override - public Row5 fieldsRow() { - return (Row5) super.fieldsRow(); + public Row6 fieldsRow() { + return (Row6) super.fieldsRow(); } } diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttSession.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttSession.java index 8e46bde625398231e4431754155509ddfb98050c..cebf7fb296940e672ae26511dfdb18ee54568fce 100644 --- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttSession.java +++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/SmqttSession.java @@ -13,7 +13,7 @@ import org.jooq.Field; import org.jooq.ForeignKey; import org.jooq.Name; import org.jooq.Record; -import org.jooq.Row6; +import org.jooq.Row7; import org.jooq.Schema; import org.jooq.Table; import org.jooq.TableField; @@ -62,6 +62,8 @@ public class SmqttSession extends TableImpl { public final TableField CREATE_TIME = createField(DSL.name("create_time"), SQLDataType.LOCALDATETIME(0).defaultValue(DSL.inline("NULL", SQLDataType.LOCALDATETIME)), this, "记录保存时间"); + public final TableField USER_PROPERTIES = createField(DSL.name("user_properties"), SQLDataType.VARCHAR(1000).defaultValue(DSL.inline("NULL", SQLDataType.VARCHAR)), this, "可变头userProperties"); + private SmqttSession(Name alias, Table aliased) { this(alias, aliased, null); } @@ -121,7 +123,7 @@ public class SmqttSession extends TableImpl { // ------------------------------------------------------------------------- @Override - public Row6 fieldsRow() { - return (Row6) super.fieldsRow(); + public Row7 fieldsRow() { + return (Row7) super.fieldsRow(); } } diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttOfflineRecord.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttOfflineRecord.java new file mode 100644 index 0000000000000000000000000000000000000000..5408bd50a6d2f28ea374db7a79a269103598e19f --- /dev/null +++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttOfflineRecord.java @@ -0,0 +1,214 @@ +package io.github.quickmsg.persistent.tables.tables.records; + +import org.jooq.Field; +import org.jooq.Record5; +import org.jooq.Row5; +import org.jooq.impl.TableRecordImpl; + +import java.time.LocalDateTime; + +import io.github.quickmsg.persistent.tables.tables.SmqttOffline; + +/** + * @Author: Jingwu.Zhou + * @Date: 2022/1/24 + */ +public class SmqttOfflineRecord extends TableRecordImpl implements Record5 { + private static final long serialVersionUID = 1L; + + public void setTopic(String value) { + set(0, value); + } + + + public String getTopic() { + return (String) get(0); + } + + + public void setQos(Integer value) { + set(1, value); + } + + + public Integer getQos() { + return (Integer) get(1); + } + + + public void setBody(String value) { + set(2, value); + } + + + public String getBody() { + return (String) get(2); + } + + public void setCreateTime(LocalDateTime value) { + set(3, value); + } + + public LocalDateTime getCreateTime() { + return (LocalDateTime) get(3); + } + + + public void setUserProperties(String value) { + set(4, value); + } + + + public String getUserProperties() { + return (String) get(4); + } + + // ------------------------------------------------------------------------- + // Record5 type implementation + // ------------------------------------------------------------------------- + + @Override + public Row5 fieldsRow() { + return (Row5) super.fieldsRow(); + } + + @Override + public Row5 valuesRow() { + return (Row5) super.valuesRow(); + } + + @Override + public Field field1() { + return SmqttOffline.SMQTT_OFFLINE.TOPIC; + } + + @Override + public Field field2() { + return SmqttOffline.SMQTT_OFFLINE.QOS; + } + + @Override + public Field field3() { + return SmqttOffline.SMQTT_OFFLINE.BODY; + } + + @Override + public Field field4() { + return SmqttOffline.SMQTT_OFFLINE.CREATE_TIME; + } + + @Override + public Field field5() { + return SmqttOffline.SMQTT_OFFLINE.USER_PROPERTIES; + } + + @Override + public String component1() { + return getTopic(); + } + + @Override + public Integer component2() { + return getQos(); + } + + @Override + public String component3() { + return getBody(); + } + + @Override + public LocalDateTime component4() { + return getCreateTime(); + } + + @Override + public String component5() { + return getUserProperties(); + } + + @Override + public String value1() { + return getTopic(); + } + + @Override + public Integer value2() { + return getQos(); + } + + @Override + public String value3() { + return getBody(); + } + + @Override + public LocalDateTime value4() { + return getCreateTime(); + } + + @Override + public String value5() { + return getUserProperties(); + } + + @Override + public SmqttOfflineRecord value1(String value) { + setTopic(value); + return this; + } + + @Override + public SmqttOfflineRecord value2(Integer value) { + setQos(value); + return this; + } + + @Override + public SmqttOfflineRecord value3(String value) { + setBody(value); + return this; + } + + @Override + public SmqttOfflineRecord value4(LocalDateTime value) { + setCreateTime(value); + return this; + } + + @Override + public SmqttOfflineRecord value5(String value) { + setUserProperties(value); + return this; + } + + @Override + public SmqttOfflineRecord values(String value1, Integer value2, String value3, LocalDateTime value4, String value5) { + value1(value1); + value2(value2); + value3(value3); + value4(value4); + value5(value5); + return this; + } + + // ------------------------------------------------------------------------- + // Constructors + // ------------------------------------------------------------------------- + + + public SmqttOfflineRecord() { + super(SmqttOffline.SMQTT_OFFLINE); + } + + + public SmqttOfflineRecord(String topic, Integer qos, String body, LocalDateTime createTime, LocalDateTime updateTime, String userProperties) { + super(SmqttOffline.SMQTT_OFFLINE); + + setTopic(topic); + setQos(qos); + setBody(body); + setCreateTime(createTime); + setUserProperties(userProperties); + } +} diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttRetainRecord.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttRetainRecord.java index df93c1d921501df7f94de8e9236708c23c724851..119e93764720473e5c51cbc6c5c504f6f650c270 100644 --- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttRetainRecord.java +++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttRetainRecord.java @@ -9,8 +9,8 @@ import io.github.quickmsg.persistent.tables.tables.SmqttRetain; import java.time.LocalDateTime; import org.jooq.Field; -import org.jooq.Record5; -import org.jooq.Row5; +import org.jooq.Record6; +import org.jooq.Row6; import org.jooq.impl.TableRecordImpl; @@ -18,7 +18,7 @@ import org.jooq.impl.TableRecordImpl; * This class is generated by jOOQ. */ @SuppressWarnings({ "all", "unchecked", "rawtypes" }) -public class SmqttRetainRecord extends TableRecordImpl implements Record5 { +public class SmqttRetainRecord extends TableRecordImpl implements Record6 { private static final long serialVersionUID = 1L; @@ -69,18 +69,27 @@ public class SmqttRetainRecord extends TableRecordImpl implem return (LocalDateTime) get(4); } + public void setUserProperties(String value) { + set(5, value); + } + + + public String getUserProperties() { + return (String) get(5); + } + // ------------------------------------------------------------------------- // Record5 type implementation // ------------------------------------------------------------------------- @Override - public Row5 fieldsRow() { - return (Row5) super.fieldsRow(); + public Row6 fieldsRow() { + return (Row6) super.fieldsRow(); } @Override - public Row5 valuesRow() { - return (Row5) super.valuesRow(); + public Row6 valuesRow() { + return (Row6) super.valuesRow(); } @Override @@ -108,6 +117,11 @@ public class SmqttRetainRecord extends TableRecordImpl implem return SmqttRetain.SMQTT_RETAIN.UPDATE_TIME; } + @Override + public Field field6() { + return SmqttRetain.SMQTT_RETAIN.USER_PROPERTIES; + } + @Override public String component1() { return getTopic(); @@ -133,6 +147,11 @@ public class SmqttRetainRecord extends TableRecordImpl implem return getUpdateTime(); } + @Override + public String component6() { + return getUserProperties(); + } + @Override public String value1() { return getTopic(); @@ -158,6 +177,11 @@ public class SmqttRetainRecord extends TableRecordImpl implem return getUpdateTime(); } + @Override + public String value6() { + return getUserProperties(); + } + @Override public SmqttRetainRecord value1(String value) { setTopic(value); @@ -189,12 +213,19 @@ public class SmqttRetainRecord extends TableRecordImpl implem } @Override - public SmqttRetainRecord values(String value1, Integer value2, String value3, LocalDateTime value4, LocalDateTime value5) { + public SmqttRetainRecord value6(String value) { + setUserProperties(value); + return this; + } + + @Override + public SmqttRetainRecord values(String value1, Integer value2, String value3, LocalDateTime value4, LocalDateTime value5, String value6) { value1(value1); value2(value2); value3(value3); value4(value4); value5(value5); + value6(value6); return this; } @@ -208,7 +239,7 @@ public class SmqttRetainRecord extends TableRecordImpl implem } - public SmqttRetainRecord(String topic, Integer qos, String body, LocalDateTime createTime, LocalDateTime updateTime) { + public SmqttRetainRecord(String topic, Integer qos, String body, LocalDateTime createTime, LocalDateTime updateTime, String userProperties) { super(SmqttRetain.SMQTT_RETAIN); setTopic(topic); @@ -216,5 +247,6 @@ public class SmqttRetainRecord extends TableRecordImpl implem setBody(body); setCreateTime(createTime); setUpdateTime(updateTime); + setUserProperties(userProperties); } } diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttSessionRecord.java b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttSessionRecord.java index 8904fe4687cad8881d4a1d647d069072dd8682e8..f7e845026d8b1f400f44007d83a04cf6adc635bb 100644 --- a/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttSessionRecord.java +++ b/smqtt-persistent/smqtt-persistent-db/src/main/java/io/github/quickmsg/persistent/tables/tables/records/SmqttSessionRecord.java @@ -9,8 +9,8 @@ import io.github.quickmsg.persistent.tables.tables.SmqttSession; import java.time.LocalDateTime; import org.jooq.Field; -import org.jooq.Record6; -import org.jooq.Row6; +import org.jooq.Record7; +import org.jooq.Row7; import org.jooq.impl.TableRecordImpl; @@ -18,7 +18,7 @@ import org.jooq.impl.TableRecordImpl; * This class is generated by jOOQ. */ @SuppressWarnings({ "all", "unchecked", "rawtypes" }) -public class SmqttSessionRecord extends TableRecordImpl implements Record6 { +public class SmqttSessionRecord extends TableRecordImpl implements Record7 { private static final long serialVersionUID = 1L; @@ -72,6 +72,15 @@ public class SmqttSessionRecord extends TableRecordImpl impl return (String) get(4); } + public void setUserProperties(String value) { + set(6, value); + } + + + public String getUserProperties() { + return (String) get(6); + } + public void setCreateTime(LocalDateTime value) { set(5, value); @@ -87,13 +96,13 @@ public class SmqttSessionRecord extends TableRecordImpl impl // ------------------------------------------------------------------------- @Override - public Row6 fieldsRow() { - return (Row6) super.fieldsRow(); + public Row7 fieldsRow() { + return (Row7) super.fieldsRow(); } @Override - public Row6 valuesRow() { - return (Row6) super.valuesRow(); + public Row7 valuesRow() { + return (Row7) super.valuesRow(); } @Override @@ -126,6 +135,11 @@ public class SmqttSessionRecord extends TableRecordImpl impl return SmqttSession.SMQTT_SESSION.CREATE_TIME; } + @Override + public Field field7() { + return SmqttSession.SMQTT_SESSION.USER_PROPERTIES; + } + @Override public String component1() { return getTopic(); @@ -156,6 +170,11 @@ public class SmqttSessionRecord extends TableRecordImpl impl return getCreateTime(); } + @Override + public String component7() { + return getUserProperties(); + } + @Override public String value1() { return getTopic(); @@ -186,6 +205,11 @@ public class SmqttSessionRecord extends TableRecordImpl impl return getCreateTime(); } + @Override + public String value7() { + return getUserProperties(); + } + @Override public SmqttSessionRecord value1(String value) { setTopic(value); @@ -223,13 +247,20 @@ public class SmqttSessionRecord extends TableRecordImpl impl } @Override - public SmqttSessionRecord values(String value1, String value2, Integer value3, Boolean value4, String value5, LocalDateTime value6) { + public SmqttSessionRecord value7(String value) { + setUserProperties(value); + return this; + } + + @Override + public SmqttSessionRecord values(String value1, String value2, Integer value3, Boolean value4, String value5, LocalDateTime value6, String value7) { value1(value1); value2(value2); value3(value3); value4(value4); value5(value5); value6(value6); + value7(value7); return this; } @@ -243,7 +274,7 @@ public class SmqttSessionRecord extends TableRecordImpl impl } - public SmqttSessionRecord(String topic, String clientId, Integer qos, Boolean retain, String body, LocalDateTime createTime) { + public SmqttSessionRecord(String topic, String clientId, Integer qos, Boolean retain, String body, LocalDateTime createTime, String userProperties) { super(SmqttSession.SMQTT_SESSION); setTopic(topic); @@ -252,5 +283,6 @@ public class SmqttSessionRecord extends TableRecordImpl impl setRetain(retain); setBody(body); setCreateTime(createTime); + setUserProperties(userProperties); } } diff --git a/smqtt-persistent/smqtt-persistent-db/src/main/resources/liquibase/smqtt_db.xml b/smqtt-persistent/smqtt-persistent-db/src/main/resources/liquibase/smqtt_db.xml index 8c59856239669a22f6501d2c4328a4575aae327e..06cb5bfe67fac0710efc9f7f2da2f5206bb0c451 100644 --- a/smqtt-persistent/smqtt-persistent-db/src/main/resources/liquibase/smqtt_db.xml +++ b/smqtt-persistent/smqtt-persistent-db/src/main/resources/liquibase/smqtt_db.xml @@ -14,6 +14,7 @@ + @@ -30,10 +31,26 @@ + + + + create smqtt_offline table + + + + + + + + + + + + \ No newline at end of file diff --git a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/OfflineMessageEntity.java b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/OfflineMessageEntity.java new file mode 100644 index 0000000000000000000000000000000000000000..44f33d79f2a238ef406d38535ad69d495aaeb957 --- /dev/null +++ b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/OfflineMessageEntity.java @@ -0,0 +1,30 @@ +package io.github.quickmsg.persistent.message; + +import com.fasterxml.jackson.annotation.JsonFormat; + +import java.io.Serializable; +import java.util.Date; + +import lombok.Builder; +import lombok.Data; + +/** + * @Author: Jingwu.Zhou + * @Date: 2022/1/24 + */ +@Data +@Builder +public class OfflineMessageEntity implements Serializable { + private static final long serialVersionUID = 1095608914696359395L; + + private String topic; + + private Integer qos; + + private byte[] body; + + private String userProperties; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date createTime; +} diff --git a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java index f30dc0208e235749b77df0392bfca0e6fdbb4c5b..086c987049d3960ec93cd0f42d93a0d164b7b89d 100644 --- a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java +++ b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java @@ -24,6 +24,8 @@ public class RetainMessageEntity implements Serializable { private byte[] body; + private String userProperties; + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date createTime; diff --git a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java index 2966a12546e27774fa61ae8c630f5a11c8bbcd81..418d356b3e9f8539f0d90a4a41ca1b424898e5c5 100644 --- a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java +++ b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java @@ -25,6 +25,8 @@ public class SessionMessageEntity implements Serializable { private Boolean retain; + private String userProperties; + private byte[] body; @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date createTime; diff --git a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java index a24f5a978ac3192ed52b45a166feac363b035337..9abbbd67629c18af2b5fd5c2c8b092e326c54942 100644 --- a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java +++ b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/registry/RedisMessageRegistry.java @@ -2,12 +2,13 @@ package io.github.quickmsg.persistent.registry; import io.github.quickmsg.common.bootstrap.BootstrapKey; import io.github.quickmsg.common.config.BootstrapConfig; -import io.github.quickmsg.common.environment.EnvContext; import io.github.quickmsg.common.message.MessageRegistry; +import io.github.quickmsg.common.message.OfflineMessage; import io.github.quickmsg.common.message.RetainMessage; import io.github.quickmsg.common.message.SessionMessage; import io.github.quickmsg.common.utils.TopicRegexUtils; import io.github.quickmsg.persistent.factory.ClientFactory; +import io.github.quickmsg.persistent.message.OfflineMessageEntity; import io.github.quickmsg.persistent.message.RetainMessageEntity; import io.github.quickmsg.persistent.message.SessionMessageEntity; import io.github.quickmsg.persistent.strategy.ClientStrategy; @@ -15,10 +16,11 @@ import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBucket; import org.redisson.api.RKeys; import org.redisson.api.RList; +import org.redisson.api.RSetCache; import org.redisson.api.RedissonClient; -import java.sql.Connection; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -59,6 +61,7 @@ public class RedisMessageRegistry implements MessageRegistry { .qos(record.getQos()) .retain(record.getRetain()) .body(record.getBody()) + .userProperties(record.getUserProperties()) .build() ).collect(Collectors.toList()); @@ -79,6 +82,7 @@ public class RedisMessageRegistry implements MessageRegistry { int qos = sessionMessage.getQos(); boolean retain = sessionMessage.isRetain(); byte[] body = sessionMessage.getBody(); + String userProperty = sessionMessage.getUserProperties(); try { SessionMessageEntity sessionMessageEntity = SessionMessageEntity.builder() @@ -87,6 +91,7 @@ public class RedisMessageRegistry implements MessageRegistry { .qos(qos) .body(body) .retain(retain) + .userProperties(userProperty) .createTime(new Date()).build(); RList list = redissonClient.getList(BootstrapKey.Redis.REDIS_SESSION_MESSAGE_PREFIX_KEY + clientIdentifier); @@ -110,6 +115,7 @@ public class RedisMessageRegistry implements MessageRegistry { .topic(topic) .qos(qos) .body(retainMessage.getBody()) + .userProperties(retainMessage.getUserProperties()) .createTime(date) .updateTime(date).build(); @@ -137,6 +143,7 @@ public class RedisMessageRegistry implements MessageRegistry { .topic(item.getTopic()) .qos(item.getQos()) .body(item.getBody()) + .userProperties(item.getUserProperties()) .build() ).orElse(null); }) @@ -148,4 +155,49 @@ public class RedisMessageRegistry implements MessageRegistry { return Collections.emptyList(); } + @Override + public List getOfflineMessage(String topic) { + // 一对一发送,只需要精确匹配 + try { + RSetCache set = redissonClient.getSetCache(BootstrapKey.Redis.REDIS_OFFLINE_MESSAGE_PREFIX_KEY + topic); + List resList = set.stream().map(record -> OfflineMessage.builder() + .topic(record.getTopic()) + .qos(record.getQos()) + .body(record.getBody()) + .userProperties(record.getUserProperties()) + .build() + ).collect(Collectors.toList()); + + if (set.size() > 0) { + redissonClient.getBucket(BootstrapKey.Redis.REDIS_OFFLINE_MESSAGE_PREFIX_KEY + topic).delete(); + } + return resList; + } catch (Exception e) { + log.error("getOfflineMessage error clientIdentifier:{}", topic, e); + return Collections.emptyList(); + } + } + + @Override + public void saveOfflineMessage(OfflineMessage offlineMessage) { + String topic = offlineMessage.getTopic(); + int qos = offlineMessage.getQos(); + byte[] body = offlineMessage.getBody(); + String userProperties = offlineMessage.getUserProperties(); + try { + OfflineMessageEntity offlineMessageEntity = OfflineMessageEntity.builder() + .topic(topic) + .qos(qos) + .body(body) + .userProperties(userProperties) + .createTime(new Date()) + .build(); + + RSetCache set = redissonClient.getSetCache(BootstrapKey.Redis.REDIS_OFFLINE_MESSAGE_PREFIX_KEY + topic); + set.add(offlineMessageEntity, 7, TimeUnit.DAYS); + } catch (Exception e) { + log.error("saveOfflineMessage error message: {}", topic, e); + } + } + } diff --git a/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java b/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java index 2b28c1e7300118dbe560a73bdef6a1f23472eddc..f320d965bc38d88abde60ed0c62d20f9386c815e 100644 --- a/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java +++ b/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java @@ -47,7 +47,7 @@ public class TopicRuleNode implements RuleNode { log.info("rule engine TopicRuleNode request {}", heapMqttMessage); ProtocolAdaptor protocolAdaptor = receiveContext.getProtocolAdaptor(); protocolAdaptor.chooseProtocol(MockMqttChannel.wrapClientIdentifier(heapMqttMessage.getClientIdentifier()), - new SmqttMessage<>(getMqttMessage(heapMqttMessage),heapMqttMessage.getTimestamp(),Boolean.TRUE), receiveContext); + new SmqttMessage<>(getMqttMessage(heapMqttMessage), heapMqttMessage.getTimestamp(), Boolean.TRUE), receiveContext); executeNext(contextView); } @@ -58,7 +58,8 @@ public class TopicRuleNode implements RuleNode { MqttQoS.valueOf(heapMqttMessage.getQos()), 0, this.topic, - PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage())); + PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage()), + heapMqttMessage.getProperties()); } diff --git a/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java b/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java index 60132949c90165647e253c05f617e0dabb137996..ba7bcae6b35b4a997ef83d0face3c354619b85e6 100644 --- a/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java +++ b/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java @@ -35,6 +35,7 @@ public class AutoMqttConfiguration { .httpConfig(springBootstrapConfig.getHttp()) .websocketConfig(springBootstrapConfig.getWs()) .clusterConfig(springBootstrapConfig.getCluster()) + .persistenceType(springBootstrapConfig.getPersistenceType()) .redisConfig(springBootstrapConfig.getRedis()) .databaseConfig(springBootstrapConfig.getDb()) .ruleChainDefinitions(springBootstrapConfig.getRules()) diff --git a/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/SpringBootstrapConfig.java b/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/SpringBootstrapConfig.java index 7bf84d6c16b50882d8fccfff0c286a929f916705..b49ab112c1015ea6dd7efe4f3e972c7bb9d219ae 100644 --- a/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/SpringBootstrapConfig.java +++ b/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/SpringBootstrapConfig.java @@ -63,6 +63,10 @@ public class SpringBootstrapConfig { private BootstrapConfig.MeterConfig meter; + /** + * 持久化方式,0:内存(默认),1:Redis,2:DB + */ + private Integer persistenceType; /** * 数据库配置 */