From 735f4b170dd813e02f8da5501f9f7b20be9bae91 Mon Sep 17 00:00:00 2001 From: jay li Date: Thu, 19 Jun 2025 20:18:38 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=20#ICGGAW=20redis=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E6=BA=90=E6=94=AF=E6=8C=81=E9=9B=86=E7=BE=A4=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../parser/redis/RedisXmlELParser.java | 9 +- .../liteflow/parser/redis/mode/RedisMode.java | 7 +- .../parser/redis/mode/RedisParserHelper.java | 40 ++++ .../mode/polling/RedisParserPollingMode.java | 43 ++-- .../subscribe/RedisParserSubscribeMode.java | 43 ++-- .../parser/redis/vo/RedisParserVO.java | 23 ++ .../redis/RedisClusterPollSpringBootTest.java | 199 +++++++++++++++++ .../RedisClusterSubscribeSpringBootTest.java | 202 ++++++++++++++++++ .../application-poll-cluster-xml.properties | 9 + .../application-sub-cluster-xml.properties | 10 + 10 files changed, 542 insertions(+), 43 deletions(-) create mode 100644 liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterPollSpringBootTest.java create mode 100644 liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterSubscribeSpringBootTest.java create mode 100644 liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-poll-cluster-xml.properties create mode 100644 liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-sub-cluster-xml.properties diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java index 48a3a9c88..ce652aa03 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java @@ -24,6 +24,7 @@ import java.util.Objects; * Redis解析器实现,只支持EL形式的XML,不支持其他的形式 * * @author hxinyu + * @author jay li * @since 2.11.0 */ @@ -109,11 +110,17 @@ public class RedisXmlELParser extends ClassXmlFlowELParser { if (redisParserVO.getRedisMode().equals(RedisMode.SENTINEL) && CollectionUtil.isEmpty(redisParserVO.getSentinelAddress())) { throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "sentinel address list")); } - if (ObjectUtil.isNull(redisParserVO.getChainDataBase())) { + if (ObjectUtil.isNull(redisParserVO.getChainDataBase()) && !redisParserVO.getRedisMode().equals(RedisMode.CLUSTER)) { throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "chainDataBase")); } if (StrUtil.isBlank(redisParserVO.getChainKey())) { throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "chainKey")); } + if (redisParserVO.getRedisMode().equals(RedisMode.CLUSTER) && CollectionUtil.isEmpty(redisParserVO.getClusterNodeAddress())) { + throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "cluster address list")); + } + if (ObjectUtil.isNull(redisParserVO.getScriptKey()) && ObjectUtil.isNull(redisParserVO.getScriptDataBase()) && !redisParserVO.getRedisMode().equals(RedisMode.CLUSTER)) { + throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "scriptDataBase")); + } } } diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisMode.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisMode.java index ce5610428..d333da87d 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisMode.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisMode.java @@ -4,16 +4,19 @@ package com.yomahub.liteflow.parser.redis.mode; * 用于定义Redis模式的枚举类 * * single单点模式, sentinel哨兵模式 - * 不支持集群模式配置 + * cluster 集群模式配置 * * @author hxinyu + * @author jay li * @since 2.11.0 */ public enum RedisMode { SINGLE("single"), - SENTINEL("sentinel"); + SENTINEL("sentinel"), + + CLUSTER("cluster"); private String mode; diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisParserHelper.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisParserHelper.java index e6d80d51f..e588ea0d9 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisParserHelper.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/RedisParserHelper.java @@ -3,6 +3,7 @@ package com.yomahub.liteflow.parser.redis.mode; import cn.hutool.core.lang.Pair; import cn.hutool.core.text.StrFormatter; import cn.hutool.core.util.BooleanUtil; +import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.builder.LiteFlowNodeBuilder; import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder; @@ -13,6 +14,7 @@ import com.yomahub.liteflow.log.LFLoggerManager; import com.yomahub.liteflow.parser.helper.NodeConvertHelper; import com.yomahub.liteflow.parser.redis.vo.RedisParserVO; import com.yomahub.liteflow.util.RuleParsePluginUtil; +import org.redisson.config.ClusterServersConfig; import org.redisson.config.Config; import org.redisson.config.SentinelServersConfig; import org.redisson.config.SingleServerConfig; @@ -22,6 +24,7 @@ import org.redisson.config.SingleServerConfig; * * @author hxinyu * @author Bryan.Zhang + * @author jay li * @since 2.11.0 */ @@ -33,6 +36,8 @@ public interface RedisParserHelper { String SENTINEL_REDIS_URL_PATTERN = "redis://{}"; + String CLUSTER_REDIS_URL_PATTERN = "redis://{}"; + String CHAIN_XML_PATTERN = "{}"; String NODE_XML_PATTERN = "{}"; @@ -55,6 +60,9 @@ public interface RedisParserHelper { * @return redisson config */ default Config getSingleRedissonConfig(RedisParserVO redisParserVO, Integer dataBase) { + if (ObjectUtil.isNull(dataBase)) { + return null; + } Config config = new Config(); String redisAddress = StrFormatter.format(SINGLE_REDIS_URL_PATTERN, redisParserVO.getHost(), redisParserVO.getPort()); @@ -81,6 +89,9 @@ public interface RedisParserHelper { * @return redisson Config */ default Config getSentinelRedissonConfig(RedisParserVO redisParserVO, Integer dataBase) { + if (ObjectUtil.isNull(dataBase)) { + return null; + } Config config = new Config(); SentinelServersConfig sentinelConfig = config.useSentinelServers() .setMasterName(redisParserVO.getMasterName()) @@ -109,6 +120,35 @@ public interface RedisParserHelper { return config; } + /** + * 获取Redisson客户端的Config配置通用方法(集群模式) + * @param redisParserVO redisParserVO + * @return redisson Config + */ + default Config getCluserRedissonConfig(RedisParserVO redisParserVO) { + Config config = new Config(); + ClusterServersConfig clusterConfig = config.useClusterServers() + .setMasterConnectionPoolSize(redisParserVO.getConnectionPoolSize()) + .setSlaveConnectionPoolSize(redisParserVO.getConnectionPoolSize()) + .setMasterConnectionMinimumIdleSize(redisParserVO.getConnectionMinimumIdleSize()) + .setSlaveConnectionMinimumIdleSize(redisParserVO.getConnectionMinimumIdleSize()); + + redisParserVO.getClusterNodeAddress().forEach(address -> { + clusterConfig.addNodeAddress(StrFormatter.format(CLUSTER_REDIS_URL_PATTERN, address)); + }); + //如果配置了用户名和密码 + if(StrUtil.isNotBlank(redisParserVO.getUsername()) && StrUtil.isNotBlank(redisParserVO.getPassword())) { + clusterConfig.setUsername(redisParserVO.getUsername()) + .setPassword(redisParserVO.getPassword()); + } + //如果配置了密码 + else if(StrUtil.isNotBlank(redisParserVO.getPassword())) { + clusterConfig.setPassword(redisParserVO.getPassword()); + } + + return config; + } + /** * script节点的修改/添加 * diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java index cafaa67fb..3338f772f 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; * Redis 轮询机制实现类 * * @author hxinyu + * @author jay li * @since 2.11.0 */ @@ -81,27 +82,29 @@ public class RedisParserPollingMode implements RedisParserHelper { } if (ObjectUtil.isNull(chainClient)) { RedisMode redisMode = redisParserVO.getRedisMode(); - Config config; - //Redis单点模式 - if (redisMode.equals(RedisMode.SINGLE)){ - config = getSingleRedissonConfig(redisParserVO, redisParserVO.getChainDataBase()); - this.chainClient = new RClient(Redisson.create(config)); - //如果有脚本数据 - if (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) { - config = getSingleRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase()); - this.scriptClient = new RClient(Redisson.create(config)); - } + Config chinaConfig, scriptConfig; + + switch (redisMode) { + case SINGLE: + chinaConfig = getSingleRedissonConfig(redisParserVO, redisParserVO.getChainDataBase()); + scriptConfig = getSingleRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase()); + break; + case SENTINEL: + chinaConfig = getSentinelRedissonConfig(redisParserVO, redisParserVO.getChainDataBase()); + scriptConfig = getSentinelRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase()); + break; + case CLUSTER: + chinaConfig = getCluserRedissonConfig(redisParserVO); + scriptConfig = chinaConfig; + break; + default: + throw new RedisException("RedisMode is not supported"); } - //Redis哨兵模式 - else if (redisMode.equals(RedisMode.SENTINEL)) { - config = getSentinelRedissonConfig(redisParserVO, redisParserVO.getChainDataBase()); - this.chainClient = new RClient(Redisson.create(config)); - //如果有脚本数据 - if (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) { - config = getSentinelRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase()); - this.scriptClient = new RClient(Redisson.create(config)); - } + this.chainClient = new RClient(Redisson.create(chinaConfig)); + //如果有脚本数据 + if (ObjectUtil.isNotNull(redisParserVO.getScriptKey())) { + this.scriptClient = new RClient(Redisson.create(scriptConfig)); } } //创建定时任务线程池 @@ -211,7 +214,7 @@ public class RedisParserPollingMode implements RedisParserHelper { redisParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS); //如果有脚本 - if (ObjectUtil.isNotNull(scriptClient) && ObjectUtil.isNotNull(redisParserVO.getScriptDataBase()) + if (ObjectUtil.isNotNull(scriptClient) && (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase()) || RedisMode.CLUSTER.getMode().equals(redisParserVO.getMode().getMode())) && StrUtil.isNotBlank(redisParserVO.getScriptKey())) { //将lua脚本添加到scriptJedis脚本缓存 String keyLuaOfScript = scriptClient.scriptLoad(luaOfKey); diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/subscribe/RedisParserSubscribeMode.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/subscribe/RedisParserSubscribeMode.java index cb4e50c5d..4be097219 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/subscribe/RedisParserSubscribeMode.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/subscribe/RedisParserSubscribeMode.java @@ -28,6 +28,7 @@ import java.util.Map; * 使用 Redisson客户端 RMapCache存储结构 * * @author hxinyu + * @author jay li * @since 2.11.0 */ @@ -50,27 +51,29 @@ public class RedisParserSubscribeMode implements RedisParserHelper { } if (ObjectUtil.isNull(chainClient)) { RedisMode redisMode = redisParserVO.getRedisMode(); - Config config; - //Redis单点模式 - if (redisMode.equals(RedisMode.SINGLE)) { - config = getSingleRedissonConfig(redisParserVO, redisParserVO.getChainDataBase()); - this.chainClient = new RClient(Redisson.create(config)); - //如果有脚本数据 - if (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) { - config = getSingleRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase()); - this.scriptClient = new RClient(Redisson.create(config)); - } + Config chinaConfig, scriptConfig; + + switch (redisMode) { + case SINGLE: + chinaConfig = getSingleRedissonConfig(redisParserVO, redisParserVO.getChainDataBase()); + scriptConfig = getSingleRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase()); + break; + case SENTINEL: + chinaConfig = getSentinelRedissonConfig(redisParserVO, redisParserVO.getChainDataBase()); + scriptConfig = getSentinelRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase()); + break; + case CLUSTER: + chinaConfig = getCluserRedissonConfig(redisParserVO); + scriptConfig = chinaConfig; + break; + default: + throw new RedisException("RedisMode is not supported"); } - //Redis哨兵模式 - else if (redisMode.equals(RedisMode.SENTINEL)) { - config = getSentinelRedissonConfig(redisParserVO, redisParserVO.getChainDataBase()); - this.chainClient = new RClient(Redisson.create(config)); - //如果有脚本数据 - if (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) { - config = getSentinelRedissonConfig(redisParserVO, redisParserVO.getScriptDataBase()); - this.scriptClient = new RClient(Redisson.create(config)); - } + this.chainClient = new RClient(Redisson.create(chinaConfig)); + //如果有脚本数据 + if (ObjectUtil.isNotNull(redisParserVO.getScriptKey())) { + this.scriptClient = new RClient(Redisson.create(scriptConfig)); } } } catch (Exception e) { @@ -172,7 +175,7 @@ public class RedisParserSubscribeMode implements RedisParserHelper { }); //监听 script - if (ObjectUtil.isNotNull(scriptClient) && ObjectUtil.isNotNull(redisParserVO.getScriptDataBase())) { + if (ObjectUtil.isNotNull(scriptClient) && (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase()) || RedisMode.CLUSTER.equals(redisParserVO.getRedisMode()))) { String scriptKey = redisParserVO.getScriptKey(); //添加 script diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java index a3cc9293f..c4c9f3862 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/vo/RedisParserVO.java @@ -12,6 +12,7 @@ import java.util.List; * 用于解析RuleSourceExtData的vo类, 用于Redis模式中 * * @author hxinyu + * @author jay li * @since 2.11.0 */ @@ -63,6 +64,17 @@ public class RedisParserVO { /*脚本配置的键名 若没有脚本数据可不配置*/ private String scriptKey; + /*集群模式需配置 逗号分隔 集群地址 */ + private List clusterNodeAddress; + + public List getClusterNodeAddress() { + return clusterNodeAddress; + } + + public void setClusterNodeAddress(List clusterNodeAddress) { + this.clusterNodeAddress = clusterNodeAddress; + } + public void setRedisMode(String redisMode) { redisMode = redisMode.toUpperCase(); try{ @@ -120,6 +132,16 @@ public class RedisParserVO { } } + @JsonSetter("clusterAddress") + public void setClusterAddressFromString(String addresses) { + if (addresses != null && !addresses.trim().isEmpty()) { + // 按逗号分割,并去除每个地址前后的空格 + this.clusterNodeAddress = Arrays.asList(addresses.split("\\s*,\\s*")); + } else { + this.clusterNodeAddress = Collections.emptyList(); + } + } + public String getUsername() { return username; } @@ -232,6 +254,7 @@ public class RedisParserVO { ", chainKey='" + chainKey + '\'' + ", scriptDataBase=" + scriptDataBase + ", scriptKey='" + scriptKey + '\'' + + ", clusterAddress='" + clusterNodeAddress + '\'' + '}'; } } diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterPollSpringBootTest.java b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterPollSpringBootTest.java new file mode 100644 index 000000000..b6149f86f --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterPollSpringBootTest.java @@ -0,0 +1,199 @@ +package com.yomahub.liteflow.test.redis; + +import cn.hutool.crypto.digest.DigestUtil; +import com.yomahub.liteflow.core.FlowExecutor; +import com.yomahub.liteflow.exception.ChainNotFoundException; +import com.yomahub.liteflow.flow.FlowBus; +import com.yomahub.liteflow.flow.LiteflowResponse; +import com.yomahub.liteflow.log.LFLog; +import com.yomahub.liteflow.log.LFLoggerManager; +import com.yomahub.liteflow.parser.redis.mode.RClient; +import com.yomahub.liteflow.parser.redis.mode.polling.RedisParserPollingMode; + +import com.yomahub.liteflow.slot.DefaultContext; +import com.yomahub.liteflow.test.BaseTest; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.ExtendWith; + +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import javax.annotation.Resource; + +import java.lang.reflect.Field; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + +/** + * springboot环境下的redis 集群配置源poll模式功能测试 + *

+ * 测试用例会在1号database中添加测试数据 chainKey:testChainKey; scriptKey:testScriptKey + * 测试完成后清除测试数据 + * + * @author jay li + * @since 2.13.3 + */ +@ExtendWith(SpringExtension.class) +@TestPropertySource(value = "classpath:/redis/application-poll-cluster-xml.properties") +@SpringBootTest(classes = RedisClusterPollSpringBootTest.class) +@EnableAutoConfiguration +@ComponentScan({"com.yomahub.liteflow.test.redis.cmp"}) +public class RedisClusterPollSpringBootTest extends BaseTest { + + @MockBean(name = "chainClient") + private static RClient chainClient; + + @MockBean(name = "scriptClient") + private static RClient scriptClient; + + @Resource + private FlowExecutor flowExecutor; + + //计算hash中field数量的lua脚本 + private final String luaOfKey = "local keys = redis.call(\"hkeys\", KEYS[1]);\n" + + "return #keys;\n"; + + //计算hash中value的SHA值的lua脚本 + private final String luaOfValue = "local key = KEYS[1];\n" + + "local field = KEYS[2];\n" + + "local value, err = redis.call(\"hget\", key, field);\n" + + "if value == false or value == nil then\n" + + " return \"nil\";\n" + + "end\n" + + "local sha1 = redis.sha1hex(value);\n" + + "return sha1;"; + + static LFLog LOG = LFLoggerManager.getLogger(RedisClusterPollSpringBootTest.class); + + + @AfterAll + public static void after() { + //关闭poll模式的轮询线程池 + try{ + Field pollExecutor = RedisParserPollingMode.class.getDeclaredField("pollExecutor"); + pollExecutor.setAccessible(true); + ScheduledThreadPoolExecutor threadPoolExecutor = (ScheduledThreadPoolExecutor) pollExecutor.get(null); + threadPoolExecutor.shutdownNow(); + } catch (Exception ignored) { + LOG.error("[Polling thread pool not closed]", ignored); + } + } + + /** + * 统一测试chain和script + * + * 测试数据流程: + * 1、执行chain1值:"THEN(a, b, c);" + * 2、修改chain1值为:"THEN(s11, s22, s33, a, b);", 执行新chain 验证chain的轮询拉取功能 + * 3、修改chain1其中的script11值 执行chain 验证script的轮询拉取功能 + */ + @Test + public void testPollWithXml() throws InterruptedException { + Set chainNameSet = new HashSet<>(); + chainNameSet.add("chain11"); + String chainValue = "THEN(a, b, c);"; + String chainSHA = DigestUtil.sha1Hex(chainValue); + + //修改chain并更新SHA值 + String changeChainValue = "THEN(s11, s22, s33, a, b);"; + String changeChainSHA = DigestUtil.sha1Hex(changeChainValue); + + when(chainClient.hkeys("pollChainKey")).thenReturn(chainNameSet); + when(chainClient.hget("pollChainKey", "chain11")).thenReturn(chainValue).thenReturn(changeChainValue); + when(chainClient.scriptLoad(luaOfKey)).thenReturn("keysha"); + when(chainClient.scriptLoad(luaOfValue)).thenReturn("valuesha"); + when(chainClient.evalSha(eq("keysha"), anyString())).thenReturn("1"); + when(chainClient.evalSha(eq("valuesha"), anyString(), anyString())).thenReturn(chainSHA).thenReturn(changeChainSHA); + + //添加script + Set scriptFieldSet = new HashSet<>(); + scriptFieldSet.add("s11:script:脚本s11:groovy"); + scriptFieldSet.add("s22:script:脚本s22:js"); + scriptFieldSet.add("s33:script:脚本s33"); + String s11 = "defaultContext.setData(\"test11\",\"hello s11\");"; + String s22 = "defaultContext.setData(\"test22\",\"hello s22\");"; + String s33 = "defaultContext.setData(\"test33\",\"hello s33\");"; + //SHA值用于测试修改script的轮询刷新功能 + String s11SHA = DigestUtil.sha1Hex(s11); + String s22SHA = DigestUtil.sha1Hex(s22); + String s33SHA = DigestUtil.sha1Hex(s33); + //修改script值并更新SHA值 + String changeS11 = "defaultContext.setData(\"test11\",\"hello world\");"; + String changeS11SHA = DigestUtil.sha1Hex(changeS11); + + when(scriptClient.hkeys("pollScriptKey")).thenReturn(scriptFieldSet); + //这里休眠一段时间是为了防止在未修改脚本的chain还没有执行前 轮询线程就拉取了新script值 + when(scriptClient.hget("pollScriptKey", "s11:script:脚本s11:groovy")).thenReturn(s11).thenAnswer(invocation -> { + Thread.sleep(2000); + return changeS11; + }).thenReturn(changeS11); + when(scriptClient.hget("pollScriptKey", "s22:script:脚本s22:js")).thenReturn(s22); + when(scriptClient.hget("pollScriptKey", "s33:script:脚本s33")).thenReturn(s33); + + //分别模拟三个script的evalsha指纹值计算的返回值, 其中s11脚本修改 指纹值变化 + when(scriptClient.scriptLoad(luaOfKey)).thenReturn("keysha"); + when(scriptClient.scriptLoad(luaOfValue)).thenReturn("valuesha"); + when(scriptClient.evalSha(eq("keysha"), anyString())).thenReturn("3"); + when(scriptClient.evalSha("valuesha", "pollScriptKey", "s11:script:脚本s11:groovy")).thenReturn(s11SHA).thenAnswer(invocation -> { + Thread.sleep(2000); + return changeS11SHA; + }).thenReturn(changeS11SHA); + when(scriptClient.evalSha("valuesha", "pollScriptKey", "s22:script:脚本s22:js")).thenReturn(s22SHA); + when(scriptClient.evalSha("valuesha", "pollScriptKey", "s33:script:脚本s33")).thenReturn(s33SHA); + + //测试修改前的chain + LiteflowResponse response = flowExecutor.execute2Resp("chain11", "arg"); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr()); + + Thread.sleep(4000); + + //测试加了script的chain + response = flowExecutor.execute2Resp("chain11", "arg"); + DefaultContext context = response.getFirstContextBean(); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertEquals("hello s11", context.getData("test11")); + Assertions.assertEquals("hello s22", context.getData("test22")); + Assertions.assertEquals("s11[脚本s11]==>s22[脚本s22]==>s33[脚本s33]==>a==>b", response.getExecuteStepStrWithoutTime()); + + Thread.sleep(4000); + + //测试修改script后的chain + response = flowExecutor.execute2Resp("chain11", "arg"); + context = response.getFirstContextBean(); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertEquals("hello world", context.getData("test11")); + } + + @Test + public void testDisablePollWithXml() throws InterruptedException { + Set chainNameSet = new HashSet<>(); + chainNameSet.add("chain1122:false"); + String chainValue = "THEN(a, b, c);"; + + when(chainClient.hkeys("pollChainKey")).thenReturn(chainNameSet); + when(chainClient.hget("pollChainKey", "chain1122:true")).thenReturn(chainValue); + + Set scriptFieldSet = new HashSet<>(); + scriptFieldSet.add("s4:script:脚本s3:groovy:false"); + when(scriptClient.hkeys("pollScriptKey")).thenReturn(scriptFieldSet); + when(scriptClient.hget("pollScriptKey", "s4:script:脚本s3:groovy:true")).thenReturn("defaultContext.setData(\"test\",\"hello\");"); + + // 测试 chain 停用 + Assertions.assertThrows(ChainNotFoundException.class, () -> { + throw flowExecutor.execute2Resp("chain1122", "arg").getCause(); + }); + + // 测试 script 停用 + Assertions.assertTrue(!FlowBus.getNodeMap().containsKey("s4")); + } +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterSubscribeSpringBootTest.java b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterSubscribeSpringBootTest.java new file mode 100644 index 000000000..f90f70985 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterSubscribeSpringBootTest.java @@ -0,0 +1,202 @@ +package com.yomahub.liteflow.test.redis; + +import com.yomahub.liteflow.core.FlowExecutor; +import com.yomahub.liteflow.flow.FlowBus; +import com.yomahub.liteflow.flow.LiteflowResponse; +import com.yomahub.liteflow.parser.helper.NodeConvertHelper; +import com.yomahub.liteflow.parser.redis.mode.RClient; +import com.yomahub.liteflow.parser.redis.mode.RedisParserHelper; + +import com.yomahub.liteflow.slot.DefaultContext; +import com.yomahub.liteflow.test.BaseTest; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.redisson.api.RMapCache; +import org.redisson.api.RedissonClient; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import javax.annotation.Resource; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * springboot环境下的redis 集群配置源订阅模式功能测试 + *

+ * 测试用例会在1号database中添加测试数据 chainKey:testChainKey; scriptKey:testScriptKey + * 测试完成后清除测试数据 + * + * @author jay li + * @since 2.13.3 + */ +@ExtendWith(SpringExtension.class) +@TestPropertySource(value = "classpath:/redis/application-sub-cluster-xml.properties") +@SpringBootTest(classes = RedisClusterSubscribeSpringBootTest.class) +@EnableAutoConfiguration +@ComponentScan({"com.yomahub.liteflow.test.redis.cmp"}) +public class RedisClusterSubscribeSpringBootTest extends BaseTest { + @Mock + private RedissonClient redissonClient; + + @Resource + private FlowExecutor flowExecutor; + + @MockBean(name = "chainClient") + private RClient chainClient; + + @MockBean(name = "scriptClient") + private RClient scriptClient; + + @Mock + private RMapCache chainKey; + + @Mock + private RMapCache scriptKey; + + @BeforeEach + public void setUpBeforeClass() { + + when(redissonClient.getMapCache("testChainKey")).thenReturn(chainKey); + when(redissonClient.getMapCache("testScriptKey")).thenReturn(scriptKey); + + when(scriptKey.get("s1:script:脚本s1:groovy")).thenReturn("defaultContext.setData(\"test1\",\"hello s1\");"); + when(scriptKey.get("s2:script:脚本s2:js")).thenReturn("defaultContext.setData(\"test2\",\"hello s2\");"); + when(scriptKey.get("s3:script:脚本s3")).thenReturn("defaultContext.setData(\"test3\",\"hello s3\");"); + + Set> mockEntrySet = new HashSet<>(); + mockEntrySet.add(createMockEntry("s1:script:脚本s1:groovy", "defaultContext.setData(\"test1\",\"hello s1\");")); + mockEntrySet.add(createMockEntry("s2:script:脚本s2:js", "defaultContext.setData(\"test2\",\"hello s2\");")); + mockEntrySet.add(createMockEntry("s3:script:脚本s3", "defaultContext.setData(\"test3\",\"hello s3\");")); + when(scriptKey.entrySet()).thenReturn(mockEntrySet); + + when(chainKey.get("chain1")).thenReturn("THEN(a, b, c);"); + when(chainKey.get("chain2")).thenReturn("THEN(a, b, c, s3);"); + when(chainKey.get("chain3")).thenReturn("THEN(a, b, c, s1, s2);"); + + mockEntrySet = new HashSet<>(); + mockEntrySet.add(createMockEntry("chain1", "THEN(a, b, c);")); + mockEntrySet.add(createMockEntry("chain2", "THEN(a, b, c, s3);")); + mockEntrySet.add(createMockEntry("chain3", "THEN(a, b, c, s1, s2);")); + + Set> mockEntrySet1 = new HashSet<>(mockEntrySet); + + mockEntrySet1.add(createMockEntry("chain1", "THEN(a, c, b);")); + when(chainKey.entrySet()).thenReturn(mockEntrySet).thenReturn(mockEntrySet1); + + when(chainClient.getMap(anyString())).thenReturn(chainKey); + when(scriptClient.getMap(anyString())).thenReturn(scriptKey); + } + + private Map.Entry createMockEntry(Object key, Object value) { + Map.Entry entry = mock(Map.Entry.class); + when(entry.getKey()).thenReturn(key); + when(entry.getValue()).thenReturn(value); + return entry; + } + + /** + * 测试chain + */ + @Test + public void testSubWithXml() throws InterruptedException { + LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg"); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr()); + + //修改redis中规则 + changeXMLData(); + //重新加载规则 + Thread.sleep(100); + + Assertions.assertEquals("a==>c==>b", flowExecutor.execute2Resp("chain1", "arg").getExecuteStepStr()); + + //删除redis中规则 + deleteXMLData(); + //重新加载规则 + Thread.sleep(100); + //由于chain1已被删除 这里会报ChainNotFoundException异常 + response = flowExecutor.execute2Resp("chain1", "arg"); + Assertions.assertTrue(!response.isSuccess()); + + //添加redis中规则 + addXMLData(); + //重新加载规则 + Thread.sleep(100); + Assertions.assertEquals("b==>c", flowExecutor.execute2Resp("chain4", "arg").getExecuteStepStr()); + } + + /** + * 测试script + */ + @Test + public void testSubWithScriptXml() throws InterruptedException { + LiteflowResponse response = flowExecutor.execute2Resp("chain3", "arg"); + DefaultContext context = response.getFirstContextBean(); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertEquals("hello s1", context.getData("test1")); + Assertions.assertEquals("a==>b==>c==>s1[脚本s1]==>s2[脚本s2]", response.getExecuteStepStrWithoutTime()); + + //添加和删除脚本 + addAndDeleteScriptData(); + //修改redis脚本 + changeScriptData(); + Thread.sleep(100); + context = flowExecutor.execute2Resp("chain3", "arg").getFirstContextBean(); + Assertions.assertEquals("hello s1 version2", context.getData("test1")); + context = flowExecutor.execute2Resp("chain2", "arg").getFirstContextBean(); + Assertions.assertEquals("hello s3 version2", context.getData("test2")); + } + + /** + * 修改redisson中的chain + */ + public void changeXMLData() { + RedisParserHelper.changeChain("chain1", "THEN(a, c, b);"); + } + + /** + * 删除redisson中的chain + */ + public void deleteXMLData() { + FlowBus.removeChain("chain1"); + FlowBus.removeChain("chain4"); + } + + /** + * 新增redisson中的chain + */ + public void addXMLData() { + RedisParserHelper.changeChain("chain4", "THEN(b, c);"); + } + + /** + * 修改redisson中的脚本 + */ + public void changeScriptData() { + RedisParserHelper.changeScriptNode("s1:script:脚本s1:groovy", "defaultContext.setData(\"test1\",\"hello s1 version2\");"); + RedisParserHelper.changeScriptNode("s3:script:脚本s3", "defaultContext.setData(\"test2\",\"hello s3 version2\");"); + } + + /** + * 新增和删除redisson中的chain + */ + public void addAndDeleteScriptData() { + NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert("s3:script:脚本s3"); + FlowBus.unloadScriptNode(nodeSimpleVO.getNodeId()); + + RedisParserHelper.changeScriptNode("s5:script:脚本s5:groovy", "defaultContext.setData(\"test1\",\"hello s5\");"); + } + + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-poll-cluster-xml.properties b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-poll-cluster-xml.properties new file mode 100644 index 000000000..6153f5d55 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-poll-cluster-xml.properties @@ -0,0 +1,9 @@ +liteflow.rule-source-ext-data={\ + "redisMode":"cluster",\ + "clusterAddress":"127.0.0.1:26389,127.0.0.1:26379",\ + "pollingInterval":1,\ + "pollingStartTime":2,\ + "chainKey":"pollChainKey",\ + "scriptKey":"pollScriptKey"\ + } +liteflow.parse-mode=PARSE_ALL_ON_FIRST_EXEC \ No newline at end of file diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-sub-cluster-xml.properties b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-sub-cluster-xml.properties new file mode 100644 index 000000000..826ac8948 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/resources/redis/application-sub-cluster-xml.properties @@ -0,0 +1,10 @@ +liteflow.rule-source-ext-data={\ + "redisMode":"cluster",\ + "clusterAddress":"127.0.0.1:26389,127.0.0.1:26379",\ + "mode":"sub",\ + "chainDataBase":1,\ + "chainKey":"testChainKey",\ + "scriptDataBase":1,\ + "scriptKey":"testScriptKey"\ + } +liteflow.parse-mode=PARSE_ALL_ON_FIRST_EXEC \ No newline at end of file -- Gitee From e659a4a2b0c50247ceec00356688cdbaa49e8f5c Mon Sep 17 00:00:00 2001 From: jay li Date: Tue, 24 Jun 2025 11:37:12 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=20#ICGGAW=20redis=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E6=BA=90=E6=94=AF=E6=8C=81=E9=9B=86=E7=BE=A4=E6=A8=A1=E5=BC=8F?= =?UTF-8?q?,=E4=BF=AE=E6=94=B9=E6=A0=A1=E9=AA=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/yomahub/liteflow/parser/redis/RedisXmlELParser.java | 2 +- .../parser/redis/mode/polling/RedisParserPollingMode.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java index ce652aa03..92196a6ac 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/RedisXmlELParser.java @@ -119,7 +119,7 @@ public class RedisXmlELParser extends ClassXmlFlowELParser { if (redisParserVO.getRedisMode().equals(RedisMode.CLUSTER) && CollectionUtil.isEmpty(redisParserVO.getClusterNodeAddress())) { throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "cluster address list")); } - if (ObjectUtil.isNull(redisParserVO.getScriptKey()) && ObjectUtil.isNull(redisParserVO.getScriptDataBase()) && !redisParserVO.getRedisMode().equals(RedisMode.CLUSTER)) { + if (ObjectUtil.isNotNull(redisParserVO.getScriptKey()) && ObjectUtil.isNull(redisParserVO.getScriptDataBase()) && !redisParserVO.getRedisMode().equals(RedisMode.CLUSTER)) { throw new RedisException(StrFormatter.format(ERROR_MSG_PATTERN, "scriptDataBase")); } } diff --git a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java index 3338f772f..4331a5f02 100644 --- a/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java +++ b/liteflow-rule-plugin/liteflow-rule-redis/src/main/java/com/yomahub/liteflow/parser/redis/mode/polling/RedisParserPollingMode.java @@ -214,7 +214,7 @@ public class RedisParserPollingMode implements RedisParserHelper { redisParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS); //如果有脚本 - if (ObjectUtil.isNotNull(scriptClient) && (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase()) || RedisMode.CLUSTER.getMode().equals(redisParserVO.getMode().getMode())) + if (ObjectUtil.isNotNull(scriptClient) && (ObjectUtil.isNotNull(redisParserVO.getScriptDataBase()) || RedisMode.CLUSTER.equals(redisParserVO.getRedisMode())) && StrUtil.isNotBlank(redisParserVO.getScriptKey())) { //将lua脚本添加到scriptJedis脚本缓存 String keyLuaOfScript = scriptClient.scriptLoad(luaOfKey); -- Gitee From 23c540ae53d9ced9b7569222a2a784fc54499eb0 Mon Sep 17 00:00:00 2001 From: jay li Date: Tue, 24 Jun 2025 14:36:17 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=20#ICGGAW=20=E9=98=B2=E6=AD=A2=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E7=B1=BB=E5=85=B3=E9=97=AD=E7=BA=BF=E7=A8=8B=E6=B1=A0?= =?UTF-8?q?=E4=BA=92=E7=9B=B8=E5=BD=B1=E5=93=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../redis/RedisClusterPollSpringBootTest.java | 21 +++++++++------- .../RedisWithXmlELPollSpringbootTest.java | 24 +++++++++++-------- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterPollSpringBootTest.java b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterPollSpringBootTest.java index b6149f86f..14f697796 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterPollSpringBootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterPollSpringBootTest.java @@ -27,6 +27,7 @@ import javax.annotation.Resource; import java.lang.reflect.Field; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledThreadPoolExecutor; import static org.mockito.ArgumentMatchers.anyString; @@ -74,17 +75,21 @@ public class RedisClusterPollSpringBootTest extends BaseTest { static LFLog LOG = LFLoggerManager.getLogger(RedisClusterPollSpringBootTest.class); - - @AfterAll - public static void after() { - //关闭poll模式的轮询线程池 - try{ + @AfterEach + void afterEach() { + try { Field pollExecutor = RedisParserPollingMode.class.getDeclaredField("pollExecutor"); pollExecutor.setAccessible(true); - ScheduledThreadPoolExecutor threadPoolExecutor = (ScheduledThreadPoolExecutor) pollExecutor.get(null); - threadPoolExecutor.shutdownNow(); + // 关闭旧线程池 + ScheduledThreadPoolExecutor oldPool = (ScheduledThreadPoolExecutor) pollExecutor.get(null); + if (oldPool != null) { + oldPool.shutdownNow(); + } + // 创建新线程池并设置回去 + ScheduledThreadPoolExecutor newPool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1); + pollExecutor.set(null, newPool); } catch (Exception ignored) { - LOG.error("[Polling thread pool not closed]", ignored); + LOG.error("[Polling thread pool reset failed]", ignored); } } diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java index 2de4189a9..2c0a7397a 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java @@ -11,9 +11,7 @@ import com.yomahub.liteflow.parser.redis.mode.RClient; import com.yomahub.liteflow.parser.redis.mode.polling.RedisParserPollingMode; import com.yomahub.liteflow.slot.DefaultContext; import com.yomahub.liteflow.test.BaseTest; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.*; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; @@ -26,6 +24,7 @@ import javax.annotation.Resource; import java.lang.reflect.Field; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledThreadPoolExecutor; import static org.mockito.ArgumentMatchers.anyString; @@ -71,16 +70,21 @@ public class RedisWithXmlELPollSpringbootTest extends BaseTest { static LFLog LOG = LFLoggerManager.getLogger(RedisWithXmlELPollSpringbootTest.class); - @AfterAll - public static void after() { - //关闭poll模式的轮询线程池 - try{ + @AfterEach + void afterEach() { + try { Field pollExecutor = RedisParserPollingMode.class.getDeclaredField("pollExecutor"); pollExecutor.setAccessible(true); - ScheduledThreadPoolExecutor threadPoolExecutor = (ScheduledThreadPoolExecutor) pollExecutor.get(null); - threadPoolExecutor.shutdownNow(); + // 关闭旧线程池 + ScheduledThreadPoolExecutor oldPool = (ScheduledThreadPoolExecutor) pollExecutor.get(null); + if (oldPool != null) { + oldPool.shutdownNow(); + } + // 创建新线程池并设置回去 + ScheduledThreadPoolExecutor newPool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1); + pollExecutor.set(null, newPool); } catch (Exception ignored) { - LOG.error("[Polling thread pool not closed]", ignored); + LOG.error("[Polling thread pool reset failed]", ignored); } } -- Gitee