diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java
index ce4e1732dd45e156481ec8f5c18873179355da20..41810e8ce0600d844ade1c699b9182730db3cef8 100644
--- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java
+++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java
@@ -3,6 +3,8 @@ package org.dromara.common.sse.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
+import java.util.concurrent.TimeUnit;
+
/**
* SSE 配置项
*
@@ -12,10 +14,29 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties("sse")
public class SseProperties {
+ /**
+ * 是否启用 SSE 功能
+ */
private Boolean enabled;
/**
* 路径
*/
private String path;
+
+ /**
+ * 心跳检测间隔,默认秒
+ */
+ private long heartbeatSeconds = 30;
+
+ /**
+ * 清理线程执行间隔,默认秒
+ */
+ private long checkIntervalSeconds = 10;
+
+ /**
+ * 时间单位(秒、分钟等),默认秒
+ */
+ private TimeUnit unit = TimeUnit.SECONDS;
+
}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterDelayed.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterDelayed.java
new file mode 100644
index 0000000000000000000000000000000000000000..fa8f3368b4e45abdaa0de41812f68a4a4369b9b0
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterDelayed.java
@@ -0,0 +1,104 @@
+package org.dromara.common.sse.core;
+
+import lombok.Getter;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+import java.lang.ref.WeakReference;
+import java.time.Instant;
+import java.util.Objects;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * SseEmitterDelayed 是一个用于延迟队列 (DelayQueue) 的包装类,
+ * 用于管理 SSE (Server-Sent Events) 连接的超时处理
+ *
+ * 核心思路:
+ * - 每个用户的 SSE 连接用一个 WeakReference 包装,避免内存泄漏
+ * - 实现 Delayed 接口,允许放入 DelayQueue,根据 expireAt 控制超时
+ * - 可在 DelayQueue 中统一处理过期的 SSE 连接,释放资源
+ *
+ * @author AprilWind
+ */
+@Getter
+public class SseEmitterDelayed implements Delayed {
+
+ /**
+ * 用户ID,用于标识该 SSE 连接属于哪个用户
+ */
+ private final Long userId;
+
+ /**
+ * 用于标识 SSE 连接的唯一令牌,通常用于前端校验
+ */
+ private final String token;
+
+ /**
+ * SSE 连接的弱引用,避免延迟队列持有强引用导致内存泄漏
+ */
+ private final WeakReference emitterRef;
+
+ /**
+ * 该连接的过期时间戳(毫秒),到达该时间后可视为超时
+ */
+ private volatile long expireAt;
+
+ /**
+ * 构造函数
+ *
+ * @param userId 用户ID
+ * @param token 连接的唯一标识
+ * @param emitter 对应的 SseEmitter 对象
+ * @param delay 延迟时长
+ * @param unit 延迟单位
+ */
+ public SseEmitterDelayed(Long userId, String token, SseEmitter emitter, long delay, TimeUnit unit) {
+ this.userId = userId;
+ this.token = token;
+ this.emitterRef = new WeakReference<>(emitter);
+ // expireAt = 当前时间 + 延迟时间
+ this.expireAt = Instant.now().toEpochMilli() + unit.toMillis(delay);
+ }
+
+ /**
+ * 续期当前延迟任务的到期时间
+ *
+ * @param delay 延迟时长(例如 30 表示 30 秒)
+ * @param unit 时间单位(如秒、分钟等)
+ */
+ public void renew(long delay, TimeUnit unit) {
+ this.expireAt = System.currentTimeMillis() + unit.toMillis(delay);
+ }
+
+ /**
+ * 获取剩余延迟时间
+ *
+ * @param unit 时间单位
+ * @return 剩余时间
+ */
+ @Override
+ public long getDelay(TimeUnit unit) {
+ return unit.convert(expireAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * 按 expireAt 时间排序,DelayQueue 内部使用
+ *
+ * @param o 另一个 Delayed 对象
+ * @return 比较结果
+ */
+ @Override
+ public int compareTo(Delayed o) {
+ return Long.compare(this.expireAt, ((SseEmitterDelayed) o).expireAt);
+ }
+
+ /**
+ * 重写 hashCode 方法,基于 userId 和 token
+ * 保证在集合中正确比较
+ */
+ @Override
+ public int hashCode() {
+ return Objects.hash(userId, token);
+ }
+
+}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java
index bc19460f8e419e30e5eea96ebf882d2126516fd5..72321f7200db38ee9e65ff64065ed492c2b7286e 100644
--- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java
+++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java
@@ -1,14 +1,22 @@
package org.dromara.common.sse.core;
import cn.hutool.core.map.MapUtil;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
+import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.redis.utils.RedisUtils;
+import org.dromara.common.sse.config.SseProperties;
import org.dromara.common.sse.dto.SseMessageDto;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
+import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
/**
@@ -24,7 +32,85 @@ public class SseEmitterManager {
*/
private final static String SSE_TOPIC = "global:sse";
- private final static Map> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
+ private final static Map>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
+
+ /**
+ * 延迟检测队列
+ */
+ private static final DelayQueue DELAY_QUEUE = new DelayQueue<>();
+
+ /**
+ * 心跳事件
+ */
+ private static final SseEmitter.SseEventBuilder PING_EVENT = SseEmitter.event().comment("ping");
+
+ private final SseProperties PROPERTIES = SpringUtils.getBean(SseProperties.class);
+
+ /**
+ * 清理线程池
+ */
+ private final ScheduledExecutorService CLEANER =
+ Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "SSE-Cleaner");
+ t.setDaemon(true);
+ return t;
+ });
+
+ @PostConstruct
+ public void init() {
+ log.info("SSE 管理器启动 -> 检查间隔: {} 秒, 心跳间隔: {} 秒",
+ PROPERTIES.getCheckIntervalSeconds(), PROPERTIES.getHeartbeatSeconds());
+
+ CLEANER.scheduleWithFixedDelay(
+ this::processDelayQueue,
+ PROPERTIES.getCheckIntervalSeconds(),
+ PROPERTIES.getCheckIntervalSeconds(),
+ PROPERTIES.getUnit()
+ );
+ }
+
+ @PreDestroy
+ public void destroy() {
+ CLEANER.shutdownNow();
+ USER_TOKEN_EMITTERS.clear();
+ DELAY_QUEUE.clear();
+ log.info("SSE 管理器已关闭");
+ }
+
+ /**
+ * 处理延迟队列中的到期 SSE 任务
+ * 作用:
+ * 1. 移除已经失效的 SSEEmitter
+ * 2. 对存活的 SSEEmitter 延迟续期,保证心跳检测
+ */
+ private void processDelayQueue() {
+ try {
+ // 获取当前时间戳,用于判断任务是否到期
+ long now = System.currentTimeMillis();
+ SseEmitterDelayed task;
+
+ // 循环处理队列中已经到期的任务
+ // peek() 获取队首任务,但不移除
+ while ((task = DELAY_QUEUE.peek()) != null && task.getExpireAt() <= now) {
+ // 队首任务已到期,真正取出
+ DELAY_QUEUE.poll(); // 从队列中移除任务
+
+ WeakReference ref = task.getEmitterRef();
+ SseEmitter emitter = ref.get();
+
+ if (emitter == null || isEmitterDead(emitter)) {
+ // Emitter 已被 GC 或已关闭,断开连接并从管理器移除
+ this.disconnect(task.getUserId(), task.getToken());
+ } else {
+ // 直接更新到期时间,放回队列
+ task.renew(PROPERTIES.getHeartbeatSeconds(), PROPERTIES.getUnit());
+ DELAY_QUEUE.offer(task);
+ }
+ }
+ } catch (Exception e) {
+ log.error("SSE延迟队列处理异常", e);
+ }
+ }
/**
* 建立与指定用户的 SSE 连接
@@ -34,41 +120,38 @@ public class SseEmitterManager {
* @return 返回一个 SseEmitter 实例,客户端可以通过该实例接收 SSE 事件
*/
public SseEmitter connect(Long userId, String token) {
+ if (userId == null || token == null) {
+ throw new IllegalArgumentException("userId and token cannot be null");
+ }
// 从 USER_TOKEN_EMITTERS 中获取或创建当前用户的 SseEmitter 映射表(ConcurrentHashMap)
// 每个用户可以有多个 SSE 连接,通过 token 进行区分
- Map emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
+ Map> emitters =
+ USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
+
+ // 如果已有旧连接,则先断开
+ emitters.remove(token);
// 创建一个新的 SseEmitter 实例,超时时间设置为一天 避免连接之后直接关闭浏览器导致连接停滞
SseEmitter emitter = new SseEmitter(86400000L);
- emitters.put(token, emitter);
+ emitters.put(token, new WeakReference<>(emitter));
- // 当 emitter 完成、超时或发生错误时,从映射表中移除对应的 token
- emitter.onCompletion(() -> {
- SseEmitter remove = emitters.remove(token);
- if (remove != null) {
- remove.complete();
- }
- });
- emitter.onTimeout(() -> {
- SseEmitter remove = emitters.remove(token);
- if (remove != null) {
- remove.complete();
- }
- });
- emitter.onError((e) -> {
- SseEmitter remove = emitters.remove(token);
- if (remove != null) {
- remove.complete();
- }
- });
+ // 三种事件统一处理
+ Runnable removeTask = () -> disconnect(userId, token);
+ emitter.onCompletion(removeTask);
+ emitter.onTimeout(removeTask);
+ emitter.onError(e -> removeTask.run());
+
+ // 延迟清理
+ DELAY_QUEUE.offer(new SseEmitterDelayed(userId, token, emitter,
+ PROPERTIES.getHeartbeatSeconds(), PROPERTIES.getUnit()));
try {
// 向客户端发送一条连接成功的事件
emitter.send(SseEmitter.event().comment("connected"));
} catch (IOException e) {
- // 如果发送消息失败,则从映射表中移除 emitter
- emitters.remove(token);
+ log.warn("SSE连接发送初始事件失败 userId={}, token={}", userId, token, e);
+ disconnect(userId, token);
}
return emitter;
}
@@ -83,18 +166,40 @@ public class SseEmitterManager {
if (userId == null || token == null) {
return;
}
- Map emitters = USER_TOKEN_EMITTERS.get(userId);
- if (MapUtil.isNotEmpty(emitters)) {
- try {
- SseEmitter sseEmitter = emitters.get(token);
- sseEmitter.send(SseEmitter.event().comment("disconnected"));
- sseEmitter.complete();
- } catch (Exception ignore) {
+ Map> emitters = USER_TOKEN_EMITTERS.get(userId);
+ if (MapUtil.isEmpty(emitters)) {
+ USER_TOKEN_EMITTERS.remove(userId);
+ return;
+ }
+
+ WeakReference ref = emitters.remove(token);
+ if (ref != null) {
+ SseEmitter emitter = ref.get();
+ if (emitter != null) {
+ try {
+ emitter.send(SseEmitter.event().comment("disconnected"));
+ emitter.complete();
+ } catch (Exception ignore) {
+ }
}
- emitters.remove(token);
- } else {
+ }
+
+ if (emitters.isEmpty()) {
USER_TOKEN_EMITTERS.remove(userId);
}
+ log.debug("SSE连接移除并断开 userId={}, token={}", userId, token);
+ }
+
+ /**
+ * 心跳检测 Emitter 是否已关闭
+ */
+ private static boolean isEmitterDead(SseEmitter emitter) {
+ try {
+ emitter.send(PING_EVENT);
+ return false;
+ } catch (Exception e) {
+ return true;
+ }
}
/**
@@ -113,23 +218,24 @@ public class SseEmitterManager {
* @param message 要发送的消息内容
*/
public void sendMessage(Long userId, String message) {
- Map emitters = USER_TOKEN_EMITTERS.get(userId);
- if (MapUtil.isNotEmpty(emitters)) {
- for (Map.Entry entry : emitters.entrySet()) {
+ Map> emitters = USER_TOKEN_EMITTERS.get(userId);
+ if (MapUtil.isEmpty(emitters)) {
+ USER_TOKEN_EMITTERS.remove(userId);
+ return;
+ }
+ emitters.forEach((token, ref) -> {
+ SseEmitter emitter = ref.get();
+ if (emitter != null) {
try {
- entry.getValue().send(SseEmitter.event()
- .name("message")
- .data(message));
+ emitter.send(SseEmitter.event().name("message").data(message));
} catch (Exception e) {
- SseEmitter remove = emitters.remove(entry.getKey());
- if (remove != null) {
- remove.complete();
- }
+ log.warn("SSE消息发送失败 userId={}, token={}", userId, token, e);
+ disconnect(userId, token);
}
+ } else {
+ disconnect(userId, token);
}
- } else {
- USER_TOKEN_EMITTERS.remove(userId);
- }
+ });
}
/**
@@ -138,9 +244,7 @@ public class SseEmitterManager {
* @param message 要发送的消息内容
*/
public void sendMessage(String message) {
- for (Long userId : USER_TOKEN_EMITTERS.keySet()) {
- sendMessage(userId, message);
- }
+ USER_TOKEN_EMITTERS.keySet().forEach(userId -> sendMessage(userId, message));
}
/**
@@ -149,10 +253,7 @@ public class SseEmitterManager {
* @param sseMessageDto 要发布的SSE消息对象
*/
public void publishMessage(SseMessageDto sseMessageDto) {
- SseMessageDto broadcastMessage = new SseMessageDto();
- broadcastMessage.setMessage(sseMessageDto.getMessage());
- broadcastMessage.setUserIds(sseMessageDto.getUserIds());
- RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
+ RedisUtils.publish(SSE_TOPIC, sseMessageDto, consumer -> {
log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}",
SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getMessage());
});
@@ -170,4 +271,5 @@ public class SseEmitterManager {
log.info("SSE发送主题订阅消息topic:{} message:{}", SSE_TOPIC, message);
});
}
+
}