diff --git a/mica-mqtt-broker/src/main/java/org/dromara/mica/mqtt/broker/cluster/RedisMqttMessageStore.java b/mica-mqtt-broker/src/main/java/org/dromara/mica/mqtt/broker/cluster/RedisMqttMessageStore.java index ee8471c13b28e69008566e218b646db2d432fd3c..1a3424f64c7b75c3707c95f226e307e40ee4eb1d 100644 --- a/mica-mqtt-broker/src/main/java/org/dromara/mica/mqtt/broker/cluster/RedisMqttMessageStore.java +++ b/mica-mqtt-broker/src/main/java/org/dromara/mica/mqtt/broker/cluster/RedisMqttMessageStore.java @@ -56,8 +56,9 @@ public class RedisMqttMessageStore implements IMqttMessageStore { } @Override - public boolean addRetainMessage(String topic, Message message) { + public boolean addRetainMessage(String topic, long timeout, Message message) { redisCache.set(RedisKeys.MESSAGE_STORE_RETAIN.getKey(topic), message, messageSerializer::serialize); + redisCache.expire(RedisKeys.MESSAGE_STORE_RETAIN.getKey(topic), timeout); return true; } diff --git a/mica-mqtt-common/src/main/java/org/dromara/mica/mqtt/core/util/TopicUtil.java b/mica-mqtt-common/src/main/java/org/dromara/mica/mqtt/core/util/TopicUtil.java index c4b0661ba70d29dea7c08fcf6474478fd2cfc740..d5cdec485ae14cbcc15aad0890d618b360b4689c 100644 --- a/mica-mqtt-common/src/main/java/org/dromara/mica/mqtt/core/util/TopicUtil.java +++ b/mica-mqtt-common/src/main/java/org/dromara/mica/mqtt/core/util/TopicUtil.java @@ -86,6 +86,21 @@ public final class TopicUtil { } } + /** + * 解析保留消息主题, topicName + * + * @param topicName topicName + */ + public static String[] retainTopicName(String topicName) { + if (topicName.startsWith("$retain/")) { + String[] retainArray = topicName.split("/", 3); + if (retainArray.length == 3) { + return new String[]{retainArray[2], retainArray[1]}; + } + } + return new String[]{topicName, "-1"}; + } + /** * 判断 topicFilter topicName 是否匹配 * @@ -104,7 +119,8 @@ public final class TopicUtil { // 是否进入 + 号层级通配符 boolean inLayerWildcard = false; int wildcardCharLen = 0; - topicFilterLoop: for (int i = 0; i < topicFilterLength; i++) { + topicFilterLoop: + for (int i = 0; i < topicFilterLength; i++) { ch = topicFilterChars[i]; if (ch == MqttCodecUtil.TOPIC_WILDCARDS_MORE) { // 校验: # 通配符只能在最后一位 @@ -207,5 +223,4 @@ public final class TopicUtil { // 检查是否同时存在 "${" 和 "}",并且 "}" 在 "${" 之后 return startIndex != -1 && endIndex != -1 && endIndex > startIndex + 2; } - } diff --git a/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/MqttServer.java b/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/MqttServer.java index fc4bc60a2ea5ba7b919e4420b56a2eae1fbcc769..56e9949c38fe8c43210e9fd64bd6d8c867c3ee5d 100644 --- a/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/MqttServer.java +++ b/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/MqttServer.java @@ -145,7 +145,7 @@ public final class MqttServer { * @return 是否发送成功 */ public boolean publish(String clientId, String topic, Object payload, MqttQoS qos) { - return publish(clientId, topic, payload, qos, false); + return publish(clientId, topic, payload, qos, false, false); } /** @@ -158,7 +158,7 @@ public final class MqttServer { * @return 是否发送成功 */ public boolean publish(String clientId, String topic, Object payload, boolean retain) { - return publish(clientId, topic, payload, MqttQoS.QOS0, retain); + return publish(clientId, topic, payload, MqttQoS.QOS0, retain, false); } /** @@ -171,12 +171,14 @@ public final class MqttServer { * @param retain 是否在服务器上保留消息 * @return 是否发送成功 */ - public boolean publish(String clientId, String topic, Object payload, MqttQoS qos, boolean retain) { + public boolean publish(String clientId, String topic, Object payload, MqttQoS qos, boolean retain, boolean store) { // 校验 topic TopicUtil.validateTopicName(topic); // 存储保留消息 - if (retain) { - this.saveRetainMessage(topic, qos, payload); + if (retain && !store) { + String[] retainTopicName = TopicUtil.retainTopicName(topic); + topic = retainTopicName[0]; + this.saveRetainMessage(topic, Long.parseLong(retainTopicName[1]), qos, payload); } // 获取 context ChannelContext context = Tio.getByBsId(getServerConfig(), clientId); @@ -247,7 +249,7 @@ public final class MqttServer { * @return 是否发送成功 */ public boolean publishAll(String topic, Object payload, MqttQoS qos) { - return publishAll(topic, payload, qos, false); + return publishAll(topic, payload, qos, false, false); } /** @@ -259,7 +261,7 @@ public final class MqttServer { * @return 是否发送成功 */ public boolean publishAll(String topic, Object payload, boolean retain) { - return publishAll(topic, payload, MqttQoS.QOS0, retain); + return publishAll(topic, payload, MqttQoS.QOS0, retain, false); } /** @@ -271,12 +273,14 @@ public final class MqttServer { * @param retain 是否在服务器上保留消息 * @return 是否发送成功 */ - public boolean publishAll(String topic, Object payload, MqttQoS qos, boolean retain) { + public boolean publishAll(String topic, Object payload, MqttQoS qos, boolean retain, boolean store) { // 校验 topic TopicUtil.validateTopicName(topic); // 存储保留消息 - if (retain) { - this.saveRetainMessage(topic, qos, payload); + if (retain && !store) { + String[] retainTopicName = TopicUtil.retainTopicName(topic); + topic = retainTopicName[0]; + this.saveRetainMessage(topic, Long.parseLong(retainTopicName[1]), qos, payload); } // 查找订阅该 topic 的客户端 List subscribeList = sessionManager.searchSubscribe(topic); @@ -310,9 +314,9 @@ public final class MqttServer { String clientId = message.getClientId(); MqttQoS mqttQoS = MqttQoS.valueOf(message.getQos()); if (StrUtil.isBlank(clientId)) { - return publishAll(topic, message.getPayload(), mqttQoS, message.isRetain()); + return publishAll(topic, message.getPayload(), mqttQoS, message.isRetain(), message.isStore()); } else { - return publish(clientId, topic, message.getPayload(), mqttQoS, message.isRetain()); + return publish(clientId, topic, message.getPayload(), mqttQoS, message.isRetain(), message.isStore()); } } @@ -323,7 +327,7 @@ public final class MqttServer { * @param mqttQoS MqttQoS * @param payload ByteBuffer */ - private void saveRetainMessage(String topic, MqttQoS mqttQoS, Object payload) { + private void saveRetainMessage(String topic, long timeout, MqttQoS mqttQoS, Object payload) { Message retainMessage = new Message(); retainMessage.setTopic(topic); retainMessage.setQos(mqttQoS.value()); @@ -333,7 +337,9 @@ public final class MqttServer { retainMessage.setDup(false); retainMessage.setTimestamp(System.currentTimeMillis()); retainMessage.setNode(serverCreator.getNodeName()); - this.messageStore.addRetainMessage(topic, retainMessage); + if (this.messageStore.addRetainMessage(topic, timeout, retainMessage)) { + retainMessage.setStore(true); + } } /** diff --git a/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/cluster/MqttClusterMessageListener.java b/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/cluster/MqttClusterMessageListener.java index 14ff979f3dc3b00803340a8375c8b6705e2e3dfd..1da60bbc613d8117c1aabad78102f992efde72c0 100644 --- a/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/cluster/MqttClusterMessageListener.java +++ b/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/cluster/MqttClusterMessageListener.java @@ -16,14 +16,14 @@ package org.dromara.mica.mqtt.core.server.cluster; +import org.dromara.mica.mqtt.codec.MqttMessageBuilders; +import org.dromara.mica.mqtt.codec.MqttPublishMessage; +import org.dromara.mica.mqtt.codec.MqttQoS; import org.dromara.mica.mqtt.core.server.MqttServer; import org.dromara.mica.mqtt.core.server.enums.MessageType; import org.dromara.mica.mqtt.core.server.event.IMqttMessageListener; import org.dromara.mica.mqtt.core.server.model.Message; import org.dromara.mica.mqtt.core.server.session.IMqttSessionManager; -import org.dromara.mica.mqtt.codec.MqttMessageBuilders; -import org.dromara.mica.mqtt.codec.MqttPublishMessage; -import org.dromara.mica.mqtt.codec.MqttQoS; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tio.core.ChannelContext; @@ -92,7 +92,7 @@ public class MqttClusterMessageListener { } else if (MessageType.HTTP_API == messageType) { // http rest api 消息也会转发到此 MqttQoS mqttQoS = MqttQoS.valueOf(message.getQos()); - mqttServer.publishAll(topic, message.getPayload(), mqttQoS, message.isRetain()); + mqttServer.publishAll(topic, message.getPayload(), mqttQoS, message.isRetain(), message.isStore()); // 触发消息 try { onHttpApiMessage(topic, mqttQoS, message); diff --git a/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/dispatcher/AbstractMqttMessageDispatcher.java b/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/dispatcher/AbstractMqttMessageDispatcher.java index 1d3fdfbba94718af4be04b647ab7afcba1e7d486..1d2651c39f8a50ecbf5578a2cc7a63799fc6387a 100644 --- a/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/dispatcher/AbstractMqttMessageDispatcher.java +++ b/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/dispatcher/AbstractMqttMessageDispatcher.java @@ -16,14 +16,14 @@ package org.dromara.mica.mqtt.core.server.dispatcher; +import org.dromara.mica.mqtt.codec.MqttMessageBuilders; +import org.dromara.mica.mqtt.codec.MqttPublishMessage; +import org.dromara.mica.mqtt.codec.MqttQoS; import org.dromara.mica.mqtt.core.server.MqttServer; import org.dromara.mica.mqtt.core.server.enums.MessageType; import org.dromara.mica.mqtt.core.server.event.IMqttMessageListener; import org.dromara.mica.mqtt.core.server.model.Message; import org.dromara.mica.mqtt.core.server.session.IMqttSessionManager; -import org.dromara.mica.mqtt.codec.MqttMessageBuilders; -import org.dromara.mica.mqtt.codec.MqttPublishMessage; -import org.dromara.mica.mqtt.codec.MqttQoS; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tio.core.ChannelContext; @@ -74,7 +74,7 @@ public abstract class AbstractMqttMessageDispatcher implements org.dromara.mica. String topic = message.getTopic(); // http rest api 消息也会转发到此 MqttQoS mqttQoS = MqttQoS.valueOf(message.getQos()); - mqttServer.publishAll(topic, message.getPayload(), mqttQoS, message.isRetain()); + mqttServer.publishAll(topic, message.getPayload(), mqttQoS, message.isRetain(), message.isStore()); // 触发消息 try { onHttpApiMessage(topic, mqttQoS, message); diff --git a/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/model/Message.java b/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/model/Message.java index ec6aab562de7a1ab1bfd507f26dca3e9680f9a3b..793cfa163c535113508eead4e9ccae057f0832b9 100644 --- a/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/model/Message.java +++ b/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/model/Message.java @@ -73,6 +73,10 @@ public class Message implements Serializable { * retain */ private boolean retain; + /** + * store retain=true时 是否完成持久化 + */ + private boolean store; /** * 消息内容 */ @@ -178,6 +182,14 @@ public class Message implements Serializable { this.retain = retain; } + public boolean isStore() { + return store; + } + + public void setStore(boolean store) { + this.store = store; + } + public byte[] getPayload() { return payload; } diff --git a/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/store/IMqttMessageStore.java b/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/store/IMqttMessageStore.java index 96a525e0815507f8de023b1e6a8bcd7b260b7bff..8e661625197ac47b8270dc4bdcbf93ee6f061d55 100644 --- a/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/store/IMqttMessageStore.java +++ b/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/store/IMqttMessageStore.java @@ -56,10 +56,11 @@ public interface IMqttMessageStore { * 存储 retain 消息 * * @param topic topic + * @param timeout timeout * @param message message * @return boolean */ - boolean addRetainMessage(String topic, Message message); + boolean addRetainMessage(String topic, long timeout, Message message); /** * 清理该 topic 的 retain 消息 diff --git a/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/store/InMemoryMqttMessageStore.java b/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/store/InMemoryMqttMessageStore.java index 6445c717494ab063f48a9a98d3b5409db62a364a..7a3530e1d8e5533c758bb205a91479ab813a4a30 100644 --- a/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/store/InMemoryMqttMessageStore.java +++ b/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/store/InMemoryMqttMessageStore.java @@ -19,11 +19,13 @@ package org.dromara.mica.mqtt.core.server.store; import org.dromara.mica.mqtt.core.server.model.Message; import org.dromara.mica.mqtt.core.util.TopicUtil; +import org.tio.utils.cache.TimedCache; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; /** * message store @@ -36,9 +38,9 @@ public class InMemoryMqttMessageStore implements IMqttMessageStore { */ private final ConcurrentMap willStore = new ConcurrentHashMap<>(); /** - * 保持消息 topic: Message + * 带有有效期的保持消息 topic: Message */ - private final ConcurrentMap retainStore = new ConcurrentHashMap<>(); + private final TimedCache retainStore = new TimedCache<>(Long.MAX_VALUE); @Override public boolean addWillMessage(String clientId, Message message) { @@ -58,8 +60,12 @@ public class InMemoryMqttMessageStore implements IMqttMessageStore { } @Override - public boolean addRetainMessage(String topic, Message message) { - retainStore.put(topic, message); + public boolean addRetainMessage(String topic, long timeout, Message message) { + if (timeout <= 0) { + retainStore.put(topic, message); + } else { + retainStore.put(topic, message, TimeUnit.SECONDS.toMillis(timeout)); + } return true; } @@ -72,11 +78,11 @@ public class InMemoryMqttMessageStore implements IMqttMessageStore { @Override public List getRetainMessage(String topicFilter) { List retainMessageList = new ArrayList<>(); - retainStore.forEach((topic, message) -> { + for (String topic : retainStore.keySet()) { if (TopicUtil.match(topicFilter, topic)) { - retainMessageList.add(message); + retainMessageList.add(retainStore.get(topic)); } - }); + } return retainMessageList; } diff --git a/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/support/DefaultMqttServerProcessor.java b/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/support/DefaultMqttServerProcessor.java index 56c7426099db42a9fbeca46030fe2644d8f5c53f..d61e784388e6a8aadc0d28cac70e2ede04d7b36d 100644 --- a/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/support/DefaultMqttServerProcessor.java +++ b/mica-mqtt-server/src/main/java/org/dromara/mica/mqtt/core/server/support/DefaultMqttServerProcessor.java @@ -467,6 +467,8 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor { if (MqttQoS.QOS0 == mqttQoS || payload == null || payload.length == 0) { this.messageStore.clearRetainMessage(topicName); } else { + String[] retainTopicName = TopicUtil.retainTopicName(topicName); + topicName = retainTopicName[0]; Message retainMessage = new Message(); retainMessage.setTopic(topicName); retainMessage.setQos(mqttQoS.value()); @@ -480,7 +482,9 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor { // 客户端 ip:端口 retainMessage.setPeerHost(clientNode.getPeerHost()); retainMessage.setNode(serverCreator.getNodeName()); - this.messageStore.addRetainMessage(topicName, retainMessage); + if (this.messageStore.addRetainMessage(topicName, Long.parseLong(retainTopicName[1]), retainMessage)) { + retainMessage.setStore(true); + } } } // 2. message @@ -506,9 +510,10 @@ public class DefaultMqttServerProcessor implements MqttServerProcessor { message.setNode(serverCreator.getNodeName()); // 3. 消息发布 if (messageListener != null) { + String finalTopicName = topicName; executor.submit(() -> { try { - messageListener.onMessage(context, clientId, topicName, mqttQoS, publishMessage, message); + messageListener.onMessage(context, clientId, finalTopicName, mqttQoS, publishMessage, message); } catch (Throwable e) { logger.error(e.getMessage(), e); }