diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Node.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Node.java index 269a4dd62c4c03a5f6af432eefc81a12633f8cd6..1a4588e7ef1d158baa405af692fd860456b781c8 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Node.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Node.java @@ -21,8 +21,6 @@ import com.yomahub.liteflow.flow.executor.NodeExecutor; import com.yomahub.liteflow.flow.executor.NodeExecutorHelper; import com.yomahub.liteflow.log.LFLog; import com.yomahub.liteflow.log.LFLoggerManager; -import com.yomahub.liteflow.property.LiteflowConfig; -import com.yomahub.liteflow.property.LiteflowConfigGetter; import com.yomahub.liteflow.slot.DataBus; import com.yomahub.liteflow.slot.Slot; @@ -30,6 +28,7 @@ import com.yomahub.liteflow.slot.Slot; * Node节点,实现可执行器 Node节点并不是单例的,每构建一次都会copy出一个新的实例 * * @author Bryan.Zhang + * @author luo yi */ public class Node implements Executable, Cloneable, Rollbackable{ @@ -57,6 +56,9 @@ public class Node implements Executable, Cloneable, Rollbackable{ private String currChainId; + // node 的 isAccess 结果,主要用于 WhenCondition 的提前 isAccess 判断,避免 isAccess 方法重复执行 + private TransmittableThreadLocal accessResult = new TransmittableThreadLocal<>(); + private TransmittableThreadLocal loopIndexTL = new TransmittableThreadLocal<>(); private TransmittableThreadLocal currLoopObject = new TransmittableThreadLocal<>(); @@ -125,16 +127,13 @@ public class Node implements Executable, Cloneable, Rollbackable{ throw new FlowSystemException("there is no instance for node id " + id); } - Slot slot = DataBus.getSlot(slotIndex); try { // 把线程属性赋值给组件对象 instance.setSlotIndex(slotIndex); instance.setRefNode(this); - LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); - // 判断是否可执行,所以isAccess经常作为一个组件进入的实际判断要素,用作检查slot里的参数的完备性 - if (instance.isAccess()) { + if (getAccessResult() || instance.isAccess()) { LOG.info("[O]start component[{}] execution", instance.getDisplayName()); // 这里开始进行重试的逻辑和主逻辑的运行 @@ -142,8 +141,7 @@ public class Node implements Executable, Cloneable, Rollbackable{ .buildNodeExecutor(instance.getNodeExecutorClass()); // 调用节点执行器进行执行 nodeExecutor.execute(instance); - } - else { + } else { LOG.info("[X]skip component[{}] execution", instance.getDisplayName()); } // 如果组件覆盖了isEnd方法,或者在在逻辑中主要调用了setEnd(true)的话,流程就会立马结束 @@ -178,6 +176,7 @@ public class Node implements Executable, Cloneable, Rollbackable{ instance.removeIsEnd(); instance.removeRefNode(); removeLoopIndex(); + removeAccessResult(); } } @@ -253,6 +252,19 @@ public class Node implements Executable, Cloneable, Rollbackable{ return currChainId; } + public boolean getAccessResult() { + Boolean result = this.accessResult.get(); + return result == null ? false : result; + } + + public void setAccessResult(boolean accessResult) { + this.accessResult.set(accessResult); + } + + public void removeAccessResult() { + this.accessResult.remove(); + } + public void setLoopIndex(int index) { this.loopIndexTL.set(index); } @@ -299,6 +311,7 @@ public class Node implements Executable, Cloneable, Rollbackable{ Node node = (Node)this.clone(); node.loopIndexTL = new TransmittableThreadLocal<>(); node.currLoopObject = new TransmittableThreadLocal<>(); + node.accessResult = new TransmittableThreadLocal<>(); return node; } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java index 3ca243fbd128e66ee92dd4851917f54db84100e5..2cb606f1ee380995c1eda62861dbe85053a36272 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java @@ -31,9 +31,10 @@ public class AllOfParallelExecutor extends ParallelStrategyExecutor { } - //在allOf这个场景中,不需要过滤 + // 在 allOf 这个场景中,不需要过滤 @Override protected Stream filterAccess(Stream stream, Integer slotIndex) { return stream; } + } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java index 6f4c19257f71269da8943e880e71e6a34e57861d..ca1475dec412aae4838d398e8c18843d9760f543 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java @@ -1,12 +1,10 @@ package com.yomahub.liteflow.flow.parallel.strategy; -import com.yomahub.liteflow.flow.element.Executable; import com.yomahub.liteflow.flow.element.condition.WhenCondition; import com.yomahub.liteflow.flow.parallel.WhenFutureObj; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.stream.Stream; /** * 完成任一任务 @@ -31,18 +29,4 @@ public class AnyOfParallelExecutor extends ParallelStrategyExecutor { } - //在anyOf这个场景中,需要过滤掉isAccess为false的场景 - //因为不过滤这个的话,如果加上了 any,那么 isAccess 为 false 那就是最快的了 - //换句话说,就是anyOf这个场景,isAccess会被执行两次 - @Override - protected Stream filterAccess(Stream stream, Integer slotIndex) { - return stream.filter(executable -> { - try { - return executable.isAccess(slotIndex); - } catch (Exception e) { - LOG.error("there was an error when executing the when component isAccess", e); - return false; - } - }); - } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java index 701c74188e06f99ea0f802fda7b1b1bf587fd09f..27a8f48929d315d7354ff486079ff0e6e57f6d89 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java @@ -6,6 +6,7 @@ import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.enums.ParallelStrategyEnum; import com.yomahub.liteflow.exception.WhenExecuteException; import com.yomahub.liteflow.flow.element.Executable; +import com.yomahub.liteflow.flow.element.Node; import com.yomahub.liteflow.flow.element.condition.FinallyCondition; import com.yomahub.liteflow.flow.element.condition.PreCondition; import com.yomahub.liteflow.flow.element.condition.WhenCondition; @@ -97,8 +98,21 @@ public abstract class ParallelStrategyExecutor { return filterAccess(stream, slotIndex); } - //过滤isAccess的抽象接口方法 - protected abstract Stream filterAccess(Stream stream, Integer slotIndex); + // 过滤 isAccess 的方法,默认实现,同时为避免同一个 node 的 isAccess 方法重复执行,给 node 设置 isAccess 方法执行结果 + protected Stream filterAccess(Stream stream, Integer slotIndex) { + return stream.filter(executable -> { + try { + boolean access = executable.isAccess(slotIndex); + if (executable instanceof Node) { + ((Node) executable).setAccessResult(access); + } + return access; + } catch (Exception e) { + LOG.error("there was an error when executing the when component isAccess", e); + return false; + } + }); + } /** * 获取 WHEN 所需线程池 diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java index c345a83fd046779fd013ae5d15af0cf872bb41de..e9a78ef9aa053e500ec9107f5ad3b9b9d429a358 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java @@ -1,14 +1,12 @@ package com.yomahub.liteflow.flow.parallel.strategy; import cn.hutool.core.collection.CollUtil; -import com.yomahub.liteflow.flow.element.Executable; import com.yomahub.liteflow.flow.element.condition.WhenCondition; import com.yomahub.liteflow.flow.parallel.WhenFutureObj; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.stream.Stream; /** * 完成指定任务执行器,使用 ID 进行比较 @@ -77,19 +75,4 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor { } - //在must这个场景中,需要过滤掉isAccess为false的场景 - //因为不过滤这个的话,如果加上了 any,那么 isAccess 为 false 那就是最快的了 - //换句话说,就是must这个场景,isAccess会被执行两次 - @Override - protected Stream filterAccess(Stream stream, Integer slotIndex) { - return stream.filter(executable -> { - try { - return executable.isAccess(slotIndex); - } catch (Exception e) { - LOG.error("there was an error when executing the when component isAccess", e); - return false; - } - }); - } - } diff --git a/liteflow-testcase-el/liteflow-testcase-el-solon/src/test/java/com/yomahub/liteflow/test/cmpRetry/LiteflowRetryELSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-solon/src/test/java/com/yomahub/liteflow/test/cmpRetry/LiteflowRetryELSpringbootTest.java index 29f92fdb83e13a3a6ad121bbc49543c500e4b40a..4a28fed19a78061dbdc3d327bdeaf2d23d2ba006 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-solon/src/test/java/com/yomahub/liteflow/test/cmpRetry/LiteflowRetryELSpringbootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-solon/src/test/java/com/yomahub/liteflow/test/cmpRetry/LiteflowRetryELSpringbootTest.java @@ -6,7 +6,6 @@ import com.yomahub.liteflow.test.BaseTest; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.noear.snack.ONode; import org.noear.solon.annotation.Import; import org.noear.solon.annotation.Inject; import org.noear.solon.test.SolonJUnit5Extension; diff --git a/liteflow-testcase-el/liteflow-testcase-el-solon/src/test/java/com/yomahub/liteflow/test/execute2Future/Executor2FutureELSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-solon/src/test/java/com/yomahub/liteflow/test/execute2Future/Executor2FutureELSpringbootTest.java index 6afc58dda1e53e8fef36883cb6abdc8ae31432ef..705838dc4dbc6f4c924cf18752a9f6de3235ef3e 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-solon/src/test/java/com/yomahub/liteflow/test/execute2Future/Executor2FutureELSpringbootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-solon/src/test/java/com/yomahub/liteflow/test/execute2Future/Executor2FutureELSpringbootTest.java @@ -10,6 +10,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.noear.solon.annotation.Import; import org.noear.solon.annotation.Inject; import org.noear.solon.test.SolonJUnit5Extension; + import java.util.concurrent.Future; /** diff --git a/liteflow-testcase-el/liteflow-testcase-el-solon/src/test/java/com/yomahub/liteflow/test/rollback/RollbackSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-solon/src/test/java/com/yomahub/liteflow/test/rollback/RollbackSpringbootTest.java index 03f08da5000117d3ea8a8e4b289ed6c9584c0e6f..464059e6f59a313a569b410cb96810866c297cb5 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-solon/src/test/java/com/yomahub/liteflow/test/rollback/RollbackSpringbootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-solon/src/test/java/com/yomahub/liteflow/test/rollback/RollbackSpringbootTest.java @@ -10,7 +10,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.noear.solon.annotation.Import; import org.noear.solon.annotation.Inject; import org.noear.solon.test.SolonJUnit5Extension; -import org.noear.solon.test.annotation.TestPropertySource; @ExtendWith(SolonJUnit5Extension.class)