From 0849c2e1c149b54f111e698c51df4513932a8b8f Mon Sep 17 00:00:00 2001 From: AprilWind <2100166581@qq.com> Date: Fri, 17 Oct 2025 13:20:29 +0800 Subject: [PATCH 1/3] =?UTF-8?q?update=20=E6=B7=BB=E5=8A=A0=20SseEmitterDel?= =?UTF-8?q?ayed=20=E7=B1=BB=E4=BB=A5=E6=94=AF=E6=8C=81=20SSE=20=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E7=9A=84=E5=BB=B6=E8=BF=9F=E7=AE=A1=E7=90=86=E5=92=8C?= =?UTF-8?q?=E8=B6=85=E6=97=B6=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/sse/core/SseEmitterDelayed.java | 94 +++++++++ .../common/sse/core/SseEmitterManager.java | 179 ++++++++++++------ 2 files changed, 219 insertions(+), 54 deletions(-) create mode 100644 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterDelayed.java diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterDelayed.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterDelayed.java new file mode 100644 index 000000000..76d277479 --- /dev/null +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterDelayed.java @@ -0,0 +1,94 @@ +package org.dromara.common.sse.core; + +import lombok.Getter; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.lang.ref.WeakReference; +import java.time.Instant; +import java.util.Objects; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +/** + * SseEmitterDelayed 是一个用于延迟队列 (DelayQueue) 的包装类, + * 用于管理 SSE (Server-Sent Events) 连接的超时处理 + *

+ * 核心思路: + * - 每个用户的 SSE 连接用一个 WeakReference 包装,避免内存泄漏 + * - 实现 Delayed 接口,允许放入 DelayQueue,根据 expireAt 控制超时 + * - 可在 DelayQueue 中统一处理过期的 SSE 连接,释放资源 + * + * @author AprilWind + */ +@Getter +public class SseEmitterDelayed implements Delayed { + + /** + * 用户ID,用于标识该 SSE 连接属于哪个用户 + */ + private final Long userId; + + /** + * 用于标识 SSE 连接的唯一令牌,通常用于前端校验 + */ + private final String token; + + /** + * SSE 连接的弱引用,避免延迟队列持有强引用导致内存泄漏 + */ + private final WeakReference emitterRef; + + /** + * 该连接的过期时间戳(毫秒),到达该时间后可视为超时 + */ + private final long expireAt; + + /** + * 构造函数 + * + * @param userId 用户ID + * @param token 连接的唯一标识 + * @param emitter 对应的 SseEmitter 对象 + * @param delay 延迟时长 + * @param unit 延迟单位 + */ + public SseEmitterDelayed(Long userId, String token, SseEmitter emitter, long delay, TimeUnit unit) { + this.userId = userId; + this.token = token; + this.emitterRef = new WeakReference<>(emitter); + // expireAt = 当前时间 + 延迟时间 + this.expireAt = Instant.now().toEpochMilli() + unit.toMillis(delay); + } + + /** + * 获取剩余延迟时间 + * + * @param unit 时间单位 + * @return 剩余时间 + */ + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(expireAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + + /** + * 按 expireAt 时间排序,DelayQueue 内部使用 + * + * @param o 另一个 Delayed 对象 + * @return 比较结果 + */ + @Override + public int compareTo(Delayed o) { + return Long.compare(this.expireAt, ((SseEmitterDelayed) o).expireAt); + } + + /** + * 重写 hashCode 方法,基于 userId 和 token + * 保证在集合中正确比较 + */ + @Override + public int hashCode() { + return Objects.hash(userId, token); + } + +} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java index bc19460f8..563a89dd3 100644 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java @@ -2,13 +2,15 @@ package org.dromara.common.sse.core; import cn.hutool.core.map.MapUtil; import lombok.extern.slf4j.Slf4j; +import org.dromara.common.core.utils.SpringUtils; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.common.sse.dto.SseMessageDto; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; +import java.lang.ref.WeakReference; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.*; import java.util.function.Consumer; /** @@ -24,7 +26,63 @@ public class SseEmitterManager { */ private final static String SSE_TOPIC = "global:sse"; - private final static Map> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>(); + private final static Map>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>(); + + /** + * 延迟检测队列 + */ + private static final DelayQueue DELAY_QUEUE = new DelayQueue<>(); + + /** + * 清理线程池 + */ + private static final ScheduledExecutorService CLEANER = + Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "SSE-Cleaner"); + t.setDaemon(true); + return t; + }); + + static { + // 每分钟处理到期任务 + CLEANER.scheduleWithFixedDelay(SseEmitterManager::processDelayQueue, 1, 1, TimeUnit.MINUTES); + } + + /** + * 处理延迟队列中的到期 SSE 任务 + * 作用: + * 1. 移除已经失效的 SSEEmitter + * 2. 对存活的 SSEEmitter 延迟续期,保证心跳检测 + */ + private static void processDelayQueue() { + try { + // 获取当前时间戳,用于判断任务是否到期 + long now = System.currentTimeMillis(); + SseEmitterDelayed task; + + // 循环处理队列中已经到期的任务 + // peek() 获取队首任务,但不移除 + while ((task = DELAY_QUEUE.peek()) != null && task.getExpireAt() <= now) { + // 队首任务已到期,真正取出 + DELAY_QUEUE.poll(); // 从队列中移除任务 + + WeakReference ref = task.getEmitterRef(); + SseEmitter emitter = ref.get(); + + if (emitter == null || isEmitterDead(emitter)) { + // Emitter 已被 GC 或已关闭,断开连接并从管理器移除 + SpringUtils.getBean(SseEmitterManager.class).disconnect(task.getUserId(), task.getToken()); + } else { + // Emitter 仍然存活,延迟续期 + // 5 分钟后再检查该连接,避免频繁触发 + DELAY_QUEUE.offer(new SseEmitterDelayed(task.getUserId(), task.getToken(), emitter, 5, TimeUnit.MINUTES)); + } + } + } catch (Exception e) { + log.error("SSE延迟队列处理异常", e); + } + } + /** * 建立与指定用户的 SSE 连接 @@ -34,41 +92,34 @@ public class SseEmitterManager { * @return 返回一个 SseEmitter 实例,客户端可以通过该实例接收 SSE 事件 */ public SseEmitter connect(Long userId, String token) { + if (userId == null || token == null) { + throw new IllegalArgumentException("userId and token cannot be null"); + } // 从 USER_TOKEN_EMITTERS 中获取或创建当前用户的 SseEmitter 映射表(ConcurrentHashMap) // 每个用户可以有多个 SSE 连接,通过 token 进行区分 - Map emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>()); + Map> emitters = + USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>()); // 创建一个新的 SseEmitter 实例,超时时间设置为一天 避免连接之后直接关闭浏览器导致连接停滞 SseEmitter emitter = new SseEmitter(86400000L); - emitters.put(token, emitter); + emitters.put(token, new WeakReference<>(emitter)); - // 当 emitter 完成、超时或发生错误时,从映射表中移除对应的 token - emitter.onCompletion(() -> { - SseEmitter remove = emitters.remove(token); - if (remove != null) { - remove.complete(); - } - }); - emitter.onTimeout(() -> { - SseEmitter remove = emitters.remove(token); - if (remove != null) { - remove.complete(); - } - }); - emitter.onError((e) -> { - SseEmitter remove = emitters.remove(token); - if (remove != null) { - remove.complete(); - } - }); + // 三种事件统一处理 + Runnable removeTask = () -> disconnect(userId, token); + emitter.onCompletion(removeTask); + emitter.onTimeout(removeTask); + emitter.onError(e -> removeTask.run()); + + // 延迟清理 + DELAY_QUEUE.offer(new SseEmitterDelayed(userId, token, emitter, 5, TimeUnit.MINUTES)); try { // 向客户端发送一条连接成功的事件 emitter.send(SseEmitter.event().comment("connected")); } catch (IOException e) { - // 如果发送消息失败,则从映射表中移除 emitter - emitters.remove(token); + log.warn("SSE连接发送初始事件失败 userId={}, token={}", userId, token, e); + disconnect(userId, token); } return emitter; } @@ -83,18 +134,41 @@ public class SseEmitterManager { if (userId == null || token == null) { return; } - Map emitters = USER_TOKEN_EMITTERS.get(userId); - if (MapUtil.isNotEmpty(emitters)) { - try { - SseEmitter sseEmitter = emitters.get(token); - sseEmitter.send(SseEmitter.event().comment("disconnected")); - sseEmitter.complete(); - } catch (Exception ignore) { + Map> emitters = USER_TOKEN_EMITTERS.get(userId); + if (MapUtil.isEmpty(emitters)) { + USER_TOKEN_EMITTERS.remove(userId); + return; + } + + WeakReference ref = emitters.remove(token); + if (ref != null) { + SseEmitter emitter = ref.get(); + if (emitter != null) { + try { + emitter.send(SseEmitter.event().comment("disconnected")); + emitter.complete(); + } catch (Exception ignore) { + } } - emitters.remove(token); - } else { + } + + if (emitters.isEmpty()) { USER_TOKEN_EMITTERS.remove(userId); } + + log.debug("SSE连接移除并断开 userId={}, token={}", userId, token); + } + + /** + * 心跳检测 Emitter 是否已关闭 + */ + private static boolean isEmitterDead(SseEmitter emitter) { + try { + emitter.send(SseEmitter.event().comment("ping")); + return false; + } catch (Exception e) { + return true; + } } /** @@ -113,23 +187,24 @@ public class SseEmitterManager { * @param message 要发送的消息内容 */ public void sendMessage(Long userId, String message) { - Map emitters = USER_TOKEN_EMITTERS.get(userId); - if (MapUtil.isNotEmpty(emitters)) { - for (Map.Entry entry : emitters.entrySet()) { + Map> emitters = USER_TOKEN_EMITTERS.get(userId); + if (MapUtil.isEmpty(emitters)) { + USER_TOKEN_EMITTERS.remove(userId); + return; + } + emitters.forEach((token, ref) -> { + SseEmitter emitter = ref.get(); + if (emitter != null) { try { - entry.getValue().send(SseEmitter.event() - .name("message") - .data(message)); + emitter.send(SseEmitter.event().name("message").data(message)); } catch (Exception e) { - SseEmitter remove = emitters.remove(entry.getKey()); - if (remove != null) { - remove.complete(); - } + log.warn("SSE消息发送失败 userId={}, token={}", userId, token, e); + disconnect(userId, token); } + } else { + disconnect(userId, token); } - } else { - USER_TOKEN_EMITTERS.remove(userId); - } + }); } /** @@ -138,9 +213,7 @@ public class SseEmitterManager { * @param message 要发送的消息内容 */ public void sendMessage(String message) { - for (Long userId : USER_TOKEN_EMITTERS.keySet()) { - sendMessage(userId, message); - } + USER_TOKEN_EMITTERS.keySet().forEach(userId -> sendMessage(userId, message)); } /** @@ -149,10 +222,7 @@ public class SseEmitterManager { * @param sseMessageDto 要发布的SSE消息对象 */ public void publishMessage(SseMessageDto sseMessageDto) { - SseMessageDto broadcastMessage = new SseMessageDto(); - broadcastMessage.setMessage(sseMessageDto.getMessage()); - broadcastMessage.setUserIds(sseMessageDto.getUserIds()); - RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { + RedisUtils.publish(SSE_TOPIC, sseMessageDto, consumer -> { log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}", SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getMessage()); }); @@ -170,4 +240,5 @@ public class SseEmitterManager { log.info("SSE发送主题订阅消息topic:{} message:{}", SSE_TOPIC, message); }); } + } -- Gitee From 57f4eaa952c54ff469def8b0c37fe836a25f47f5 Mon Sep 17 00:00:00 2001 From: AprilWind <2100166581@qq.com> Date: Fri, 17 Oct 2025 21:16:54 +0800 Subject: [PATCH 2/3] =?UTF-8?q?update=20=E5=A2=9E=E5=BC=BA=20SSE=20?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E7=AE=A1=E7=90=86=EF=BC=8C=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E5=BB=B6=E8=BF=9F=E7=BB=AD=E6=9C=9F=E5=92=8C=E5=BF=83=E8=B7=B3?= =?UTF-8?q?=E6=A3=80=E6=B5=8B=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/sse/config/SseProperties.java | 21 +++++++ .../common/sse/core/SseEmitterDelayed.java | 6 +- .../common/sse/core/SseEmitterManager.java | 63 ++++++++++++++----- 3 files changed, 73 insertions(+), 17 deletions(-) diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java index ce4e1732d..41810e8ce 100644 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java @@ -3,6 +3,8 @@ package org.dromara.common.sse.config; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; +import java.util.concurrent.TimeUnit; + /** * SSE 配置项 * @@ -12,10 +14,29 @@ import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties("sse") public class SseProperties { + /** + * 是否启用 SSE 功能 + */ private Boolean enabled; /** * 路径 */ private String path; + + /** + * 心跳检测间隔,默认秒 + */ + private long heartbeatSeconds = 30; + + /** + * 清理线程执行间隔,默认秒 + */ + private long checkIntervalSeconds = 10; + + /** + * 时间单位(秒、分钟等),默认秒 + */ + private TimeUnit unit = TimeUnit.SECONDS; + } diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterDelayed.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterDelayed.java index 76d277479..cb9ad4d62 100644 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterDelayed.java +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterDelayed.java @@ -41,7 +41,7 @@ public class SseEmitterDelayed implements Delayed { /** * 该连接的过期时间戳(毫秒),到达该时间后可视为超时 */ - private final long expireAt; + private volatile long expireAt; /** * 构造函数 @@ -60,6 +60,10 @@ public class SseEmitterDelayed implements Delayed { this.expireAt = Instant.now().toEpochMilli() + unit.toMillis(delay); } + public void renew(long delay, TimeUnit unit) { + this.expireAt = System.currentTimeMillis() + unit.toMillis(delay); + } + /** * 获取剩余延迟时间 * diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java index 563a89dd3..72321f720 100644 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java @@ -1,16 +1,22 @@ package org.dromara.common.sse.core; import cn.hutool.core.map.MapUtil; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.dromara.common.core.utils.SpringUtils; import org.dromara.common.redis.utils.RedisUtils; +import org.dromara.common.sse.config.SseProperties; import org.dromara.common.sse.dto.SseMessageDto; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; import java.lang.ref.WeakReference; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.Consumer; /** @@ -33,28 +39,51 @@ public class SseEmitterManager { */ private static final DelayQueue DELAY_QUEUE = new DelayQueue<>(); + /** + * 心跳事件 + */ + private static final SseEmitter.SseEventBuilder PING_EVENT = SseEmitter.event().comment("ping"); + + private final SseProperties PROPERTIES = SpringUtils.getBean(SseProperties.class); + /** * 清理线程池 */ - private static final ScheduledExecutorService CLEANER = + private final ScheduledExecutorService CLEANER = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "SSE-Cleaner"); t.setDaemon(true); return t; }); - static { - // 每分钟处理到期任务 - CLEANER.scheduleWithFixedDelay(SseEmitterManager::processDelayQueue, 1, 1, TimeUnit.MINUTES); + @PostConstruct + public void init() { + log.info("SSE 管理器启动 -> 检查间隔: {} 秒, 心跳间隔: {} 秒", + PROPERTIES.getCheckIntervalSeconds(), PROPERTIES.getHeartbeatSeconds()); + + CLEANER.scheduleWithFixedDelay( + this::processDelayQueue, + PROPERTIES.getCheckIntervalSeconds(), + PROPERTIES.getCheckIntervalSeconds(), + PROPERTIES.getUnit() + ); + } + + @PreDestroy + public void destroy() { + CLEANER.shutdownNow(); + USER_TOKEN_EMITTERS.clear(); + DELAY_QUEUE.clear(); + log.info("SSE 管理器已关闭"); } /** * 处理延迟队列中的到期 SSE 任务 * 作用: - * 1. 移除已经失效的 SSEEmitter - * 2. 对存活的 SSEEmitter 延迟续期,保证心跳检测 + * 1. 移除已经失效的 SSEEmitter + * 2. 对存活的 SSEEmitter 延迟续期,保证心跳检测 */ - private static void processDelayQueue() { + private void processDelayQueue() { try { // 获取当前时间戳,用于判断任务是否到期 long now = System.currentTimeMillis(); @@ -71,11 +100,11 @@ public class SseEmitterManager { if (emitter == null || isEmitterDead(emitter)) { // Emitter 已被 GC 或已关闭,断开连接并从管理器移除 - SpringUtils.getBean(SseEmitterManager.class).disconnect(task.getUserId(), task.getToken()); + this.disconnect(task.getUserId(), task.getToken()); } else { - // Emitter 仍然存活,延迟续期 - // 5 分钟后再检查该连接,避免频繁触发 - DELAY_QUEUE.offer(new SseEmitterDelayed(task.getUserId(), task.getToken(), emitter, 5, TimeUnit.MINUTES)); + // 直接更新到期时间,放回队列 + task.renew(PROPERTIES.getHeartbeatSeconds(), PROPERTIES.getUnit()); + DELAY_QUEUE.offer(task); } } } catch (Exception e) { @@ -83,7 +112,6 @@ public class SseEmitterManager { } } - /** * 建立与指定用户的 SSE 连接 * @@ -100,6 +128,9 @@ public class SseEmitterManager { Map> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>()); + // 如果已有旧连接,则先断开 + emitters.remove(token); + // 创建一个新的 SseEmitter 实例,超时时间设置为一天 避免连接之后直接关闭浏览器导致连接停滞 SseEmitter emitter = new SseEmitter(86400000L); @@ -112,7 +143,8 @@ public class SseEmitterManager { emitter.onError(e -> removeTask.run()); // 延迟清理 - DELAY_QUEUE.offer(new SseEmitterDelayed(userId, token, emitter, 5, TimeUnit.MINUTES)); + DELAY_QUEUE.offer(new SseEmitterDelayed(userId, token, emitter, + PROPERTIES.getHeartbeatSeconds(), PROPERTIES.getUnit())); try { // 向客户端发送一条连接成功的事件 @@ -155,7 +187,6 @@ public class SseEmitterManager { if (emitters.isEmpty()) { USER_TOKEN_EMITTERS.remove(userId); } - log.debug("SSE连接移除并断开 userId={}, token={}", userId, token); } @@ -164,7 +195,7 @@ public class SseEmitterManager { */ private static boolean isEmitterDead(SseEmitter emitter) { try { - emitter.send(SseEmitter.event().comment("ping")); + emitter.send(PING_EVENT); return false; } catch (Exception e) { return true; -- Gitee From 7faa48f905989a26d40e4dd382613982b182a36b Mon Sep 17 00:00:00 2001 From: AprilWind <2100166581@qq.com> Date: Fri, 17 Oct 2025 21:42:52 +0800 Subject: [PATCH 3/3] =?UTF-8?q?update=20=E6=B7=BB=E5=8A=A0=20renew=20?= =?UTF-8?q?=E6=96=B9=E6=B3=95=E4=BB=A5=E6=94=AF=E6=8C=81=E7=BB=AD=E6=9C=9F?= =?UTF-8?q?=E5=BD=93=E5=89=8D=E5=BB=B6=E8=BF=9F=E4=BB=BB=E5=8A=A1=E7=9A=84?= =?UTF-8?q?=E5=88=B0=E6=9C=9F=E6=97=B6=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/dromara/common/sse/core/SseEmitterDelayed.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterDelayed.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterDelayed.java index cb9ad4d62..fa8f3368b 100644 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterDelayed.java +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterDelayed.java @@ -60,6 +60,12 @@ public class SseEmitterDelayed implements Delayed { this.expireAt = Instant.now().toEpochMilli() + unit.toMillis(delay); } + /** + * 续期当前延迟任务的到期时间 + * + * @param delay 延迟时长(例如 30 表示 30 秒) + * @param unit 时间单位(如秒、分钟等) + */ public void renew(long delay, TimeUnit unit) { this.expireAt = System.currentTimeMillis() + unit.toMillis(delay); } -- Gitee