From 5c6b8387dd145fa2a514f1ccdf6c4c4b4975431e Mon Sep 17 00:00:00 2001 From: luoyi <972849752@qq.com> Date: Mon, 15 Jan 2024 12:26:33 +0800 Subject: [PATCH] =?UTF-8?q?bug=20#I8MXHX=20=E4=BF=AE=E5=A4=8D=20WhenCondit?= =?UTF-8?q?ion=20=E4=B8=8B=E7=9A=84=20node=20=E9=87=8D=E5=A4=8D=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=20isAccess=20=E6=96=B9=E6=B3=95=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yomahub/liteflow/flow/element/Node.java | 22 ++++++++++++------- .../strategy/AllOfParallelExecutor.java | 7 ------ .../strategy/AnyOfParallelExecutor.java | 16 -------------- .../strategy/ParallelStrategyExecutor.java | 14 +++++++----- .../strategy/SpecifyParallelExecutor.java | 17 -------------- 5 files changed, 22 insertions(+), 54 deletions(-) diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Node.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Node.java index 269a4dd62..50ec99f48 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Node.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Node.java @@ -21,8 +21,6 @@ import com.yomahub.liteflow.flow.executor.NodeExecutor; import com.yomahub.liteflow.flow.executor.NodeExecutorHelper; import com.yomahub.liteflow.log.LFLog; import com.yomahub.liteflow.log.LFLoggerManager; -import com.yomahub.liteflow.property.LiteflowConfig; -import com.yomahub.liteflow.property.LiteflowConfigGetter; import com.yomahub.liteflow.slot.DataBus; import com.yomahub.liteflow.slot.Slot; @@ -30,6 +28,7 @@ import com.yomahub.liteflow.slot.Slot; * Node节点,实现可执行器 Node节点并不是单例的,每构建一次都会copy出一个新的实例 * * @author Bryan.Zhang + * @author luo yi */ public class Node implements Executable, Cloneable, Rollbackable{ @@ -57,6 +56,9 @@ public class Node implements Executable, Cloneable, Rollbackable{ private String currChainId; + // node 的 isAccess 结果,主要用于 WhenCondition 的提前 isAccess 判断,避免 isAccess 方法重复执行 + private boolean accessResult; + private TransmittableThreadLocal loopIndexTL = new TransmittableThreadLocal<>(); private TransmittableThreadLocal currLoopObject = new TransmittableThreadLocal<>(); @@ -125,16 +127,13 @@ public class Node implements Executable, Cloneable, Rollbackable{ throw new FlowSystemException("there is no instance for node id " + id); } - Slot slot = DataBus.getSlot(slotIndex); try { // 把线程属性赋值给组件对象 instance.setSlotIndex(slotIndex); instance.setRefNode(this); - LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); - // 判断是否可执行,所以isAccess经常作为一个组件进入的实际判断要素,用作检查slot里的参数的完备性 - if (instance.isAccess()) { + if (accessResult || instance.isAccess()) { LOG.info("[O]start component[{}] execution", instance.getDisplayName()); // 这里开始进行重试的逻辑和主逻辑的运行 @@ -142,8 +141,7 @@ public class Node implements Executable, Cloneable, Rollbackable{ .buildNodeExecutor(instance.getNodeExecutorClass()); // 调用节点执行器进行执行 nodeExecutor.execute(instance); - } - else { + } else { LOG.info("[X]skip component[{}] execution", instance.getDisplayName()); } // 如果组件覆盖了isEnd方法,或者在在逻辑中主要调用了setEnd(true)的话,流程就会立马结束 @@ -253,6 +251,14 @@ public class Node implements Executable, Cloneable, Rollbackable{ return currChainId; } + public boolean getAccessResult() { + return accessResult; + } + + public void setAccessResult(boolean accessResult) { + this.accessResult = accessResult; + } + public void setLoopIndex(int index) { this.loopIndexTL.set(index); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java index 3ca243fbd..dd5f518e6 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AllOfParallelExecutor.java @@ -1,12 +1,10 @@ package com.yomahub.liteflow.flow.parallel.strategy; -import com.yomahub.liteflow.flow.element.Executable; import com.yomahub.liteflow.flow.element.condition.WhenCondition; import com.yomahub.liteflow.flow.parallel.WhenFutureObj; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.stream.Stream; /** * 完成全部任务 @@ -31,9 +29,4 @@ public class AllOfParallelExecutor extends ParallelStrategyExecutor { } - //在allOf这个场景中,不需要过滤 - @Override - protected Stream filterAccess(Stream stream, Integer slotIndex) { - return stream; - } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java index 6f4c19257..ca1475dec 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/AnyOfParallelExecutor.java @@ -1,12 +1,10 @@ package com.yomahub.liteflow.flow.parallel.strategy; -import com.yomahub.liteflow.flow.element.Executable; import com.yomahub.liteflow.flow.element.condition.WhenCondition; import com.yomahub.liteflow.flow.parallel.WhenFutureObj; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.stream.Stream; /** * 完成任一任务 @@ -31,18 +29,4 @@ public class AnyOfParallelExecutor extends ParallelStrategyExecutor { } - //在anyOf这个场景中,需要过滤掉isAccess为false的场景 - //因为不过滤这个的话,如果加上了 any,那么 isAccess 为 false 那就是最快的了 - //换句话说,就是anyOf这个场景,isAccess会被执行两次 - @Override - protected Stream filterAccess(Stream stream, Integer slotIndex) { - return stream.filter(executable -> { - try { - return executable.isAccess(slotIndex); - } catch (Exception e) { - LOG.error("there was an error when executing the when component isAccess", e); - return false; - } - }); - } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java index b8199419e..0cf8f2bd0 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java @@ -6,6 +6,7 @@ import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.enums.ParallelStrategyEnum; import com.yomahub.liteflow.exception.WhenExecuteException; import com.yomahub.liteflow.flow.element.Executable; +import com.yomahub.liteflow.flow.element.Node; import com.yomahub.liteflow.flow.element.condition.FinallyCondition; import com.yomahub.liteflow.flow.element.condition.PreCondition; import com.yomahub.liteflow.flow.element.condition.WhenCondition; @@ -92,22 +93,23 @@ public abstract class ParallelStrategyExecutor { protected Stream filterWhenTaskList(List executableList, Integer slotIndex) { // 1.先进行过滤,前置和后置组件过滤掉,因为在 EL Chain 处理的时候已经提出来了 // 2.过滤 isAccess 为 false 的情况,因为不过滤这个的话,如果加上了 any,那么 isAccess 为 false 那就是最快的了 - Stream stream = executableList.stream() + // 3.为避免同一个 node 的 isAccess 方法重复执行,给 node 设置 isAccess 方法执行结果 + return executableList.stream() .filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition)) .filter(executable -> { try { - return executable.isAccess(slotIndex); + boolean access = executable.isAccess(slotIndex); + if (executable instanceof Node) { + ((Node) executable).setAccessResult(access); + } + return access; } catch (Exception e) { LOG.error("there was an error when executing the when component isAccess", e); return false; } }); - return filterAccess(stream, slotIndex); } - //过滤isAccess的抽象接口方法 - protected abstract Stream filterAccess(Stream stream, Integer slotIndex); - /** * 获取 WHEN 所需线程池 * @param whenCondition diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java index c345a83fd..e9a78ef9a 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/SpecifyParallelExecutor.java @@ -1,14 +1,12 @@ package com.yomahub.liteflow.flow.parallel.strategy; import cn.hutool.core.collection.CollUtil; -import com.yomahub.liteflow.flow.element.Executable; import com.yomahub.liteflow.flow.element.condition.WhenCondition; import com.yomahub.liteflow.flow.parallel.WhenFutureObj; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.stream.Stream; /** * 完成指定任务执行器,使用 ID 进行比较 @@ -77,19 +75,4 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor { } - //在must这个场景中,需要过滤掉isAccess为false的场景 - //因为不过滤这个的话,如果加上了 any,那么 isAccess 为 false 那就是最快的了 - //换句话说,就是must这个场景,isAccess会被执行两次 - @Override - protected Stream filterAccess(Stream stream, Integer slotIndex) { - return stream.filter(executable -> { - try { - return executable.isAccess(slotIndex); - } catch (Exception e) { - LOG.error("there was an error when executing the when component isAccess", e); - return false; - } - }); - } - } -- Gitee