From 269850aa38dfef7f38ac6a8b73c5a4c2762d91a1 Mon Sep 17 00:00:00 2001 From: 17712858268 <419479707@qq.com> Date: Tue, 28 Mar 2023 16:09:59 +0800 Subject: [PATCH 1/2] =?UTF-8?q?MQTT-QoS=3D2=E6=97=B6=E8=AE=A2=E9=98=85?= =?UTF-8?q?=E6=97=B6=E6=80=BB=E6=97=B6=E6=94=B6=E5=88=B0=E9=87=8D=E8=AF=95?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E9=97=AE=E9=A2=98=EF=BC=8C=E8=A7=A3=E5=86=B3?= =?UTF-8?q?=E6=96=B9=E6=B3=95=EF=BC=9A=E5=8F=91=E5=B8=83=E6=94=B6=E5=88=B0?= =?UTF-8?q?=E6=8A=A5=E6=96=87=E6=94=B6=E5=88=B0=E5=90=8E=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E5=8F=96=E6=B6=88=E9=87=8D=E8=AF=95=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../io/github/quickmsg/core/protocol/PublishRecProtocol.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishRecProtocol.java b/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishRecProtocol.java index cd9464bc..cfd5e8fc 100644 --- a/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishRecProtocol.java +++ b/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishRecProtocol.java @@ -22,6 +22,8 @@ public class PublishRecProtocol implements Protocol { ReceiveContext receiveContext = contextView.get(ReceiveContext.class); LogManager logManager = receiveContext.getLogManager(); logManager.printWarn(mqttChannel, LogEvent.PUBLISH_REC, LogStatus.SUCCESS, JacksonUtil.bean2Json(message)); + //MQTT-QoS=2时订阅时收到重试消息问题:发布收到报文收到后执行取消重试功能 + contextView.get(ReceiveContext.class).getRetryManager().cancelRetry(mqttChannel, message.getMessageId()); receiveContext.getMetricManager().getMetricRegistry().getMetricCounter(CounterType.PUBLISH_EVENT).increment(); mqttChannel.write(MqttMessageUtils.buildPublishRel(message.getMessageId())); } -- Gitee From cc9bb088baa44e46bb6567b2e26ad1ca9c8da919 Mon Sep 17 00:00:00 2001 From: 17712858268 <419479707@qq.com> Date: Tue, 28 Mar 2023 21:53:09 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E7=9A=84BUG=EF=BC=9A?= =?UTF-8?q?=E5=A4=9A=E4=B8=AA=E5=AE=A2=E6=88=B7=E7=AB=AF=E8=AE=A2=E9=98=85?= =?UTF-8?q?=E7=9B=B8=E5=90=8C=E4=B8=BB=E9=A2=98=EF=BC=8C=E8=AF=A5=E4=B8=BB?= =?UTF-8?q?=E9=A2=98=E5=8C=85=E5=90=AB=E5=A4=9A=E5=B1=82=E9=80=9A=E9=85=8D?= =?UTF-8?q?=E7=AC=A6#=E6=88=96=E5=8D=95=E5=B1=82=E9=80=9A=E9=85=8D?= =?UTF-8?q?=E7=AC=A6+=EF=BC=8C=E4=BE=8B=E5=A6=82=EF=BC=9A/name/smqttx/#?= =?UTF-8?q?=E3=80=82=E5=A6=82=E6=9E=9C=E5=85=B6=E4=B8=AD=E4=B8=80=E4=B8=AA?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E8=80=85=E5=8F=96=E6=B6=88=E8=AE=A2=E9=98=85?= =?UTF-8?q?=EF=BC=8C=E6=8E=A5=E7=9D=80=E5=90=91=E9=98=9F=E5=88=97=EF=BC=88?= =?UTF-8?q?=E4=BE=8B=E5=A6=82=EF=BC=9A/name/smqttx/1=EF=BC=89=E5=8F=91?= =?UTF-8?q?=E5=B8=83=E6=B6=88=E6=81=AF=EF=BC=8C=E5=85=B6=E4=BB=96=E8=AE=A2?= =?UTF-8?q?=E9=98=85=E8=80=85=E4=B9=9F=E6=94=B6=E4=B8=8D=E5=88=B0=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../integrate/cluster/IntegrateCluster.java | 7 ++++++- .../interate/IgniteIntegrateCluster.java | 4 ++++ .../interate/IgniteIntegrateTopics.java | 17 ++++++++++++++--- 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/cluster/IntegrateCluster.java b/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/cluster/IntegrateCluster.java index c7334315..0f2bc8c9 100644 --- a/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/cluster/IntegrateCluster.java +++ b/smqttx-common/src/main/java/io/github/quickmsg/common/integrate/cluster/IntegrateCluster.java @@ -35,7 +35,12 @@ public interface IntegrateCluster extends IntegrateGetter { */ String getLocalNode(); - + /** + * acquire local node id + * + * @return String + */ + String getLocalNodeConsistentId(); /** * 訂閱 diff --git a/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateCluster.java b/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateCluster.java index be667838..93b5e862 100644 --- a/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateCluster.java +++ b/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateCluster.java @@ -63,6 +63,10 @@ public class IgniteIntegrateCluster implements IntegrateCluster, Serializable { return igniteIntegrate.getIgnite().cluster().localNode().addresses().stream().findFirst().orElse(ServerUtils.serverIp); } + @Override + public String getLocalNodeConsistentId() { + return igniteIntegrate.getIgnite().cluster().localNode().consistentId().toString(); + } @Override public void listenTopic(String topic) { diff --git a/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateTopics.java b/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateTopics.java index 8592d085..b1c16bd6 100644 --- a/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateTopics.java +++ b/smqttx-integrate/src/main/java/io/github/quickmsg/interate/IgniteIntegrateTopics.java @@ -6,6 +6,7 @@ import io.github.quickmsg.common.integrate.Integrate; import io.github.quickmsg.common.integrate.SubscribeTopic; import io.github.quickmsg.common.integrate.topic.IntegrateTopics; import io.github.quickmsg.common.metric.CounterType; +import io.github.quickmsg.common.utils.IPUtils; import io.github.quickmsg.common.utils.TopicRegexUtils; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -17,6 +18,7 @@ import org.apache.ignite.configuration.CollectionConfiguration; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -65,7 +67,7 @@ public class IgniteIntegrateTopics implements IntegrateTopics { integrate.getCluster().listenTopic(topic); mqttChannel.getTopics().add(subscribeTopic); if (isWildcard(topic)) { - shareCache.add(topic); + shareCache.add(String.format("%s%s", integrate.getCluster().getLocalNodeConsistentId(), topic)); } } } @@ -94,7 +96,7 @@ public class IgniteIntegrateTopics implements IntegrateTopics { private void clearCache(String topic) { integrate.getCluster().stopListenTopic(topic); if (isWildcard(topic)) { - shareCache.remove(topic); + shareCache.remove(String.format("%s%s", integrate.getCluster().getLocalNodeConsistentId(), topic)); } } @@ -122,7 +124,16 @@ public class IgniteIntegrateTopics implements IntegrateTopics { @Override public Set getWildcardTopics(String topic) { - return shareCache.stream().filter(tp->topic.matches(TopicRegexUtils.regexTopic(tp))).collect(Collectors.toSet()); + Set nodeIds = integrate.getCluster().getClusterNode(); + return shareCache.stream().map(tp -> { + for (String id : nodeIds) { + if(tp.contains(id)){ + tp = tp.replaceFirst(id, ""); + break; + } + } + return tp; + }).filter(tp->topic.matches(TopicRegexUtils.regexTopic(tp))).collect(Collectors.toSet()); } @Override -- Gitee