bug #I4XRBA 关于when和then混合使用时(有any和isAccess的情况下),then的节点先执行的问题
This commit is contained in:
parent
84ce849a61
commit
8b7a40e4b6
|
@ -77,4 +77,8 @@ public class LiteflowResponse<T> implements Serializable {
|
|||
public String getExecuteStepStr(){
|
||||
return getSlot().getExecuteStepStr();
|
||||
}
|
||||
|
||||
public String getExecuteStepStrWithoutTime(){
|
||||
return getSlot().getExecuteStepStr(false);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ import java.util.concurrent.CompletableFuture;
|
|||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
|
@ -149,14 +150,19 @@ public class Chain implements Executable {
|
|||
//1.根据condition.getNodeList()的集合进行流处理,用map进行把executable对象转换成List<CompletableFuture<WhenFutureObj>>
|
||||
//2.在转的过程中,套入CompletableFutureTimeout方法进行超时判断,如果超时则用WhenFutureObj.timeOut返回超时的对象
|
||||
//3.第2个参数是主要的本体CompletableFuture,传入了ParallelSupplier和线程池对象
|
||||
List<CompletableFuture<WhenFutureObj>> completableFutureList = condition.getNodeList().stream().map(
|
||||
executable -> CompletableFutureTimeout.completeOnTimeout(
|
||||
List<CompletableFuture<WhenFutureObj>> completableFutureList = condition.getNodeList().stream().filter(executable -> {
|
||||
try {
|
||||
return executable.isAccess(slotIndex);
|
||||
}catch (Exception e){
|
||||
LOG.error("there was an error when executing the when component isAccess",e);
|
||||
return false;
|
||||
}
|
||||
}).map(executable -> CompletableFutureTimeout.completeOnTimeout(
|
||||
WhenFutureObj.timeOut(executable.getExecuteName()),
|
||||
CompletableFuture.supplyAsync(new ParallelSupplier(executable, slotIndex), parallelExecutor),
|
||||
liteflowConfig.getWhenMaxWaitSeconds(),
|
||||
TimeUnit.SECONDS
|
||||
)
|
||||
).collect(Collectors.toList());
|
||||
)).collect(Collectors.toList());
|
||||
|
||||
|
||||
CompletableFuture<?> resultCompletableFuture;
|
||||
|
|
|
@ -12,6 +12,10 @@ public interface Executable{
|
|||
|
||||
void execute(Integer slotIndex) throws Exception;
|
||||
|
||||
default boolean isAccess(Integer slotIndex) throws Exception{
|
||||
return true;
|
||||
}
|
||||
|
||||
ExecuteTypeEnum getExecuteType();
|
||||
|
||||
String getExecuteName();
|
||||
|
|
|
@ -111,9 +111,8 @@ public class Node implements Executable,Cloneable{
|
|||
if (ObjectUtil.isNull(instance)) {
|
||||
throw new FlowSystemException("there is no instance for node id " + id);
|
||||
}
|
||||
//每次执行node前,把分配的slot index信息放入threadLocal里
|
||||
instance.setSlotIndex(slotIndex);
|
||||
Slot slot = DataBus.getSlot(slotIndex);
|
||||
Slot<?> slot = DataBus.getSlot(slotIndex);
|
||||
|
||||
try {
|
||||
//把tag和condNodeMap赋给NodeComponent
|
||||
|
@ -163,6 +162,16 @@ public class Node implements Executable,Cloneable{
|
|||
}
|
||||
}
|
||||
|
||||
//在同步场景并不会单独执行这方法,同步场景会在execute里面去判断isAccess。
|
||||
//但是在异步场景的any=true情况下,如果isAccess返回了false,那么异步的any有可能会认为这个组件先执行完。就会导致不正常
|
||||
//增加这个方法是为了在异步的时候,先去过滤掉isAccess为false的异步组件。然后再异步执行。
|
||||
//详情见这个issue:https://gitee.com/dromara/liteFlow/issues/I4XRBA
|
||||
@Override
|
||||
public boolean isAccess(Integer slotIndex) throws Exception {
|
||||
instance.setSlotIndex(slotIndex);
|
||||
return instance.isAccess();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object clone() throws CloneNotSupportedException {
|
||||
return super.clone();
|
||||
|
|
|
@ -43,4 +43,10 @@ public class CmpStepSpringbootTest extends BaseTest {
|
|||
Assert.assertEquals(RuntimeException.class, response.getExecuteSteps().get("d").getException().getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStep2() throws Exception{
|
||||
LiteflowResponse<DefaultContext> response = flowExecutor.execute2Resp("chain2", "arg");
|
||||
Assert.assertTrue(response.isSuccess());
|
||||
Assert.assertEquals("a==>b", response.getExecuteStepStrWithoutTime());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,8 @@ import org.springframework.stereotype.Component;
|
|||
public class ACmp{
|
||||
|
||||
@LiteflowMethod(LiteFlowMethodEnum.PROCESS)
|
||||
public void process(NodeComponent bindCmp) {
|
||||
public void process(NodeComponent bindCmp) throws Exception{
|
||||
Thread.sleep(5000L);
|
||||
System.out.println("ACmp executed!");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
/**
|
||||
* <p>Title: liteflow</p>
|
||||
* <p>Description: 轻量级的组件式流程框架</p>
|
||||
* @author Bryan.Zhang
|
||||
* @email weenyc31@163.com
|
||||
* @Date 2020/4/1
|
||||
*/
|
||||
package com.yomahub.liteflow.test.cmpStep.cmp;
|
||||
|
||||
import com.yomahub.liteflow.annotation.LiteflowCmpDefine;
|
||||
import com.yomahub.liteflow.annotation.LiteflowMethod;
|
||||
import com.yomahub.liteflow.core.NodeComponent;
|
||||
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component("e")
|
||||
@LiteflowCmpDefine
|
||||
public class ECmp{
|
||||
|
||||
@LiteflowMethod(LiteFlowMethodEnum.PROCESS)
|
||||
public void process(NodeComponent bindCmp) {
|
||||
System.out.println("ECmp executed!");
|
||||
}
|
||||
|
||||
@LiteflowMethod(LiteFlowMethodEnum.IS_ACCESS)
|
||||
public boolean isAccess(NodeComponent bindCmp) {
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -4,4 +4,9 @@
|
|||
<then value="a,b"/>
|
||||
<when value="c,d"/>
|
||||
</chain>
|
||||
|
||||
<chain name="chain2">
|
||||
<when value="e,a" any="true"/>
|
||||
<then value="b"/>
|
||||
</chain>
|
||||
</flow>
|
|
@ -22,7 +22,7 @@ public class CmpStepTest extends BaseTest{
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testStep(){
|
||||
public void testStep1(){
|
||||
LiteflowResponse<DefaultContext> response = flowExecutor.execute2Resp("chain1", "arg");
|
||||
Assert.assertFalse(response.isSuccess());
|
||||
Assert.assertTrue(response.getExecuteSteps().get("a").isSuccess());
|
||||
|
@ -33,4 +33,11 @@ public class CmpStepTest extends BaseTest{
|
|||
Assert.assertEquals(RuntimeException.class, response.getExecuteSteps().get("c").getException().getClass());
|
||||
Assert.assertEquals(RuntimeException.class, response.getExecuteSteps().get("d").getException().getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStep2() throws Exception{
|
||||
LiteflowResponse<DefaultContext> response = flowExecutor.execute2Resp("chain2", "arg");
|
||||
Assert.assertTrue(response.isSuccess());
|
||||
Assert.assertEquals("a==>b", response.getExecuteStepStrWithoutTime());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,7 +12,8 @@ import com.yomahub.liteflow.core.NodeComponent;
|
|||
public class ACmp extends NodeComponent {
|
||||
|
||||
@Override
|
||||
public void process() {
|
||||
public void process() throws Exception{
|
||||
Thread.sleep(5000L);
|
||||
System.out.println("ACmp executed!");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
/**
|
||||
* <p>Title: liteflow</p>
|
||||
* <p>Description: 轻量级的组件式流程框架</p>
|
||||
* @author Bryan.Zhang
|
||||
* @email weenyc31@163.com
|
||||
* @Date 2020/4/1
|
||||
*/
|
||||
package com.yomahub.liteflow.test.cmpStep.cmp;
|
||||
|
||||
import com.yomahub.liteflow.core.NodeComponent;
|
||||
|
||||
public class ECmp extends NodeComponent {
|
||||
|
||||
@Override
|
||||
public void process() {
|
||||
System.out.println("ECmp executed!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAccess() {
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -5,10 +5,16 @@
|
|||
<node id="b" class="com.yomahub.liteflow.test.cmpStep.cmp.BCmp"/>
|
||||
<node id="c" class="com.yomahub.liteflow.test.cmpStep.cmp.CCmp"/>
|
||||
<node id="d" class="com.yomahub.liteflow.test.cmpStep.cmp.DCmp"/>
|
||||
<node id="e" class="com.yomahub.liteflow.test.cmpStep.cmp.ECmp"/>
|
||||
</nodes>
|
||||
|
||||
<chain name="chain1">
|
||||
<then value="a,b"/>
|
||||
<when value="c,d"/>
|
||||
</chain>
|
||||
|
||||
<chain name="chain2">
|
||||
<when value="e,a" any="true"/>
|
||||
<then value="b"/>
|
||||
</chain>
|
||||
</flow>
|
|
@ -16,9 +16,9 @@ import org.springframework.test.context.junit4.SpringRunner;
|
|||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* springboot环境最普通的例子测试
|
||||
* springboot环境step的测试例子
|
||||
* @author Bryan.Zhang
|
||||
* @since 2.6.4
|
||||
* @since 2.7.0
|
||||
*/
|
||||
@RunWith(SpringRunner.class)
|
||||
@TestPropertySource(value = "classpath:/cmpStep/application.properties")
|
||||
|
@ -33,7 +33,7 @@ public class CmpStepSpringbootTest extends BaseTest {
|
|||
//ab串行
|
||||
//cd并行,都抛错,其中c耗时2秒
|
||||
@Test
|
||||
public void testStep() throws Exception{
|
||||
public void testStep1() throws Exception{
|
||||
LiteflowResponse<DefaultContext> response = flowExecutor.execute2Resp("chain1", "arg");
|
||||
Assert.assertFalse(response.isSuccess());
|
||||
Assert.assertTrue(response.getExecuteSteps().get("a").isSuccess());
|
||||
|
@ -45,4 +45,11 @@ public class CmpStepSpringbootTest extends BaseTest {
|
|||
Assert.assertEquals(RuntimeException.class, response.getExecuteSteps().get("d").getException().getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStep2() throws Exception{
|
||||
LiteflowResponse<DefaultContext> response = flowExecutor.execute2Resp("chain2", "arg");
|
||||
Assert.assertTrue(response.isSuccess());
|
||||
Assert.assertEquals("a==>b", response.getExecuteStepStrWithoutTime());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -14,7 +14,8 @@ import org.springframework.stereotype.Component;
|
|||
public class ACmp extends NodeComponent {
|
||||
|
||||
@Override
|
||||
public void process() {
|
||||
public void process() throws Exception{
|
||||
Thread.sleep(5000L);
|
||||
System.out.println("ACmp executed!");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
/**
|
||||
* <p>Title: liteflow</p>
|
||||
* <p>Description: 轻量级的组件式流程框架</p>
|
||||
* @author Bryan.Zhang
|
||||
* @email weenyc31@163.com
|
||||
* @Date 2020/4/1
|
||||
*/
|
||||
package com.yomahub.liteflow.test.cmpStep.cmp;
|
||||
|
||||
import com.yomahub.liteflow.core.NodeComponent;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component("e")
|
||||
public class ECmp extends NodeComponent {
|
||||
|
||||
@Override
|
||||
public void process() {
|
||||
System.out.println("ECmp executed!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAccess() {
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -4,4 +4,9 @@
|
|||
<then value="a,b"/>
|
||||
<when value="c,d"/>
|
||||
</chain>
|
||||
|
||||
<chain name="chain2">
|
||||
<when value="e,a" any="true"/>
|
||||
<then value="b"/>
|
||||
</chain>
|
||||
</flow>
|
|
@ -20,7 +20,7 @@ public class CmpStepSpringTest extends BaseTest {
|
|||
private FlowExecutor flowExecutor;
|
||||
|
||||
@Test
|
||||
public void testStep(){
|
||||
public void testStep1(){
|
||||
LiteflowResponse<DefaultContext> response = flowExecutor.execute2Resp("chain1", "arg");
|
||||
Assert.assertFalse(response.isSuccess());
|
||||
Assert.assertTrue(response.getExecuteSteps().get("a").isSuccess());
|
||||
|
@ -31,4 +31,11 @@ public class CmpStepSpringTest extends BaseTest {
|
|||
Assert.assertEquals(RuntimeException.class, response.getExecuteSteps().get("c").getException().getClass());
|
||||
Assert.assertEquals(RuntimeException.class, response.getExecuteSteps().get("d").getException().getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStep2() throws Exception{
|
||||
LiteflowResponse<DefaultContext> response = flowExecutor.execute2Resp("chain2", "arg");
|
||||
Assert.assertTrue(response.isSuccess());
|
||||
Assert.assertEquals("a==>b", response.getExecuteStepStrWithoutTime());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,7 +14,8 @@ import org.springframework.stereotype.Component;
|
|||
public class ACmp extends NodeComponent {
|
||||
|
||||
@Override
|
||||
public void process() {
|
||||
public void process() throws Exception{
|
||||
Thread.sleep(5000L);
|
||||
System.out.println("ACmp executed!");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
/**
|
||||
* <p>Title: liteflow</p>
|
||||
* <p>Description: 轻量级的组件式流程框架</p>
|
||||
* @author Bryan.Zhang
|
||||
* @email weenyc31@163.com
|
||||
* @Date 2020/4/1
|
||||
*/
|
||||
package com.yomahub.liteflow.test.cmpStep.cmp;
|
||||
|
||||
import com.yomahub.liteflow.core.NodeComponent;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component("e")
|
||||
public class ECmp extends NodeComponent {
|
||||
|
||||
@Override
|
||||
public void process() {
|
||||
System.out.println("ECmp executed!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAccess() {
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -4,4 +4,9 @@
|
|||
<then value="a,b"/>
|
||||
<when value="c,d"/>
|
||||
</chain>
|
||||
|
||||
<chain name="chain2">
|
||||
<when value="e,a" any="true"/>
|
||||
<then value="b"/>
|
||||
</chain>
|
||||
</flow>
|
Loading…
Reference in New Issue