# windmq **Repository Path**: mqh2016/windmq ## Basic Information - **Project Name**: windmq - **Description**: 🎉 基于springboot快速整合开发服务端处理MQTT消息的库。支持低端设备快速加解密;支持阿里云、EMQTT;高可用部署;Token鉴权ACL。 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 8 - **Created**: 2025-01-20 - **Last Updated**: 2025-01-20 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # windmq - MQTT快速开发脚手架 star fork ### 前言 快速开发处理MQTT topic,一个方法注解就搞定 原样从项目里搬出来的,产线阿里云,测试EMQ,需要统一支持下 有些config和bean不太合理,过年有空整理下 此项目整合springboot部分和topic规则搬运了一个项目,刚接触这个,十分感谢前辈的经验https://gitee.com/yezhihao/mqtt-sample 关于共享订阅的高可用兼容,如果有方案还望各位不吝赐教 ### 功能 - MQTT客户端登录凭证分配(ACL支持阿里云\EMQ目前只支持账号密码,可自定义实现) - 适合低端设备的查表加密协议(详情见: com.stanwind.wmqtt.security.TableMsgEncrypt) - 高可用部署(多实例不同clientID上线,EMQ有提供共享订阅,但是阿里云只能靠规则引擎转发MQ,我们线上使用全盘负责机制,谁发命令谁处理) - 消息处理池(CPU核心数*2 + 1, ) - Topic注解匹配消息处理,支持模糊匹配(正则实现,可取topic路径参数)和精确匹配 ### 默认规则 - 对客户端发送的TOPIC均以 IOT_CLIENT/xxx形式 (配置可修改) - 对服务端发送的TOPIC均以 IOT_SERVER/xxx形式 (配置可修改) - 加密行需在payload开头2 byte表示采用哪一行数据进行加密 (若启用加密则IOT开头的topic均会加密,详见:com.stanwind.wmqtt.security.IotDeviceMessageEncrypt) - 为兼容阿里云 clientId均以GID_DEVICE@@@开头 (配置可修改) - Server采用签名登录,阿里云环境下Client分配的账号密码使用token登录,鉴权信息有效时长12小时 - topic中{instanceId}表示匹配当前实例ID,{deviceId}表示匹配当前设备序列号(详情: com.stanwind.wmqtt.MqttConfig) ### springboot支持版本 - 2.0.X.RELEASE ### 项目仓库 ```xml com.stanwind spring-boot-windmq 1.0.0-RELEASE ``` ### 样例工程 https://gitee.com/sense7/windmq-demo.git ### 参考依赖 ```xml com.stanwind spring-boot-windmq 1.0.0-RELEASE org.springframework.boot spring-boot-starter-integration org.springframework.integration spring-integration-mqtt org.eclipse.paho.client.mqttv3 org.eclipse.paho org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.1 ``` ### 启用windmq ```java @EnableWindMQ @SpringBootApplication(exclude = DataSourceAutoConfiguration.class) public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } } ``` ### 使用样例 - 临时订阅/取消(注入ProducerHolder) ```java void addTopic(String... topic); void addTopic(String topic, int qos); void addTopics(String[] topic, int[] qos); void removeTopic(String... topic); ``` - 消息发送 IMessageService ```java void notify(String deviceId, Object payload); void sendToTopic(String topic, Object payload); MqttResponse request(String deviceId, MqttRequest payload); MqttResponse request(String deviceId, MqttRequest payload, long timeout); ``` - 消息处理 ```java @TopicHandler(topic = "$SYS/brokers/{node}/clients/{deviceId}/connected") public void connected(MQTTMsg msg) { ClientReqVO clientReqVO = JSONObject.parseObject(msg.getPayload().toString(), ClientReqVO.class); process(clientReqVO); } ``` - 获取路径参数 ```java @Service public class DemoHandler extends BaseTopicHandler { @TopicHandler(topic = "IOT_SERVER/ping/{instanceId}/{taskId}/{param1}") public void uploadPingData(MQTTMsg msg) { String taskId = getParam("taskId"); String param1 = getParam("param1"); //或 MqttContext.getContext().getParams().getOrDefault("taskId", null); } } ``` - 高可用方案(临时订阅处理完取消 适用于服务端发送控制指令,携带临时随机topic,客户端往服务端指定topic写) ```java @TopicHandler(topic = "IOT_SERVER/ping/{instanceId}/{taskId}") public void uploadPingData(MQTTMsg msg) { if (!currentHandle()) { log.debug("非当前实例任务: [{}]", msg); return; } if (接收完毕) { //取消订阅 mqttConfig.removeTopic(msg.getTopic()); } } ``` - 生成客户端链接数据 ```java @Autowired private MqttConfig mqttConfig; @Autowired private ClientApi clientApi; public MqttConnVO generateMqttConnConfig(String sn) throws Exception { String r = mqttConfig.getAclRead().replaceAll(DEVICE_ID, stcUtil.sn2cli(sn)); String w = mqttConfig.getAclWrite().replaceAll(DEVICE_ID, stcUtil.sn2cli(sn)); ConnData connData = clientApi.getTokenConn(Utils.splitToList(r), Utils.splitToList(w)); MqttConnVO vo = new MqttConnVO(); //缺省外网地址则返回统一地址 否则返回外网地址 vo.setUris(ArrayUtils.isEmpty(mqttConfig.getPubServerURIs()) ? mqttConfig.getServerURIs() : mqttConfig.getPubServerURIs()); vo.setReadTopics(r); vo.setWriteTopics(w); vo.setEnc(mqttConfig.getEncTable()); vo.setEncSize(mqttConfig.getEncCount()); BeanUtils.copyProperties(connData, vo); log.info("{} 获取mqtt: {}", sn, vo); return vo; } @Data public class MqttConnVO implements Serializable { private static final long serialVersionUID = 1L; private String[] uris; private Long expire; private String username; private String password; private String readTopics; private String writeTopics; private String enc; private Integer encSize; } ``` ### 使用建议 - 配置参见bootstrap.yml - 测试环境使用EMQ,产线使用阿里云