# SimpleMqttPool **Repository Path**: MinJun520/mqtt_pool ## Basic Information - **Project Name**: SimpleMqttPool - **Description**: 基于org.eclipse.paho.client.mqttv3实现的一个简易的Mqtt连接池 - **Primary Language**: Java - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 2 - **Forks**: 6 - **Created**: 2020-09-11 - **Last Updated**: 2022-04-29 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # SimpleMqttPool 一个自定义的MQTT连接池 ## 介绍 在《[用Spring Boot实现一个简易的MQTT客户端](https://gitee.com/kami_xenos/simple-mqtt-client)》中介绍了一个自定义的MQTT的客户端的实现,但单一的客户端并不能满足实际的开发需求,这里基于之前实现的自定义客户端实现一个简单的连接池 ## 具体实现 一个简单的连接池需要这样几个要素: - 最小连接数 - 最大连接数 - 获取连接对象的方法 - 归还连接对象的方法 - 一个监视线程用于释放多余的连接 ### 创建两个集合用于存储*当前所有连接* 和*当前可用的空闲连接* ```java import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; public class SimpleMqttPool { /** * 用于存储所有的客户端 * key: clientId * value: client */ Map connectionMap = new ConcurrentHashMap<>(); /** * 用于存储当前可用的空闲客户端 */ List idleClients = new LinkedList<>(); } ``` ### 提供一个初始化方法,执行时创建最小连接数的连接对象 创建连接对象时需要的参数,作为成员变量,通过构造方法传递 ```java /*************配置信息类******************/ import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; @Component @Setter @Getter @ConfigurationProperties("mqtt.client") public class SimpleMqttClientProperties { private String clientid; private String userName; private int timeOut; private int aliveTime; private int maxConnectTimes; private String[] topics; private int[] qos; } /***************连接池类*******************/ import cn.xenoscode.simplepool.config.SimpleMqttClientProperties; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; /** * @author Xenos * @version V1.0 * @Package cn.xenoscode.simplepool.mqtt * @date 2020/9/5 14:07 */ public class SimpleMqttPool { /** * 用于存储所有的客户端 * key: clientId * value: client */ private Map connectionMap = new ConcurrentHashMap<>(); /** * 用于存储当前可用的空闲客户端 */ private List idleClients = new LinkedList<>(); /** * 连接池最小连接数 */ private int minPoolSize; /** * 连接池最大连接数 */ private int maxPoolSize; /** * 连接服务器地址 */ private String host; /** * 连接的配置信息 */ private SimpleMqttClientProperties clientProperties; public SimpleMqttPool(int minPoolSize, int maxPoolSize, String host, SimpleMqttClientProperties clientProperties) { this.minPoolSize = minPoolSize; this.maxPoolSize = maxPoolSize; this.host = host; this.clientProperties = clientProperties; } public void initialize() { String clientIdProperty = clientProperties.getClientid(); for (int i = 0; i < minPoolSize; i++) { //应为MQTT连接的客户端要求clientId不能重复,在这里做些处理 String clientId = clientIdProperty + "_" + (i + 1); SimpleMqttClient mqttClient = new SimpleMqttClient(clientId, clientProperties.getUserName(), host, clientProperties.getTimeOut(), clientProperties.getAliveTime(), clientProperties.getTopics(), clientProperties.getQos(), clientProperties.getMaxConnectTimes()); //连接客户端 mqttClient.connect(); //将创建的客户端保存在集合中 connectionMap.put(clientId, mqttClient); idleClients.add(mqttClient); } } } ``` ### 对外提供获取客户端和归还客户端的方法 ```java public SimpleMqttClient getClient() throws Exception { SimpleMqttClient client = null; synchronized (idleClients) { //创建一个迭代器,用于获取连接 Iterator iterator = idleClients.iterator(); while (iterator.hasNext()) { SimpleMqttClient simpleMqttClient = iterator.next(); // 判断连接是否可用 if (simpleMqttClient.isConnected()) { client = simpleMqttClient; System.out.println("连接被取出,ID为: " + client.getClientid()); //从空闲池中删除该连接 iterator.remove(); break; } } } //如果此时client还为空,说明当前空闲池中没有可用的连接 if (client == null) { //如果此时连接总数未达到连接池最大值,创建新的连接并返回 if (connectionMap.size() < maxPoolSize) { //获取未被使用客户端ID int i = 1; String clientId = clientProperties.getClientid() + "_" + i; while(connectionMap.get(clientId) != null) { i++; clientId = clientProperties.getClientid() + "_" + i; } //创建新的连接 SimpleMqttClient mqttClient = new SimpleMqttClient(clientId, clientProperties.getUserName(), host, clientProperties.getTimeOut(), clientProperties.getAliveTime(), clientProperties.getTopics(), clientProperties.getQos(), clientProperties.getMaxConnectTimes()); mqttClient.connect(); System.out.println("新建连接对象, ID为:" + clientId); client = mqttClient; //把连接保存在集合中 connectionMap.put(clientId, mqttClient); } else { throw new Exception("MQTT连接池连接数已达最大值"); } } return client; } public void releaseClient(SimpleMqttClient client) { synchronized (idleClients) { idleClients.add(client); System.out.println("将ID为 【" + client.getClientid() + "】 的连接放回连接池中"); } } ``` 这里我希望一个客户端将消息发送完成后才能继续被使用,给`SimpleMqttClient`加上一个标识位,再通过回调去修改标识位 ```java /****************SimpleMqttClient**********************/ public class SimpleMqttClient { /***其他属性****/ /** * 客户端空闲标识 */ private boolean idled = true; public boolean isIdled() { return idled; } public void setIdled(boolean idled) { this.idled = idled; } /***其他代码****/ } /********************回调类***************************/ import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttMessage; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; public class SimpleMqttClientCallback implements MqttCallbackExtended { private SimpleMqttClient client; private int connectTimes = 0; public SimpleMqttClientCallback(SimpleMqttClient client) { this.client = client; } @Override public void connectComplete(boolean b, String s) { System.out.println("————" + client.getClientid() + " 连接成功!————"); //连接成功后,自动订阅主题 client.subscribe(); connectTimes = 0; } @Override public void connectionLost(Throwable throwable) { //可以在此处做重连处理 if (connectTimes < client.getMaxConnectTimes()) { client.refresh(); connectTimes++; } else { client.disconnect(); } } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { LocalDateTime startTime = LocalDateTime.now(); System.out.println("[MQTT]" + client.getClientid() + " ----成功接收消息!---- 时间: " + startTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))); String content = new String(mqttMessage.getPayload()); System.out.println("接收消息主题 : " + topic); System.out.println("接收消息Qos : " + mqttMessage.getQos()); System.out.println("接收消息内容 : " + content); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println("[MQTT]" + client.getClientid() + " ----成功发送消息!---- 时间: " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))); //消息发送成功后,将客户端设为空闲状态 client.setIdled(true); } } ``` 修改连接池的`getClient()`方法 ```java /** * 获取连接 * @return * @throws Exception */ public SimpleMqttClient getClient() throws Exception { SimpleMqttClient client = null; synchronized (idleClients) { //创建一个迭代器,用于获取连接 Iterator iterator = idleClients.iterator(); while (iterator.hasNext()) { SimpleMqttClient simpleMqttClient = iterator.next(); // 判断连接是否可用 if (isAvailableClient(simpleMqttClient)) { client = simpleMqttClient; System.out.println("连接被取出,ID为: " + client.getClientid()); //从空闲池中删除该连接 iterator.remove(); break; } } } //如果此时client还为空,说明当前空闲池中没有可用的连接,创建新的连接 if (client == null) { client = createNewClient(); } //将连接对象设为非空闲状态 client.setIdled(false); return client; } /** * 判断连接对象是否可用 * @param client * @return */ private boolean isAvailableClient(SimpleMqttClient client) { return client.isConnected() && client.isIdled(); } /** * 创建未被使用clientId * @return clientId */ private String createClientId() { int i = 1; String clientId = clientProperties.getClientid() + "_" + i; while(connectionMap.get(clientId) != null) { i++; clientId = clientProperties.getClientid() + "_" + i; } return clientId; } /** * 创建一个新的客户端并连接 * @return 连接客户端 * @throws Exception 已达连接池上线 */ private SimpleMqttClient createNewClient() throws Exception { SimpleMqttClient client = null; if (connectionMap.size() < maxPoolSize) { //获取未被使用客户端ID String clientId = createClientId(); //创建新的连接 SimpleMqttClient mqttClient = new SimpleMqttClient(clientId, clientProperties.getUserName(), host, clientProperties.getTimeOut(), clientProperties.getAliveTime(), clientProperties.getTopics(), clientProperties.getQos(), clientProperties.getMaxConnectTimes()); mqttClient.connect(); System.out.println("新建连接对象, ID为:" + clientId); client = mqttClient; //把连接保存在集合中 connectionMap.put(clientId, mqttClient); } else { throw new Exception("MQTT连接池连接数已达最大值"); } return client; } ``` 到这里连接池的基础功能应该算是完成了,可以简单的测试一下 - 编写配置文件 ```yaml mqtt: # emq的默认端口为1883 host: tcp://127.0.0.1:1883 client: clientid: testClient user-name: test time-out: 10 alive-time: 20 max-connect-times: 5 topics: ["HELLOWORLD"] qos: [2] pool: min-pool-size: 5 max-pool-size: 20 ``` - 编写配置类 ```java /********************连接池属性类**************************/ import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; @Component @Setter @Getter @ConfigurationProperties("mqtt.pool") public class SimpleMqttPoolProperties { private int minPoolSize; private int maxPoolSize; } /*******************配置类********************************/ import cn.xenoscode.simplepool.mqtt.SimpleMqttPool; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MqttClientPoolConfiguration { @Value("${mqtt.host}") private String host; @Autowired private SimpleMqttClientProperties simpleMqttClientProperties; @Autowired private SimpleMqttPoolProperties simpleMqttPoolProperties; @Bean("mqttPool") public SimpleMqttPool mqttPool() { SimpleMqttPool simpleMqttPool = new SimpleMqttPool(simpleMqttPoolProperties.getMinPoolSize(), simpleMqttPoolProperties.getMaxPoolSize(), host, simpleMqttClientProperties); return simpleMqttPool; } } ``` - 初始化连接池 ```java /***************** Initialize ***************************/ import cn.xenoscode.simplepool.mqtt.SimpleMqttPool; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; @Component public class Initialize implements CommandLineRunner { @Autowired private SimpleMqttPool mqttPool; @Override public void run(String... args) throws Exception { mqttPool.initialize(); } } ``` - 测试用的controller ```java import cn.xenoscode.simplepool.mqtt.SimpleMqttClient; import cn.xenoscode.simplepool.mqtt.SimpleMqttPool; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class TestController { @Autowired private SimpleMqttPool mqttPool; /** * 每次调用该接口,向服务器推送100条数据 */ @GetMapping("/test/publish") public void publishTest() { byte[] payload = "publish test".getBytes(); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(2); mqttMessage.setPayload(payload); String topic = "PUBLISH_TEST"; int times = 100; for (int i = 0; i < times; i++) { try { SimpleMqttClient client = mqttPool.getClient(); client.publish(topic, mqttMessage); mqttPool.releaseClient(client); } catch (Exception e) { System.out.println(e.toString()); } } } } ``` 启动项目 ```txt . ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.3.3.RELEASE) 2020-09-08 17:20:15.114 INFO 12268 --- [ main] c.x.s.SimpleMqttPoolApplication : Starting SimpleMqttPoolApplication on Xenos with PID 12268 (D:\study\workspace\SimpleMqttPool\simple-mqtt-pool\target\classes started by 89314 in D:\study\workspace\SimpleMqttPool) 2020-09-08 17:20:15.117 INFO 12268 --- [ main] c.x.s.SimpleMqttPoolApplication : No active profile set, falling back to default profiles: default 2020-09-08 17:20:15.805 INFO 12268 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http) 2020-09-08 17:20:15.814 INFO 12268 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat] 2020-09-08 17:20:15.814 INFO 12268 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.37] 2020-09-08 17:20:15.877 INFO 12268 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext 2020-09-08 17:20:15.877 INFO 12268 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 727 ms 2020-09-08 17:20:16.008 INFO 12268 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor' 2020-09-08 17:20:16.115 INFO 12268 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path '' 2020-09-08 17:20:16.121 INFO 12268 --- [ main] c.x.s.SimpleMqttPoolApplication : Started SimpleMqttPoolApplication in 1.308 seconds (JVM running for 1.596) ————testClient_1 连接成功!———— ————testClient_2 连接成功!———— ————testClient_3 连接成功!———— ————testClient_4 连接成功!———— ————testClient_5 连接成功!———— ``` 我们可用看到日志输出有5个客户端连接,登录EMQ的Dashboard查看 ![connection](./img/pool-connection.PNG) 进行数据订阅测试 ![subscribe-test](./img/pool-subscribe-test.PNG) ```txt [MQTT]testClient_1 ----成功接收消息!---- 时间: 2020-09-08 17:25:29.051 [MQTT]testClient_4 ----成功接收消息!---- 时间: 2020-09-08 17:25:29.051 [MQTT]testClient_5 ----成功接收消息!---- 时间: 2020-09-08 17:25:29.051 [MQTT]testClient_2 ----成功接收消息!---- 时间: 2020-09-08 17:25:29.051 [MQTT]testClient_3 ----成功接收消息!---- 时间: 2020-09-08 17:25:29.051 接收消息主题 : HELLOWORLD 接收消息主题 : HELLOWORLD 接收消息Qos : 2 接收消息内容 : { "msg": "Hello, World!" } 接收消息主题 : HELLOWORLD 接收消息Qos : 2 接收消息内容 : { "msg": "Hello, World!" } 接收消息主题 : HELLOWORLD 接收消息Qos : 2 接收消息Qos : 2 接收消息内容 : { "msg": "Hello, World!" } 接收消息主题 : HELLOWORLD 接收消息Qos : 2 接收消息内容 : { "msg": "Hello, World!" } 接收消息内容 : { "msg": "Hello, World!" } ``` 可以看到控制台这里每个连接都接收到了数据,但我们希望只接收一次数据,在查阅EMQ的文档后,发现它提供了一种共享订阅的模式,改写客户端的订阅方法 ```java public class SimpleMqttClient { /************其他代码************/ /** * 消息订阅 */ public void subscribe() { if (client != null && client.isConnected()) { try { IMqttToken token = client.subscribe(listTopic, listQos); token.waitForCompletion(); } catch (MqttException e) { e.printStackTrace(); System.out.println(clientid + "订阅主题时发生错误: " + e.toString()); } } } /** * 消息共享订阅 */ public void sharedSubscribe() { if (client != null && client.isConnected()) { try { String[] topics = new String[listTopic.length]; for (int i = 0; i < listTopic.length; i++) { topics[i] = "$queue/" + listTopic[i]; } //EMQ共享订阅模式 IMqttToken token = client.subscribe(topics, listQos); token.waitForCompletion(); } catch (MqttException e) { e.printStackTrace(); System.out.println(clientid + "订阅主题时发生错误: " + e.toString()); } } } } /************回调类************/ public class SimpleMqttClientCallback implements MqttCallbackExtended { /*****************其他代码******************/ @Override public void connectComplete(boolean b, String s) { System.out.println("————" + client.getClientid() + " 连接成功!————"); //连接成功后,自动订阅主题 client.sharedSubscribe(); } /*****************其他代码******************/ } ``` 再次测试 ```txt ————testClient_1 连接成功!———— ————testClient_2 连接成功!———— ————testClient_3 连接成功!———— ————testClient_4 连接成功!———— ————testClient_5 连接成功!———— [MQTT]testClient_4 ----成功接收消息!---- 时间: 2020-09-08 17:31:43.639 接收消息主题 : HELLOWORLD 接收消息Qos : 2 接收消息内容 : { "msg": "Hello, World!" } ``` 服务这里只接收到了一条数据,符合我们的需求。 消息下发测试 调用接口`localhost:8080/test/publish` 向服务器推送数据 调用接口之前的服务器数据 ![publish](./img/pool-before-publish.PNG) 调用接口 ```txt [MQTT]testClient_3 ----成功发送消息!---- 时间: 2020-09-08 17:35:35.274 将ID为 【testClient_4】 的连接放回连接池中 连接被取出,ID为: testClient_5 [MQTT]testClient_4 ----成功发送消息!---- 时间: 2020-09-08 17:35:35.275 将ID为 【testClient_5】 的连接放回连接池中 连接被取出,ID为: testClient_1 [MQTT]testClient_5 ----成功发送消息!---- 时间: 2020-09-08 17:35:35.275 将ID为 【testClient_1】 的连接放回连接池中 连接被取出,ID为: testClient_2 [MQTT]testClient_1 ----成功发送消息!---- 时间: 2020-09-08 17:35:35.275 将ID为 【testClient_2】 的连接放回连接池中 连接被取出,ID为: testClient_3 [MQTT]testClient_2 ----成功发送消息!---- 时间: 2020-09-08 17:35:35.275 将ID为 【testClient_3】 的连接放回连接池中 连接被取出,ID为: testClient_4 [MQTT]testClient_3 ----成功发送消息!---- 时间: 2020-09-08 17:35:35.276 将ID为 【testClient_4】 的连接放回连接池中 连接被取出,ID为: testClient_5 [MQTT]testClient_4 ----成功发送消息!---- 时间: 2020-09-08 17:35:35.276 将ID为 【testClient_5】 的连接放回连接池中 [MQTT]testClient_5 ----成功发送消息!---- 时间: 2020-09-08 17:35:35.276 ... ... ``` 可以看到日志输出中的信息,再看一下服务器数据 ![publish](./img/pool-after-publish.PNG) 这样连接池的基本功能就已经算是完成了 ### 编写监视线程 上面已经完成了连接池的基本功能,我们还需要编写监视线程去执行一些事务,如: - 回收多余的连接 - 回收不可用的连接 - 监视池中的连接数量,保证连接数达到最小连接数 ```java public class SimpleMqttPool { /************其他属性****************/ /** * 监视线程的间隔实际 */ private long overseeInterval; /** * 用于存储客户端处于连接中断状态下被监视线程扫描的次数 * key: clientId * value: times */ private Map disconnectedClientMap = new HashMap<>(); /** * 用于存储客户端处于忙碌状态下被监视线程扫描的次数 * key: clientId * value: times */ private Map busyClientMap = new HashMap<>(); /** * 超时回数 * 当监视线程检测到一个客户端处于忙碌的状态的回数超过该值时,更改客户端的状态 */ private final int OVER_TIMES = 5; public SimpleMqttPool(int minPoolSize, int maxPoolSize, long overseeInterval, String host, SimpleMqttClientProperties clientProperties) { this.minPoolSize = minPoolSize; this.maxPoolSize = maxPoolSize; this.overseeInterval = overseeInterval; this.host = host; this.clientProperties = clientProperties; } /** * 连接池初始化 */ public void initialize() { String clientIdProperty = clientProperties.getClientid(); for (int i = 0; i < minPoolSize; i++) { //应为MQTT连接的客户端要求clientId不能重复,在这里做些处理 String clientId = clientIdProperty + "_" + (i + 1); SimpleMqttClient mqttClient = new SimpleMqttClient(clientId, clientProperties.getUserName(), host, clientProperties.getTimeOut(), clientProperties.getAliveTime(), clientProperties.getTopics(), clientProperties.getQos(), clientProperties.getMaxConnectTimes()); //连接客户端 mqttClient.connect(); //将创建的客户端保存在集合中 connectionMap.put(clientId, mqttClient); idleClients.add(mqttClient); } //启动监视线程 Thread thread = new Thread(new Watcher()); thread.start(); } /********其他代码********/ class Watcher implements Runnable { @Override public void run() { while (true) { System.out.println("__________监视线程执行中___________"); try { synchronized (idleClients) { //1.清除无效连接 clearUnusableConnections(); //2.修改处于忙碌的连接状态 refreshBusyConnections(); //3.清除多余连接 closeConnections(); //4.如果连接数量小于最小连接数量,创建连接 createClients(); } System.out.println("__________监视线程执行完毕___________"); Thread.sleep(overseeInterval); } catch (Exception e) { System.out.println("监视线程执行时发生错误:" + e.toString()); } } } private boolean isBusy() { //空闲池中可用的连接数大于最小连接数时为非繁忙状态 int availableClientNum = (int) idleClients.stream().filter(SimpleMqttPool.this::isAvailableClient).count(); System.out.println("连接池当前可用连接数:" + availableClientNum); return availableClientNum < minPoolSize; } /** * 清楚不可用的连接,连接中断的,虽然客户端设置了断线重连策略,但也有可能连不上服务器,这些连接需要被清除 * 设置检查到一个客户端连接中断被检查到一定次数后,将其清除 */ private void clearUnusableConnections() { System.out.println("正在清除无效连接"); Collection connections = connectionMap.values(); Iterator iterator = connections.iterator(); while (iterator.hasNext()) { SimpleMqttClient client = iterator.next(); if (!client.isConnected()) { Integer times = disconnectedClientMap.containsKey(client.getClientid()) ? disconnectedClientMap.get(client.getClientid()) : 0; times++; if (times > OVER_TIMES) { connectionMap.remove(client.getClientid()); SimpleMqttClient finalClient = client; idleClients.removeIf((idleClient) -> idleClient.getClientid() == finalClient.getClientid()); disconnectedClientMap.remove(client.getClientid()); client.disconnect(); client = null; } else { disconnectedClientMap.put(client.getClientid(), times); } } } } /** * 通过回调设置连接客户端的空闲标识,如果回调失败,则客户端永远处于忙碌状态,需要通过监视线程修改这些客户端的标识 * 这里设置监视线程监视到一个客户端处于忙碌状态达到一定次数后,强制更改其状态 */ private void refreshBusyConnections() { System.out.println("正在重置忙碌连接"); Collection connections = connectionMap.values(); Iterator iterator = connections.iterator(); while (iterator.hasNext()) { SimpleMqttClient client = iterator.next(); if (isAvailableClient(client)) { busyClientMap.put(client.getClientid(), 0); } else { Integer times = busyClientMap.containsKey(client.getClientid()) ? busyClientMap.get(client.getClientid()) : 0; times++; if (times > OVER_TIMES) { if (client.isConnected()) { //将标识位改为空闲 client.setIdled(true); times = 0; } busyClientMap.put(client.getClientid(), times); } } } } /** * 关闭多余连接 */ private void closeConnections() { if (!isBusy()) { System.out.println("正在关闭多余连接"); for (int i = idleClients.size(); i > minPoolSize; i--) { SimpleMqttClient client = idleClients.get(i - 1); if (isAvailableClient(client)) { idleClients.remove(i - 1); connectionMap.remove(client.getClientid()); busyClientMap.remove(client.getClientid()); client.disconnect(); client = null; } } } } /** * 监测连接池中连接个数,如果连接小于最小连接个数,创建连接 */ private void createClients() { if (connectionMap.size() < minPoolSize) { System.out.println("正在创建新的连接"); for (int i = connectionMap.size(); i < minPoolSize; i++) { try { SimpleMqttClient client = createNewClient(); client.setIdled(true); idleClients.add(client); } catch (Exception e) { System.out.println("监视线程创建新的连接时发生异常:" + e.toString()); } } } } } } ``` 通过配置文件,读取监视线程相关的一些参数,然后在连接池初始化时,启动监视线程。由于存在监视线程对连接进行监视,就没有必要在回调类中设置断线重连 到这里,一个自定义的MQTT连接池就算完成了。 ### 进行多线程测试 编写测试接口 ```java /** * 每次调用该接口,开辟多个线程进行向服务器推送数据,每次线程推送100条 */ @GetMapping("/test/publish/mutil") public void mutilPublishTest() { byte[] payload = "publish test".getBytes(); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(2); mqttMessage.setPayload(payload); String topic = "PUBLISH_TEST"; int threadCount = 5; for (int i = 0; i < threadCount; i++) { Thread thread = new Thread(new Runnable() { @Override public void run() { int times = 100; for (int j = 0; j < times; j++) { try { SimpleMqttClient client = mqttPool.getClient(); client.publish(topic, mqttMessage); mqttPool.releaseClient(client); } catch (Exception e) { LoggerFactory.getLogger(TestController.class).error(e.toString()); } } } }); thread.start(); } } ``` 经过测试,上面的代码在多线程进行数据推送时,存在大量的bug,这里直接给出修复后的代码 ```java import cn.xenoscode.simplepool.config.SimpleMqttClientProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; import java.util.concurrent.ConcurrentHashMap; public class SimpleMqttPool { private static Logger logger = LoggerFactory.getLogger(SimpleMqttPool.class); /** * 用于存储所有的客户端 * key: clientId * value: client */ private Map connectionMap = new ConcurrentHashMap<>(); /** * 用于存储当前可用的空闲客户端 */ private List idleClients = new LinkedList<>(); /** * 连接池最小连接数 */ private int minPoolSize; /** * 连接池最大连接数 */ private int maxPoolSize; /** * 监视线程的间隔实际 */ private long overseeInterval; /** * 连接服务器地址 */ private String host; /** * 连接的配置信息 */ private SimpleMqttClientProperties clientProperties; /** * 用于存储客户端处于连接中断状态下被监视线程扫描的次数 * key: clientId * value: times */ private Map disconnectedClientMap = new HashMap<>(); /** * 用于存储客户端处于忙碌状态下被监视线程扫描的次数 * key: clientId * value: times */ private Map busyClientMap = new HashMap<>(); /** * 超时回数 * 当监视线程检测到一个客户端处于忙碌的状态的回数超过该值时,更改客户端的状态 */ private final int OVER_TIMES = 5; public SimpleMqttPool(int minPoolSize, int maxPoolSize, long overseeInterval, String host, SimpleMqttClientProperties clientProperties) { this.minPoolSize = minPoolSize; this.maxPoolSize = maxPoolSize; this.overseeInterval = overseeInterval; this.host = host; this.clientProperties = clientProperties; } /** * 连接池初始化 */ public void initialize() { String clientIdProperty = clientProperties.getClientid(); for (int i = 0; i < minPoolSize; i++) { //应为MQTT连接的客户端要求clientId不能重复,在这里做些处理 String clientId = clientIdProperty + "_" + (i + 1); SimpleMqttClient mqttClient = new SimpleMqttClient(clientId, clientProperties.getUserName(), host, clientProperties.getTimeOut(), clientProperties.getAliveTime(), clientProperties.getTopics(), clientProperties.getQos(), clientProperties.getMaxConnectTimes()); //连接客户端 mqttClient.connect(); //将创建的客户端保存在集合中 connectionMap.put(clientId, mqttClient); idleClients.add(mqttClient); } //启动监视线程 Thread thread = new Thread(new Watcher()); thread.start(); } /** * 获取连接 * * @return * @throws Exception */ public synchronized SimpleMqttClient getClient() throws Exception { SimpleMqttClient client = null; try { synchronized (idleClients) { //创建一个迭代器,用于获取连接 Iterator iterator = idleClients.iterator(); while (iterator.hasNext()) { SimpleMqttClient simpleMqttClient = iterator.next(); // 判断连接是否可用 if (isAvailableClient(simpleMqttClient)) { client = simpleMqttClient; logger.info("连接被取出,ID为: " + client.getClientid()); //从空闲池中删除该连接 iterator.remove(); break; } } } //如果此时client还为空,说明当前空闲池中没有可用的连接,创建新的连接 if (client == null) { client = createNewClient(); } //将连接对象设为非空闲状态 client.setIdled(false); } catch (Exception e) { e.printStackTrace(); throw e; } return client; } /** * 释放连接 * * @param client */ public void releaseClient(SimpleMqttClient client) { synchronized (idleClients) { idleClients.add(client); logger.info("将ID为 【" + client.getClientid() + "】 的连接放回连接池中"); } } /** * 判断连接对象是否可用 * * @param client * @return */ private boolean isAvailableClient(SimpleMqttClient client) { return client.isConnected() && client.isIdled(); } /** * 创建未被使用clientId * * @return clientId */ private String createClientId() { int i = 1; String clientId = clientProperties.getClientid() + "_" + i; while (connectionMap.get(clientId) != null) { i++; clientId = clientProperties.getClientid() + "_" + i; } return clientId; } /** * 创建一个新的客户端并连接 * * @return 连接客户端 * @throws Exception 已达连接池上线 */ private SimpleMqttClient createNewClient() throws Exception { SimpleMqttClient client = null; if (connectionMap.size() < maxPoolSize) { //获取未被使用客户端ID String clientId = createClientId(); //创建新的连接 SimpleMqttClient mqttClient = new SimpleMqttClient(clientId, clientProperties.getUserName(), host, clientProperties.getTimeOut(), clientProperties.getAliveTime(), clientProperties.getTopics(), clientProperties.getQos(), clientProperties.getMaxConnectTimes()); mqttClient.connect(); logger.info("新建连接对象, ID为:" + clientId); client = mqttClient; //把连接保存在集合中 connectionMap.put(clientId, mqttClient); } else { throw new Exception("MQTT连接池连接数已达最大值"); } return client; } class Watcher implements Runnable { @Override public void run() { while (true) { logger.info("__________监视线程执行中___________"); try { synchronized (idleClients) { logger.info("当前连接池连接总数: " + connectionMap.size()); connectionMap.values().forEach(client -> logger.info(client != null ? client.getClientid() : null)); logger.info("当前连接池可用连接个数: " + idleClients.size()); idleClients.forEach(client -> logger.info(client != null ? client.getClientid() : null)); //1.清除无效连接 clearUnusableConnections(); //2.修改处于忙碌的连接状态 refreshBusyConnections(); //3.清除多余连接 closeConnections(); //4.如果连接数量小于最小连接数量,创建连接 createClients(); } logger.info("__________监视线程执行完毕___________"); Thread.sleep(overseeInterval); } catch (Exception e) { logger.error("监视线程执行时发生错误:" + e.toString()); } } } private boolean isBusy() { //空闲池中可用的连接数大于最小连接数时为非繁忙状态 int availableClientNum = (int) idleClients.stream().filter(SimpleMqttPool.this::isAvailableClient).count(); logger.info("连接池当前可用连接数:" + availableClientNum); return availableClientNum < minPoolSize; } /** * 清楚不可用的连接,连接中断的,虽然客户端设置了断线重连策略,但也有可能连不上服务器,这些连接需要被清除 * 设置检查到一个客户端连接中断被检查到一定次数后,将其清除 */ private void clearUnusableConnections() { logger.info("正在清除无效连接"); try { Collection connections = connectionMap.values(); Iterator iterator = connections.iterator(); while (iterator.hasNext()) { SimpleMqttClient client = iterator.next(); if (!client.isConnected()) { Integer times = disconnectedClientMap.containsKey(client.getClientid()) ? disconnectedClientMap.get(client.getClientid()) : 0; times++; if (times > OVER_TIMES) { connectionMap.remove(client.getClientid()); SimpleMqttClient finalClient = client; idleClients.removeIf((idleClient) -> idleClient.getClientid() == finalClient.getClientid() || idleClient == null); disconnectedClientMap.remove(client.getClientid()); logger.info("无效连接被清除,ID为 " + client.getClientid()); client.disconnect(); client = null; } else { disconnectedClientMap.put(client.getClientid(), times); } } } } catch (Exception e) { logger.error("清除无效连接时发生异常: " + e.toString()); e.printStackTrace(); } } /** * 通过回调设置连接客户端的空闲标识,如果回调失败,则客户端永远处于忙碌状态,需要通过监视线程修改这些客户端的标识 * 这里设置监视线程监视到一个客户端处于忙碌状态达到一定次数后,强制更改其状态 */ private void refreshBusyConnections() { logger.info("正在重置忙碌连接"); try { Collection connections = connectionMap.values(); Iterator iterator = connections.iterator(); while (iterator.hasNext()) { SimpleMqttClient client = iterator.next(); if (isAvailableClient(client)) { busyClientMap.put(client.getClientid(), 0); } else { Integer times = busyClientMap.containsKey(client.getClientid()) ? busyClientMap.get(client.getClientid()) : 0; times++; if (times > OVER_TIMES) { if (client.isConnected()) { //将标识位改为空闲 client.setIdled(true); logger.info("忙碌连接被重置,ID为 " + client.getClientid()); times = 0; } busyClientMap.put(client.getClientid(), times); } } } } catch (Exception e) { logger.error("重置忙碌连接时发生异常: " + e.toString()); e.printStackTrace(); } } /** * 关闭多余连接 */ private void closeConnections() { if (!isBusy()) { logger.info("正在关闭多余连接"); try { Iterator iterator = idleClients.iterator(); while (iterator.hasNext() && idleClients.size() > minPoolSize) { SimpleMqttClient client = iterator.next(); if (isAvailableClient(client)) ; iterator.remove(); connectionMap.remove(client.getClientid()); busyClientMap.remove(client.getClientid()); client.disconnect(); logger.info("多余连接被清除,ID为 " + client.getClientid()); client = null; } } catch (Exception e) { logger.error("关闭多余连接时发生异常: " + e.toString()); e.printStackTrace(); } } } /** * 监测连接池中连接个数,如果连接小于最小连接个数,创建连接 */ private void createClients() { if (connectionMap.size() < minPoolSize) { logger.info("正在创建新的连接"); for (int i = connectionMap.size(); i < minPoolSize; i++) { try { SimpleMqttClient client = createNewClient(); client.setIdled(true); idleClients.add(client); } catch (Exception e) { logger.error("监视线程创建新的连接时发生异常:" + e.toString()); e.printStackTrace(); } } } } } } ```