From 4a3495dfc8cea6714c471307233fabe8b66ba361 Mon Sep 17 00:00:00 2001 From: chackylee Date: Thu, 3 Jul 2025 11:13:46 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dqos=E4=B8=BA2=E6=97=B6?= =?UTF-8?q?=EF=BC=8C=E9=80=9A=E9=85=8D=E7=AC=A6=E6=A8=A1=E5=BC=8F=E8=AE=A2?= =?UTF-8?q?=E9=98=85=E8=80=85=E6=97=A0=E6=B3=95=E6=94=B6=E5=88=B0=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/protocol/PublishRelProtocol.java | 92 +++++++++++++++---- 1 file changed, 75 insertions(+), 17 deletions(-) diff --git a/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishRelProtocol.java b/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishRelProtocol.java index 6584e565..6a01d1e6 100644 --- a/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishRelProtocol.java +++ b/smqttx-core/src/main/java/io/github/quickmsg/core/protocol/PublishRelProtocol.java @@ -24,30 +24,88 @@ import java.util.Set; */ // todo 暂不支持qos2 public class PublishRelProtocol implements Protocol { +// @Override +// public void parseProtocol(PublishRelMessage message, MqttChannel mqttChannel, ContextView contextView) { +// ReceiveContext receiveContext = contextView.get(ReceiveContext.class); +// LogManager logManager = receiveContext.getLogManager(); +// logManager.printInfo(mqttChannel, LogEvent.PUBLISH_REL, LogStatus.SUCCESS, JacksonUtil.bean2Json(message)); +// PublishMessage publishMessage=mqttChannel.sendQos2Cache(message.getMessageId()); +// mqttChannel.write(MqttMessageUtils.buildPublishComp(message.getMessageId())); +// if(publishMessage!=null){ +// receiveContext.getMetricManager().getMetricRegistry().getMetricCounter(CounterType.PUBLISH_EVENT).increment(); +// logManager.printInfo(mqttChannel, LogEvent.PUBLISH, LogStatus.SUCCESS, JacksonUtil.bean2Json(message)); +// +// String originalTopic = publishMessage.getTopic(); +// ClusterMessage clusterMessage = new ClusterMessage(publishMessage); +// IntegrateCluster integrateCluster =receiveContext.getIntegrate().getCluster(); +// integrateCluster.sendCluster(clusterMessage.getTopic(), clusterMessage); +// IntegrateTopics topics=receiveContext.getIntegrate().getTopics(); +// if(topics.isWildcard(clusterMessage.getTopic())){ +// Set wildcardTopics = topics.getWildcardTopics(clusterMessage.getTopic()); +// if (wildcardTopics != null && wildcardTopics.size() > 0) { +// wildcardTopics.forEach(tp -> { +// clusterMessage.setTopic(tp); +// integrateCluster.sendCluster(tp, clusterMessage); +// }); +// } +// } +// +// } +// } + @Override public void parseProtocol(PublishRelMessage message, MqttChannel mqttChannel, ContextView contextView) { - ReceiveContext receiveContext = contextView.get(ReceiveContext.class); + // 获取接收上下文 + ReceiveContext receiveContext = contextView.get(ReceiveContext.class); LogManager logManager = receiveContext.getLogManager(); - logManager.printInfo(mqttChannel, LogEvent.PUBLISH_REL, LogStatus.SUCCESS, JacksonUtil.bean2Json(message)); - PublishMessage publishMessage=mqttChannel.sendQos2Cache(message.getMessageId()); + + // 打印接收日志 + logManager.printInfo(mqttChannel, LogEvent.PUBLISH_REL, LogStatus.SUCCESS, + JacksonUtil.bean2Json(message)); + + // 获取QoS2缓存消息 + PublishMessage publishMessage = mqttChannel.sendQos2Cache(message.getMessageId()); + + // 发送PUBCOMP响应 mqttChannel.write(MqttMessageUtils.buildPublishComp(message.getMessageId())); - if(publishMessage!=null){ - receiveContext.getMetricManager().getMetricRegistry().getMetricCounter(CounterType.PUBLISH_EVENT).increment(); - logManager.printInfo(mqttChannel, LogEvent.PUBLISH, LogStatus.SUCCESS, JacksonUtil.bean2Json(message)); + + if (publishMessage != null) { + // 更新发布事件指标 + receiveContext.getMetricManager().getMetricRegistry() + .getMetricCounter(CounterType.PUBLISH_EVENT).increment(); + + // 打印发布成功日志 + logManager.printInfo(mqttChannel, LogEvent.PUBLISH, LogStatus.SUCCESS, + JacksonUtil.bean2Json(message)); + + // 创建集群消息对象(基于原始发布消息) ClusterMessage clusterMessage = new ClusterMessage(publishMessage); - IntegrateCluster integrateCluster =receiveContext.getIntegrate().getCluster(); - integrateCluster.sendCluster(clusterMessage.getTopic(), clusterMessage); - IntegrateTopics topics=receiveContext.getIntegrate().getTopics(); - if(topics.isWildcard(clusterMessage.getTopic())){ - Set wildcardTopics = topics.getWildcardTopics(clusterMessage.getTopic()); - if (wildcardTopics != null && wildcardTopics.size() > 0) { - wildcardTopics.forEach(tp -> { - clusterMessage.setTopic(tp); - integrateCluster.sendCluster(tp, clusterMessage); - }); - } + String originalTopic = publishMessage.getTopic(); // 保存原始主题 + + // 获取系统组件 + IntegrateCluster integrateCluster = receiveContext.getIntegrate().getCluster(); + IntegrateTopics topics = receiveContext.getIntegrate().getTopics(); + // ================ 第一步:发送到具体主题订阅者 ================ + integrateCluster.sendCluster(originalTopic, clusterMessage); + + // ================ 第二步:发送到通配符订阅者 ================ + // 查找匹配当前主题的所有通配符 + Set wildcardTopics = topics.getWildcardTopics(originalTopic); + // 遍历所有匹配的通配符主题 + for (String wildcardTopic : wildcardTopics) { + // 重要:创建消息副本避免污染原始消息 + ClusterMessage wildcardMessage = new ClusterMessage(publishMessage); +// // 使用通配符主题作为路由键(但消息内部保留原始主题) +// integrateCluster.sendCluster(wildcardTopic, wildcardMessage); + wildcardMessage.setTopic(wildcardTopic); + integrateCluster.sendCluster(wildcardTopic, wildcardMessage); } + // ================ 日志增强:记录匹配的通配符 ================ + if (!wildcardTopics.isEmpty()) { + logManager.printInfo(mqttChannel, LogEvent.PUBLISH, LogStatus.SUCCESS, + "Matched wildcards: " + wildcardTopics + " for topic: " + originalTopic); + } } } -- Gitee