diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java index ce4e1732dd45e156481ec8f5c18873179355da20..41810e8ce0600d844ade1c699b9182730db3cef8 100644 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java @@ -3,6 +3,8 @@ package org.dromara.common.sse.config; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; +import java.util.concurrent.TimeUnit; + /** * SSE 配置项 * @@ -12,10 +14,29 @@ import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties("sse") public class SseProperties { + /** + * 是否启用 SSE 功能 + */ private Boolean enabled; /** * 路径 */ private String path; + + /** + * 心跳检测间隔,默认秒 + */ + private long heartbeatSeconds = 30; + + /** + * 清理线程执行间隔,默认秒 + */ + private long checkIntervalSeconds = 10; + + /** + * 时间单位(秒、分钟等),默认秒 + */ + private TimeUnit unit = TimeUnit.SECONDS; + } diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterDelayed.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterDelayed.java new file mode 100644 index 0000000000000000000000000000000000000000..fa8f3368b4e45abdaa0de41812f68a4a4369b9b0 --- /dev/null +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterDelayed.java @@ -0,0 +1,104 @@ +package org.dromara.common.sse.core; + +import lombok.Getter; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.lang.ref.WeakReference; +import java.time.Instant; +import java.util.Objects; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +/** + * SseEmitterDelayed 是一个用于延迟队列 (DelayQueue) 的包装类, + * 用于管理 SSE (Server-Sent Events) 连接的超时处理 + *

+ * 核心思路: + * - 每个用户的 SSE 连接用一个 WeakReference 包装,避免内存泄漏 + * - 实现 Delayed 接口,允许放入 DelayQueue,根据 expireAt 控制超时 + * - 可在 DelayQueue 中统一处理过期的 SSE 连接,释放资源 + * + * @author AprilWind + */ +@Getter +public class SseEmitterDelayed implements Delayed { + + /** + * 用户ID,用于标识该 SSE 连接属于哪个用户 + */ + private final Long userId; + + /** + * 用于标识 SSE 连接的唯一令牌,通常用于前端校验 + */ + private final String token; + + /** + * SSE 连接的弱引用,避免延迟队列持有强引用导致内存泄漏 + */ + private final WeakReference emitterRef; + + /** + * 该连接的过期时间戳(毫秒),到达该时间后可视为超时 + */ + private volatile long expireAt; + + /** + * 构造函数 + * + * @param userId 用户ID + * @param token 连接的唯一标识 + * @param emitter 对应的 SseEmitter 对象 + * @param delay 延迟时长 + * @param unit 延迟单位 + */ + public SseEmitterDelayed(Long userId, String token, SseEmitter emitter, long delay, TimeUnit unit) { + this.userId = userId; + this.token = token; + this.emitterRef = new WeakReference<>(emitter); + // expireAt = 当前时间 + 延迟时间 + this.expireAt = Instant.now().toEpochMilli() + unit.toMillis(delay); + } + + /** + * 续期当前延迟任务的到期时间 + * + * @param delay 延迟时长(例如 30 表示 30 秒) + * @param unit 时间单位(如秒、分钟等) + */ + public void renew(long delay, TimeUnit unit) { + this.expireAt = System.currentTimeMillis() + unit.toMillis(delay); + } + + /** + * 获取剩余延迟时间 + * + * @param unit 时间单位 + * @return 剩余时间 + */ + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(expireAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + + /** + * 按 expireAt 时间排序,DelayQueue 内部使用 + * + * @param o 另一个 Delayed 对象 + * @return 比较结果 + */ + @Override + public int compareTo(Delayed o) { + return Long.compare(this.expireAt, ((SseEmitterDelayed) o).expireAt); + } + + /** + * 重写 hashCode 方法,基于 userId 和 token + * 保证在集合中正确比较 + */ + @Override + public int hashCode() { + return Objects.hash(userId, token); + } + +} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java index bc19460f8e419e30e5eea96ebf882d2126516fd5..72321f7200db38ee9e65ff64065ed492c2b7286e 100644 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java @@ -1,14 +1,22 @@ package org.dromara.common.sse.core; import cn.hutool.core.map.MapUtil; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; +import org.dromara.common.core.utils.SpringUtils; import org.dromara.common.redis.utils.RedisUtils; +import org.dromara.common.sse.config.SseProperties; import org.dromara.common.sse.dto.SseMessageDto; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; +import java.lang.ref.WeakReference; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.Consumer; /** @@ -24,7 +32,85 @@ public class SseEmitterManager { */ private final static String SSE_TOPIC = "global:sse"; - private final static Map> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>(); + private final static Map>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>(); + + /** + * 延迟检测队列 + */ + private static final DelayQueue DELAY_QUEUE = new DelayQueue<>(); + + /** + * 心跳事件 + */ + private static final SseEmitter.SseEventBuilder PING_EVENT = SseEmitter.event().comment("ping"); + + private final SseProperties PROPERTIES = SpringUtils.getBean(SseProperties.class); + + /** + * 清理线程池 + */ + private final ScheduledExecutorService CLEANER = + Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "SSE-Cleaner"); + t.setDaemon(true); + return t; + }); + + @PostConstruct + public void init() { + log.info("SSE 管理器启动 -> 检查间隔: {} 秒, 心跳间隔: {} 秒", + PROPERTIES.getCheckIntervalSeconds(), PROPERTIES.getHeartbeatSeconds()); + + CLEANER.scheduleWithFixedDelay( + this::processDelayQueue, + PROPERTIES.getCheckIntervalSeconds(), + PROPERTIES.getCheckIntervalSeconds(), + PROPERTIES.getUnit() + ); + } + + @PreDestroy + public void destroy() { + CLEANER.shutdownNow(); + USER_TOKEN_EMITTERS.clear(); + DELAY_QUEUE.clear(); + log.info("SSE 管理器已关闭"); + } + + /** + * 处理延迟队列中的到期 SSE 任务 + * 作用: + * 1. 移除已经失效的 SSEEmitter + * 2. 对存活的 SSEEmitter 延迟续期,保证心跳检测 + */ + private void processDelayQueue() { + try { + // 获取当前时间戳,用于判断任务是否到期 + long now = System.currentTimeMillis(); + SseEmitterDelayed task; + + // 循环处理队列中已经到期的任务 + // peek() 获取队首任务,但不移除 + while ((task = DELAY_QUEUE.peek()) != null && task.getExpireAt() <= now) { + // 队首任务已到期,真正取出 + DELAY_QUEUE.poll(); // 从队列中移除任务 + + WeakReference ref = task.getEmitterRef(); + SseEmitter emitter = ref.get(); + + if (emitter == null || isEmitterDead(emitter)) { + // Emitter 已被 GC 或已关闭,断开连接并从管理器移除 + this.disconnect(task.getUserId(), task.getToken()); + } else { + // 直接更新到期时间,放回队列 + task.renew(PROPERTIES.getHeartbeatSeconds(), PROPERTIES.getUnit()); + DELAY_QUEUE.offer(task); + } + } + } catch (Exception e) { + log.error("SSE延迟队列处理异常", e); + } + } /** * 建立与指定用户的 SSE 连接 @@ -34,41 +120,38 @@ public class SseEmitterManager { * @return 返回一个 SseEmitter 实例,客户端可以通过该实例接收 SSE 事件 */ public SseEmitter connect(Long userId, String token) { + if (userId == null || token == null) { + throw new IllegalArgumentException("userId and token cannot be null"); + } // 从 USER_TOKEN_EMITTERS 中获取或创建当前用户的 SseEmitter 映射表(ConcurrentHashMap) // 每个用户可以有多个 SSE 连接,通过 token 进行区分 - Map emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>()); + Map> emitters = + USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>()); + + // 如果已有旧连接,则先断开 + emitters.remove(token); // 创建一个新的 SseEmitter 实例,超时时间设置为一天 避免连接之后直接关闭浏览器导致连接停滞 SseEmitter emitter = new SseEmitter(86400000L); - emitters.put(token, emitter); + emitters.put(token, new WeakReference<>(emitter)); - // 当 emitter 完成、超时或发生错误时,从映射表中移除对应的 token - emitter.onCompletion(() -> { - SseEmitter remove = emitters.remove(token); - if (remove != null) { - remove.complete(); - } - }); - emitter.onTimeout(() -> { - SseEmitter remove = emitters.remove(token); - if (remove != null) { - remove.complete(); - } - }); - emitter.onError((e) -> { - SseEmitter remove = emitters.remove(token); - if (remove != null) { - remove.complete(); - } - }); + // 三种事件统一处理 + Runnable removeTask = () -> disconnect(userId, token); + emitter.onCompletion(removeTask); + emitter.onTimeout(removeTask); + emitter.onError(e -> removeTask.run()); + + // 延迟清理 + DELAY_QUEUE.offer(new SseEmitterDelayed(userId, token, emitter, + PROPERTIES.getHeartbeatSeconds(), PROPERTIES.getUnit())); try { // 向客户端发送一条连接成功的事件 emitter.send(SseEmitter.event().comment("connected")); } catch (IOException e) { - // 如果发送消息失败,则从映射表中移除 emitter - emitters.remove(token); + log.warn("SSE连接发送初始事件失败 userId={}, token={}", userId, token, e); + disconnect(userId, token); } return emitter; } @@ -83,18 +166,40 @@ public class SseEmitterManager { if (userId == null || token == null) { return; } - Map emitters = USER_TOKEN_EMITTERS.get(userId); - if (MapUtil.isNotEmpty(emitters)) { - try { - SseEmitter sseEmitter = emitters.get(token); - sseEmitter.send(SseEmitter.event().comment("disconnected")); - sseEmitter.complete(); - } catch (Exception ignore) { + Map> emitters = USER_TOKEN_EMITTERS.get(userId); + if (MapUtil.isEmpty(emitters)) { + USER_TOKEN_EMITTERS.remove(userId); + return; + } + + WeakReference ref = emitters.remove(token); + if (ref != null) { + SseEmitter emitter = ref.get(); + if (emitter != null) { + try { + emitter.send(SseEmitter.event().comment("disconnected")); + emitter.complete(); + } catch (Exception ignore) { + } } - emitters.remove(token); - } else { + } + + if (emitters.isEmpty()) { USER_TOKEN_EMITTERS.remove(userId); } + log.debug("SSE连接移除并断开 userId={}, token={}", userId, token); + } + + /** + * 心跳检测 Emitter 是否已关闭 + */ + private static boolean isEmitterDead(SseEmitter emitter) { + try { + emitter.send(PING_EVENT); + return false; + } catch (Exception e) { + return true; + } } /** @@ -113,23 +218,24 @@ public class SseEmitterManager { * @param message 要发送的消息内容 */ public void sendMessage(Long userId, String message) { - Map emitters = USER_TOKEN_EMITTERS.get(userId); - if (MapUtil.isNotEmpty(emitters)) { - for (Map.Entry entry : emitters.entrySet()) { + Map> emitters = USER_TOKEN_EMITTERS.get(userId); + if (MapUtil.isEmpty(emitters)) { + USER_TOKEN_EMITTERS.remove(userId); + return; + } + emitters.forEach((token, ref) -> { + SseEmitter emitter = ref.get(); + if (emitter != null) { try { - entry.getValue().send(SseEmitter.event() - .name("message") - .data(message)); + emitter.send(SseEmitter.event().name("message").data(message)); } catch (Exception e) { - SseEmitter remove = emitters.remove(entry.getKey()); - if (remove != null) { - remove.complete(); - } + log.warn("SSE消息发送失败 userId={}, token={}", userId, token, e); + disconnect(userId, token); } + } else { + disconnect(userId, token); } - } else { - USER_TOKEN_EMITTERS.remove(userId); - } + }); } /** @@ -138,9 +244,7 @@ public class SseEmitterManager { * @param message 要发送的消息内容 */ public void sendMessage(String message) { - for (Long userId : USER_TOKEN_EMITTERS.keySet()) { - sendMessage(userId, message); - } + USER_TOKEN_EMITTERS.keySet().forEach(userId -> sendMessage(userId, message)); } /** @@ -149,10 +253,7 @@ public class SseEmitterManager { * @param sseMessageDto 要发布的SSE消息对象 */ public void publishMessage(SseMessageDto sseMessageDto) { - SseMessageDto broadcastMessage = new SseMessageDto(); - broadcastMessage.setMessage(sseMessageDto.getMessage()); - broadcastMessage.setUserIds(sseMessageDto.getUserIds()); - RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { + RedisUtils.publish(SSE_TOPIC, sseMessageDto, consumer -> { log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}", SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getMessage()); }); @@ -170,4 +271,5 @@ public class SseEmitterManager { log.info("SSE发送主题订阅消息topic:{} message:{}", SSE_TOPIC, message); }); } + }