!188 feature #I7HJFX 循环表达式中支持异步模式

Merge pull request !188 from zhhhhy/dev
This commit is contained in:
铂赛东 2023-07-25 03:02:44 +00:00 committed by Gitee
commit bd40dc3c23
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
105 changed files with 3269 additions and 185 deletions

View File

@ -78,6 +78,7 @@ public class LiteFlowChainELBuilder {
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.DO, Object.class, new DoOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.BREAK, Object.class, new BreakOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.DATA, Object.class, new DataOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.PARALLEL, Object.class, new ParallelOperator());
}
public static LiteFlowChainELBuilder createChain() {

View File

@ -0,0 +1,25 @@
package com.yomahub.liteflow.builder.el.operator;
import com.yomahub.liteflow.builder.el.operator.base.BaseOperator;
import com.yomahub.liteflow.builder.el.operator.base.OperatorHelper;
import com.yomahub.liteflow.flow.element.condition.LoopCondition;
/**
* EL规则中的parallel的操作符
*
* @author zhhhhy
* @since 2.11.0
*/
public class ParallelOperator extends BaseOperator<LoopCondition> {
@Override
public LoopCondition build(Object[] objects) throws Exception {
OperatorHelper.checkObjectSizeEqTwo(objects);
LoopCondition loopCondition = OperatorHelper.convert(objects[0], LoopCondition.class);
Boolean parallel = OperatorHelper.convert(objects[1], Boolean.class);
loopCondition.setParallel(parallel);
return loopCondition;
}
}

View File

@ -6,6 +6,7 @@ package com.yomahub.liteflow.common;
* @author tangkc
*/
public interface ChainConstant {
String PARALLEL = "parallel";
String CHAIN = "chain";

View File

@ -6,10 +6,17 @@ import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.exception.NoForNodeException;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.parallel.LoopFutureObj;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
import com.yomahub.liteflow.thread.ExecutorHelper;
import com.yomahub.liteflow.util.LiteFlowProxyUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
/**
* 循环次数Condition
*
@ -18,67 +25,92 @@ import com.yomahub.liteflow.util.LiteFlowProxyUtil;
*/
public class ForCondition extends LoopCondition {
@Override
public void executeCondition(Integer slotIndex) throws Exception {
Slot slot = DataBus.getSlot(slotIndex);
Node forNode = this.getForNode();
if (ObjectUtil.isNull(forNode)) {
String errorInfo = StrUtil.format("[{}]:no for-node found", slot.getRequestId());
throw new NoForNodeException(errorInfo);
}
@Override
public void executeCondition(Integer slotIndex) throws Exception {
Slot slot = DataBus.getSlot(slotIndex);
Node forNode = this.getForNode();
if (ObjectUtil.isNull(forNode)) {
String errorInfo = StrUtil.format("[{}]:no for-node found", slot.getRequestId());
throw new NoForNodeException(errorInfo);
}
// 先去判断isAccess方法如果isAccess方法都返回false整个FOR表达式不执行
if (!this.getForNode().isAccess(slotIndex)) {
return;
}
// 先去判断isAccess方法如果isAccess方法都返回false整个FOR表达式不执行
if (!this.getForNode().isAccess(slotIndex)) {
return;
}
// 执行forCount组件
forNode.setCurrChainId(this.getCurrChainId());
forNode.execute(slotIndex);
// 执行forCount组件
forNode.setCurrChainId(this.getCurrChainId());
forNode.execute(slotIndex);
// 获得循环次数
int forCount = forNode.getItemResultMetaValue(slotIndex);
// 获得循环次数
int forCount = forNode.getItemResultMetaValue(slotIndex);
// 获得要循环的可执行对象
Executable executableItem = this.getDoExecutor();
// 获得要循环的可执行对象
Executable executableItem = this.getDoExecutor();
// 获取Break节点
Executable breakItem = this.getBreakItem();
// 获取Break节点
Executable breakItem = this.getBreakItem();
try{
// 循环执行
for (int i = 0; i < forCount; i++) {
executableItem.setCurrChainId(this.getCurrChainId());
// 设置循环index
setLoopIndex(executableItem, i);
executableItem.execute(slotIndex);
// 如果break组件不为空则去执行
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, i);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
}
}finally {
removeLoopIndex(executableItem);
}
}
try {
if (!isParallel()) {
//串行循环执行
for (int i = 0; i < forCount; i++) {
executableItem.setCurrChainId(this.getCurrChainId());
// 设置循环index
setLoopIndex(executableItem, i);
executableItem.execute(slotIndex);
// 如果break组件不为空则去执行
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, i);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
}
}else{
//并行循环执行
//存储所有的并行执行子项的CompletableFuture
List<CompletableFuture<LoopFutureObj>> futureList = new ArrayList<>();
//获取并行循环的线程池
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor();
for (int i = 0; i < forCount; i++){
//提交异步任务
CompletableFuture<LoopFutureObj> future =
CompletableFuture.supplyAsync(new LoopParallelSupplier(executableItem, this.getCurrChainId(), slotIndex, i), parallelExecutor);
futureList.add(future);
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, i);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
}
//等待所有的异步执行完毕
handleFutureList(futureList);
}
} finally {
removeLoopIndex(executableItem);
}
}
@Override
public ConditionTypeEnum getConditionType() {
return ConditionTypeEnum.TYPE_FOR;
}
@Override
public ConditionTypeEnum getConditionType() {
return ConditionTypeEnum.TYPE_FOR;
}
public Node getForNode() {
return (Node) this.getExecutableOne(ConditionKey.FOR_KEY);
}
public Node getForNode() {
return (Node) this.getExecutableOne(ConditionKey.FOR_KEY);
}
public void setForNode(Node forNode) {
this.addExecutable(ConditionKey.FOR_KEY, forNode);
}
public void setForNode(Node forNode) {
this.addExecutable(ConditionKey.FOR_KEY, forNode);
}
}

View File

@ -6,83 +6,118 @@ import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.exception.NoIteratorNodeException;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.parallel.LoopFutureObj;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
import com.yomahub.liteflow.thread.ExecutorHelper;
import com.yomahub.liteflow.util.LiteFlowProxyUtil;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
public class IteratorCondition extends LoopCondition {
@Override
public void executeCondition(Integer slotIndex) throws Exception {
Slot slot = DataBus.getSlot(slotIndex);
Node iteratorNode = this.getIteratorNode();
@Override
public void executeCondition(Integer slotIndex) throws Exception {
Slot slot = DataBus.getSlot(slotIndex);
Node iteratorNode = this.getIteratorNode();
if (ObjectUtil.isNull(iteratorNode)) {
String errorInfo = StrUtil.format("[{}]:no iterator-node found", slot.getRequestId());
throw new NoIteratorNodeException(errorInfo);
}
if (ObjectUtil.isNull(iteratorNode)) {
String errorInfo = StrUtil.format("[{}]:no iterator-node found", slot.getRequestId());
throw new NoIteratorNodeException(errorInfo);
}
// 先去判断isAccess方法如果isAccess方法都返回false整个ITERATOR表达式不执行
if (!iteratorNode.isAccess(slotIndex)) {
return;
}
// 先去判断isAccess方法如果isAccess方法都返回false整个ITERATOR表达式不执行
if (!iteratorNode.isAccess(slotIndex)) {
return;
}
// 执行Iterator组件
iteratorNode.setCurrChainId(this.getCurrChainId());
iteratorNode.execute(slotIndex);
// 执行Iterator组件
iteratorNode.setCurrChainId(this.getCurrChainId());
iteratorNode.execute(slotIndex);
Iterator<?> it = iteratorNode.getItemResultMetaValue(slotIndex);
Iterator<?> it = iteratorNode.getItemResultMetaValue(slotIndex);
// 获得要循环的可执行对象
Executable executableItem = this.getDoExecutor();
// 获得要循环的可执行对象
Executable executableItem = this.getDoExecutor();
// 获取Break节点
Executable breakItem = this.getBreakItem();
// 获取Break节点
Executable breakItem = this.getBreakItem();
try{
int index = 0;
while (it.hasNext()) {
Object itObj = it.next();
try {
int index = 0;
if (!this.isParallel()) {
//原本的串行循环执行
while (it.hasNext()) {
Object itObj = it.next();
executableItem.setCurrChainId(this.getCurrChainId());
// 设置循环index
setLoopIndex(executableItem, index);
// 设置循环迭代器对象
setCurrLoopObject(executableItem, itObj);
// 执行可执行对象
executableItem.execute(slotIndex);
// 如果break组件不为空则去执行
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, index);
setCurrLoopObject(breakItem, itObj);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
index++;
}
}finally{
removeLoopIndex(executableItem);
removeCurrLoopObject(executableItem);
}
}
executableItem.setCurrChainId(this.getCurrChainId());
// 设置循环index
setLoopIndex(executableItem, index);
// 设置循环迭代器对象
setCurrLoopObject(executableItem, itObj);
// 执行可执行对象
executableItem.execute(slotIndex);
// 如果break组件不为空则去执行
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, index);
setCurrLoopObject(breakItem, itObj);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
index++;
}
} else {
//并行循环执行
//存储所有的并行执行子项的CompletableFuture
List<CompletableFuture<LoopFutureObj>> futureList = new ArrayList<>();
//获取并行循环的线程池
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor();
while (it.hasNext()) {
Object itObj = it.next();
//提交异步任务
CompletableFuture<LoopFutureObj> future =
CompletableFuture.supplyAsync(new LoopParallelSupplier(executableItem, this.getCurrChainId(), slotIndex, index, itObj), parallelExecutor);
futureList.add(future);
//break判断
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, index);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
index++;
}
//等待所有的异步执行完毕
handleFutureList(futureList);
}
} finally {
removeLoopIndex(executableItem);
removeCurrLoopObject(executableItem);
}
}
@Override
public ConditionTypeEnum getConditionType() {
return ConditionTypeEnum.TYPE_ITERATOR;
}
@Override
public ConditionTypeEnum getConditionType() {
return ConditionTypeEnum.TYPE_ITERATOR;
}
public Node getIteratorNode() {
return (Node) this.getExecutableOne(ConditionKey.ITERATOR_KEY);
}
public Node getIteratorNode() {
return (Node) this.getExecutableOne(ConditionKey.ITERATOR_KEY);
}
public void setIteratorNode(Node iteratorNode) {
this.addExecutable(ConditionKey.ITERATOR_KEY, iteratorNode);
}
public void setIteratorNode(Node iteratorNode) {
this.addExecutable(ConditionKey.ITERATOR_KEY, iteratorNode);
}
}

View File

@ -4,6 +4,11 @@ import com.yomahub.liteflow.flow.element.Chain;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.parallel.LoopFutureObj;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
/**
* 循环Condition的抽象类 主要继承对象有ForCondition和WhileCondition
@ -12,73 +17,131 @@ import com.yomahub.liteflow.flow.element.Node;
* @since 2.9.0
*/
public abstract class LoopCondition extends Condition {
//判断循环是否并行执行默认为false
private boolean parallel = false;
protected Executable getBreakItem() {
return this.getExecutableOne(ConditionKey.BREAK_KEY);
}
protected Executable getBreakItem() {
return this.getExecutableOne(ConditionKey.BREAK_KEY);
}
public void setBreakItem(Executable breakNode) {
this.addExecutable(ConditionKey.BREAK_KEY, breakNode);
}
public void setBreakItem(Executable breakNode) {
this.addExecutable(ConditionKey.BREAK_KEY, breakNode);
}
protected Executable getDoExecutor() {
return this.getExecutableOne(ConditionKey.DO_KEY);
}
protected Executable getDoExecutor() {
return this.getExecutableOne(ConditionKey.DO_KEY);
}
public void setDoExecutor(Executable executable) {
this.addExecutable(ConditionKey.DO_KEY, executable);
}
public void setDoExecutor(Executable executable) {
this.addExecutable(ConditionKey.DO_KEY, executable);
}
protected void setLoopIndex(Executable executableItem, int index) {
if (executableItem instanceof Chain) {
((Chain) executableItem).getConditionList().forEach(condition -> setLoopIndex(condition, index));
}
else if (executableItem instanceof Condition) {
((Condition) executableItem).getExecutableGroup()
.forEach((key, value) -> value.forEach(executable -> setLoopIndex(executable, index)));
}
else if (executableItem instanceof Node) {
((Node) executableItem).setLoopIndex(index);
}
}
protected void setLoopIndex(Executable executableItem, int index) {
if (executableItem instanceof Chain) {
((Chain) executableItem).getConditionList().forEach(condition -> setLoopIndex(condition, index));
} else if (executableItem instanceof Condition) {
((Condition) executableItem).getExecutableGroup()
.forEach((key, value) -> value.forEach(executable -> setLoopIndex(executable, index)));
} else if (executableItem instanceof Node) {
((Node) executableItem).setLoopIndex(index);
}
}
protected void setCurrLoopObject(Executable executableItem, Object obj) {
if (executableItem instanceof Chain) {
((Chain) executableItem).getConditionList().forEach(condition -> setCurrLoopObject(condition, obj));
}
else if (executableItem instanceof Condition) {
((Condition) executableItem).getExecutableGroup()
.forEach((key, value) -> value.forEach(executable -> setCurrLoopObject(executable, obj)));
}
else if (executableItem instanceof Node) {
((Node) executableItem).setCurrLoopObject(obj);
}
}
protected void setCurrLoopObject(Executable executableItem, Object obj) {
if (executableItem instanceof Chain) {
((Chain) executableItem).getConditionList().forEach(condition -> setCurrLoopObject(condition, obj));
} else if (executableItem instanceof Condition) {
((Condition) executableItem).getExecutableGroup()
.forEach((key, value) -> value.forEach(executable -> setCurrLoopObject(executable, obj)));
} else if (executableItem instanceof Node) {
((Node) executableItem).setCurrLoopObject(obj);
}
}
protected void removeLoopIndex(Executable executableItem){
if (executableItem instanceof Chain) {
((Chain) executableItem).getConditionList().forEach(this::removeLoopIndex);
}
else if (executableItem instanceof Condition) {
((Condition) executableItem).getExecutableGroup()
.forEach((key, value) -> value.forEach(this::removeLoopIndex));
}
else if (executableItem instanceof Node) {
((Node) executableItem).removeLoopIndex();
}
}
protected void removeLoopIndex(Executable executableItem) {
if (executableItem instanceof Chain) {
((Chain) executableItem).getConditionList().forEach(this::removeLoopIndex);
} else if (executableItem instanceof Condition) {
((Condition) executableItem).getExecutableGroup()
.forEach((key, value) -> value.forEach(this::removeLoopIndex));
} else if (executableItem instanceof Node) {
((Node) executableItem).removeLoopIndex();
}
}
protected void removeCurrLoopObject(Executable executableItem){
if (executableItem instanceof Chain) {
((Chain) executableItem).getConditionList().forEach(this::removeCurrLoopObject);
}
else if (executableItem instanceof Condition) {
((Condition) executableItem).getExecutableGroup()
.forEach((key, value) -> value.forEach(this::removeCurrLoopObject));
}
else if (executableItem instanceof Node) {
((Node) executableItem).removeCurrLoopObject();
}
}
protected void removeCurrLoopObject(Executable executableItem) {
if (executableItem instanceof Chain) {
((Chain) executableItem).getConditionList().forEach(this::removeCurrLoopObject);
} else if (executableItem instanceof Condition) {
((Condition) executableItem).getExecutableGroup()
.forEach((key, value) -> value.forEach(this::removeCurrLoopObject));
} else if (executableItem instanceof Node) {
((Node) executableItem).removeCurrLoopObject();
}
}
public boolean isParallel() {
return parallel;
}
public void setParallel(boolean parallel) {
this.parallel = parallel;
}
//循环并行执行的futureList处理
protected void handleFutureList(List<CompletableFuture<LoopFutureObj>> futureList)throws Exception{
CompletableFuture<?> resultCompletableFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[]{}));
resultCompletableFuture.get();
//获取所有的执行结果,如果有失败的那么需要抛出异常
for (CompletableFuture<LoopFutureObj> future : futureList) {
LoopFutureObj loopFutureObj = future.get();
if (!loopFutureObj.isSuccess()) {
throw loopFutureObj.getEx();
}
}
}
// 循环并行执行的Supplier封装
public class LoopParallelSupplier implements Supplier<LoopFutureObj> {
private final Executable executableItem;
private final String currChainId;
private final Integer slotIndex;
private final Integer loopIndex;
private final Object itObj;
public LoopParallelSupplier(Executable executableItem, String currChainId, Integer slotIndex, Integer loopIndex) {
this.executableItem = executableItem;
this.currChainId = currChainId;
this.slotIndex = slotIndex;
this.loopIndex = loopIndex;
this.itObj = null;
}
public LoopParallelSupplier(Executable executableItem, String currChainId, Integer slotIndex, Integer loopIndex, Object itObj) {
this.executableItem = executableItem;
this.currChainId = currChainId;
this.slotIndex = slotIndex;
this.loopIndex = loopIndex;
this.itObj = itObj;
}
@Override
public LoopFutureObj get() {
try {
executableItem.setCurrChainId(this.currChainId);
// 设置循环index
setLoopIndex(executableItem, loopIndex);
//IteratorCondition的情况下需要设置当前循环对象
if(itObj != null){
setCurrLoopObject(executableItem, itObj);
}
executableItem.execute(slotIndex);
return LoopFutureObj.success(executableItem.getId());
} catch (Exception e) {
return LoopFutureObj.fail(executableItem.getId(), e);
}
}
}
}

View File

@ -4,6 +4,13 @@ import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.enums.ConditionTypeEnum;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.parallel.LoopFutureObj;
import com.yomahub.liteflow.thread.ExecutorHelper;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
/**
* 循环条件Condition
@ -30,21 +37,47 @@ public class WhileCondition extends LoopCondition {
// 循环执行
int index = 0;
while (getWhileResult(slotIndex)) {
executableItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(executableItem, index);
executableItem.execute(slotIndex);
// 如果break组件不为空则去执行
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, index);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
if(!this.isParallel()){
//串行循环
while (getWhileResult(slotIndex)) {
executableItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(executableItem, index);
executableItem.execute(slotIndex);
// 如果break组件不为空则去执行
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, index);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
index++;
}
index++;
}else{
//并行循环逻辑
List<CompletableFuture<LoopFutureObj>> futureList = new ArrayList<>();
//获取并行循环的线程池
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor();
while (getWhileResult(slotIndex)){
CompletableFuture<LoopFutureObj> future =
CompletableFuture.supplyAsync(new LoopParallelSupplier(executableItem, this.getCurrChainId(), slotIndex, index), parallelExecutor);
futureList.add(future);
//break判断
if (ObjectUtil.isNotNull(breakItem)) {
breakItem.setCurrChainId(this.getCurrChainId());
setLoopIndex(breakItem, index);
breakItem.execute(slotIndex);
boolean isBreak = breakItem.getItemResultMetaValue(slotIndex);
if (isBreak) {
break;
}
}
index++;
}
//等待所有的异步执行完毕
handleFutureList(futureList);
}
}

View File

@ -0,0 +1,56 @@
package com.yomahub.liteflow.flow.parallel;
/**
* 并行循环各个并行子项的执行结果对象
* 如果该子项执行成功则success为trueex为null
* 否则success为falseex为程序抛出异常
*
* @author zhhhhy
* @since 2.11.0
*/
public class LoopFutureObj {
private String executorName;
private boolean success;
private Exception ex;
public static LoopFutureObj success(String executorName) {
LoopFutureObj result = new LoopFutureObj();
result.setSuccess(true);
result.setExecutorName(executorName);
return result;
}
public static LoopFutureObj fail(String executorName, Exception ex) {
LoopFutureObj result = new LoopFutureObj();
result.setSuccess(false);
result.setExecutorName(executorName);
result.setEx(ex);
return result;
}
public Exception getEx() {
return ex;
}
public String getExecutorName() {
return executorName;
}
public boolean isSuccess() {
return success;
}
public void setEx(Exception ex) {
this.ex = ex;
}
public void setExecutorName(String executorName) {
this.executorName = executorName;
}
public void setSuccess(boolean success) {
this.success = success;
}
}

View File

@ -103,6 +103,15 @@ public class LiteflowConfig {
// 规则文件/脚本文件变更监听
private Boolean enableMonitorFile = Boolean.FALSE;
//并行循环线程池所用class路径
private String parallelLoopExecutorClass;
//使用默认并行循环线程池时最大线程数
private Integer parallelMaxWorkers;
//使用默认并行循环线程池时最大队列数
private Integer parallelQueueLimit;
public Boolean getEnableMonitorFile() {
return enableMonitorFile;
}
@ -409,4 +418,40 @@ public class LiteflowConfig {
public void setWhenMaxWaitTimeUnit(TimeUnit whenMaxWaitTimeUnit) {
this.whenMaxWaitTimeUnit = whenMaxWaitTimeUnit;
}
public Integer getParallelMaxWorkers() {
if(ObjectUtil.isNull(parallelMaxWorkers)){
return 16;
}else{
return parallelMaxWorkers;
}
}
public void setParallelMaxWorkers(Integer parallelMaxWorkers) {
this.parallelMaxWorkers = parallelMaxWorkers;
}
public Integer getParallelQueueLimit() {
if(ObjectUtil.isNull(parallelQueueLimit)){
return 512;
}else{
return parallelQueueLimit;
}
}
public void setParallelQueueLimit(Integer parallelQueueLimit) {
this.parallelQueueLimit = parallelQueueLimit;
}
public String getParallelLoopExecutorClass() {
if (StrUtil.isBlank(parallelLoopExecutorClass)) {
return "com.yomahub.liteflow.thread.LiteFlowDefaultParallelLoopExecutorBuilder";
}
else {
return parallelLoopExecutorClass;
}
}
public void setParallelLoopExecutorClass(String parallelLoopExecutorClass) {
this.parallelLoopExecutorClass = parallelLoopExecutorClass;
}
}

View File

@ -113,6 +113,12 @@ public class ExecutorHelper {
return getExecutorService(clazz);
}
//构造并行循环的线程池
public ExecutorService buildLoopParallelExecutor(){
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
return getExecutorService(liteflowConfig.getParallelLoopExecutorClass());
}
/**
* 根据线程执行构建者Class类名获取ExecutorService实例
*/

View File

@ -0,0 +1,27 @@
package com.yomahub.liteflow.thread;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import java.util.concurrent.ExecutorService;
/**
* LiteFlow默认的并行循环执行器实现
*
* @author zhhhhy
* @since 2.11.0
*/
public class LiteFlowDefaultParallelLoopExecutorBuilder implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(liteflowConfig.getParallelMaxWorkers(), liteflowConfig.getParallelMaxWorkers(),
liteflowConfig.getParallelQueueLimit(), "loop-thead-");
}
}

View File

@ -47,6 +47,9 @@ public class LiteflowAutoConfiguration {
liteflowConfig.setMainExecutorClass(property.getMainExecutorClass());
liteflowConfig.setPrintExecutionLog(property.isPrintExecutionLog());
liteflowConfig.setSubstituteCmpClass(property.getSubstituteCmpClass());
liteflowConfig.setParallelMaxWorkers(property.getParallelMaxWorkers());
liteflowConfig.setParallelQueueLimit(property.getParallelQueueLimit());
liteflowConfig.setParallelLoopExecutorClass(property.getParallelLoopExecutorClass());
return liteflowConfig;
}

View File

@ -70,6 +70,15 @@ public class LiteflowProperty {
// 替补组件的class路径
private String substituteCmpClass;
//并行循环线程池类路径
private String parallelLoopExecutorClass;
//使用默认并行循环线程池时最大线程数
private Integer parallelMaxWorkers;
//使用默认并行循环线程池时最大队列数
private Integer parallelQueueLimit;
public boolean isEnable() {
return enable;
}
@ -219,4 +228,27 @@ public class LiteflowProperty {
this.ruleSourceExtData = ruleSourceExtData;
}
public String getParallelLoopExecutorClass() {
return parallelLoopExecutorClass;
}
public void setParallelLoopExecutorClass(String parallelLoopExecutorClass) {
this.parallelLoopExecutorClass = parallelLoopExecutorClass;
}
public Integer getParallelMaxWorkers() {
return parallelMaxWorkers;
}
public void setParallelMaxWorkers(Integer parallelMaxWorkers) {
this.parallelMaxWorkers = parallelMaxWorkers;
}
public Integer getParallelQueueLimit() {
return parallelQueueLimit;
}
public void setParallelQueueLimit(Integer parallelQueueLimit) {
this.parallelQueueLimit = parallelQueueLimit;
}
}

View File

@ -80,6 +80,14 @@ public class LiteflowProperty {
// 规则文件/脚本文件变更监听
private Boolean enableMonitorFile;
private String parallelLoopExecutorClass;
//使用默认并行循环线程池时最大线程数
private Integer parallelMaxWorkers;
//使用默认并行循环线程池时最大队列数
private Integer parallelQueueLimit;
public Boolean getEnableMonitorFile() {
return enableMonitorFile;
}
@ -257,4 +265,28 @@ public class LiteflowProperty {
public void setWhenMaxWaitTimeUnit(TimeUnit whenMaxWaitTimeUnit) {
this.whenMaxWaitTimeUnit = whenMaxWaitTimeUnit;
}
public String getParallelLoopExecutorClass() {
return parallelLoopExecutorClass;
}
public void setParallelLoopExecutorClass(String parallelLoopExecutorClass) {
this.parallelLoopExecutorClass = parallelLoopExecutorClass;
}
public Integer getParallelMaxWorkers() {
return parallelMaxWorkers;
}
public void setParallelMaxWorkers(Integer parallelMaxWorkers) {
this.parallelMaxWorkers = parallelMaxWorkers;
}
public Integer getParallelQueueLimit() {
return parallelQueueLimit;
}
public void setParallelQueueLimit(Integer parallelQueueLimit) {
this.parallelQueueLimit = parallelQueueLimit;
}
}

View File

@ -48,6 +48,9 @@ public class LiteflowPropertyAutoConfiguration {
liteflowConfig.setPrintExecutionLog(property.isPrintExecutionLog());
liteflowConfig.setSubstituteCmpClass(property.getSubstituteCmpClass());
liteflowConfig.setEnableMonitorFile(property.getEnableMonitorFile());
liteflowConfig.setParallelMaxWorkers(property.getParallelMaxWorkers());
liteflowConfig.setParallelQueueLimit(property.getParallelQueueLimit());
liteflowConfig.setParallelLoopExecutorClass(property.getParallelLoopExecutorClass());
return liteflowConfig;
}

View File

@ -0,0 +1,14 @@
package com.yomahub.liteflow.test.parallelLoop;
import com.yomahub.liteflow.exception.LiteFlowException;
/**
* 用户自定义带状态码的异常
*/
public class CustomStatefulException extends LiteFlowException {
public CustomStatefulException(String code, String message) {
super(code, message);
}
}

View File

@ -0,0 +1,23 @@
package com.yomahub.liteflow.test.parallelLoop;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.thread.ExecutorBuilder;
import java.util.concurrent.ExecutorService;
public class CustomThreadExecutor implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(liteflowConfig.getParallelMaxWorkers(), liteflowConfig.getParallelMaxWorkers(),
liteflowConfig.getParallelQueueLimit(), "customer-loop-thead-");
}
}

View File

@ -0,0 +1,119 @@
package com.yomahub.liteflow.test.parallelLoop;
import cn.hutool.core.collection.ListUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.exception.LiteFlowException;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import javax.annotation.Resource;
import java.util.List;
import java.util.regex.Pattern;
/**
* springboot环境EL异步循环测试
*
* @author zhhhhy
* @since 2.11.0
*/
@ExtendWith(SpringExtension.class)
@TestPropertySource(value = "classpath:/parallelLoop/application.properties")
@SpringBootTest(classes = ParallelLoopELDeclMultiSpringbootTest.class)
@EnableAutoConfiguration
@ComponentScan({ "com.yomahub.liteflow.test.parallelLoop.cmp" })
public class ParallelLoopELDeclMultiSpringbootTest extends BaseTest {
@Resource
private FlowExecutor flowExecutor;
//测试并行FOR循环循环次数直接在el中定义
@Test
public void testParallelLoop1() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环循环次数由For组件定义
@Test
public void testParallelLoop2() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain2", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中的BREAK组件能够正常发挥作用
@Test
public void testParallelLoop3() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain3", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中主线程是否会正常等待所有并行子项完成后再继续执行
@Test
public void testParallelLoop4() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain4", "arg");
Assertions.assertTrue(response.isSuccess());
}
@Test
//测试并行FOR循环中某个并行子项抛出异常
public void testParallelLoop5() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain5", "arg");
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals("300", response.getCode());
Assertions.assertNotNull(response.getCause());
Assertions.assertTrue(response.getCause() instanceof LiteFlowException);
Assertions.assertNotNull(response.getSlot());
}
//并行的条件循环
@Test
public void testParallelLoop6() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain6", "arg");
Assertions.assertTrue(response.isSuccess());
}
//并行的迭代循环
@Test
public void testParallelLoop7() throws Exception {
List<String> list = ListUtil.toList("1", "2", "3", "4", "5");
LiteflowResponse response = flowExecutor.execute2Resp("chain7", list);
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中的index
@Test
public void testParallelLoop8() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain8", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
String regex = "(?!.*(.).*\\1)[0-4]{5}"; //匹配不包含重复数字的0-4的5位数字
Pattern pattern = Pattern.compile(regex);
//e1,e2,e3分别并行执行5次因此单个循环的顺序可以是任意的
Assertions.assertTrue(pattern.matcher(context.getData("loop_e1")).matches());
Assertions.assertTrue(pattern.matcher(context.getData("loop_e2")).matches());
Assertions.assertTrue(pattern.matcher(context.getData("loop_e3")).matches());
}
//测试自定义线程池配置是否生效
@Test
public void testParallelLoop9() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain9", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-loop-thead"));
}
}

View File

@ -0,0 +1,120 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.exception.CustomStatefulException;
import java.util.Iterator;
import java.util.List;
@LiteflowComponent
public class CmpConfig {
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "a")
public void processA(NodeComponent bindCmp) {
System.out.println("ACmp executed!");
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "b")
public void processB(NodeComponent bindCmp) {
System.out.println("BCmp executed!");
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "c")
public void processC(NodeComponent bindCmp) {
System.out.println("CCmp executed!");
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "d")
public void processD(NodeComponent bindCmp) {
DefaultContext context = bindCmp.getFirstContextBean();
String key = "test";
if (context.hasData(key)) {
int count = context.getData(key);
context.setData(key, ++count);
} else {
context.setData(key, 1);
}
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "e")
public void processE(NodeComponent bindCmp) {
synchronized (this){
DefaultContext context = bindCmp.getFirstContextBean();
String key = StrUtil.format("{}_{}", "loop", bindCmp.getTag());
if (context.hasData(key)) {
String loopStr = context.getData(key);
String loopStrReturn = StrUtil.format("{}{}", loopStr, bindCmp.getLoopIndex());
context.setData(key, loopStrReturn);
} else {
context.setData(key, bindCmp.getLoopIndex().toString());
}
}
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "f")
public void processF(NodeComponent bindCmp){
try {
System.out.println("FCmp start to sleep 5s");
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("FCmp executed!");
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "g")
public void processG(NodeComponent bindCmp){
if(bindCmp.getLoopIndex()==1){
throw new CustomStatefulException("300", "chain execute custom stateful execption");
}
System.out.println("GCmp executed!");
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "h")
public void processH(NodeComponent bindCmp){
DefaultContext context = bindCmp.getFirstContextBean();
context.setData("threadName", Thread.currentThread().getName());
System.out.println("HCmp executed!");
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_ITERATOR, nodeId = "it", nodeType = NodeTypeEnum.ITERATOR)
public Iterator<?> processIT(NodeComponent bindCmp) {
List<String> list = bindCmp.getRequestData();
return list.iterator();
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_FOR, nodeId = "x", nodeType = NodeTypeEnum.FOR)
public int processX(NodeComponent bindCmp) {
return 3;
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_BREAK, nodeId = "y", nodeType = NodeTypeEnum.BREAK)
public boolean processY(NodeComponent bindCmp) {
DefaultContext context = bindCmp.getFirstContextBean();
int count = 0;
if(context.hasData("test")) {
count = context.getData("test");
}
return count > 3;
}
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_WHILE, nodeId = "z", nodeType = NodeTypeEnum.WHILE)
public boolean processZ(NodeComponent bindCmp) {
DefaultContext context = bindCmp.getFirstContextBean();
String key = "test";
if (context.hasData(key)) {
int count = context.getData("test");
return count < 5;
}
else {
return true;
}
}
}

View File

@ -0,0 +1,4 @@
liteflow.rule-source=parallelLoop/flow.xml
liteflow.parallel-max-workers = 10
liteflow.parallel-queue-limit = 1024
liteflow.parallel-loop-executor-class =com.yomahub.liteflow.test.parallelLoop.CustomThreadExecutor

View File

@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain1">
FOR(2).DO(THEN(a,b,c));
</chain>
<chain name="chain2">
FOR(x).parallel(true).DO(THEN(a,b,c));
</chain>
<chain name="chain3">
FOR(100).parallel(true).DO(THEN(a,b,d)).BREAK(y);
</chain>
<chain name="chain4">
FOR(x).parallel(true).DO(THEN(a,b,f));
</chain>
<chain name="chain5">
FOR(x).parallel(true).DO(THEN(a,b,g));
</chain>
<chain name="chain6">
WHILE(z).parallel(true).DO(THEN(a,d));
</chain>
<chain name="chain7">
ITERATOR(it).parallel(true).DO(THEN(a,b));
</chain>
<chain name="chain8">
FOR(5).parallel(true).DO(
WHEN(
THEN(a,e.tag("e1")),
THEN(c,e.tag("e2")),
THEN(b,e.tag("e3"))
)
);
</chain>
<chain name="chain9">
FOR(x).parallel(true).DO(THEN(a,b,h));
</chain>
</flow>

View File

@ -0,0 +1,14 @@
package com.yomahub.liteflow.test.parallelLoop;
import com.yomahub.liteflow.exception.LiteFlowException;
/**
* 用户自定义带状态码的异常
*/
public class CustomStatefulException extends LiteFlowException {
public CustomStatefulException(String code, String message) {
super(code, message);
}
}

View File

@ -0,0 +1,23 @@
package com.yomahub.liteflow.test.parallelLoop;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.thread.ExecutorBuilder;
import java.util.concurrent.ExecutorService;
public class CustomThreadExecutor implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(liteflowConfig.getParallelMaxWorkers(), liteflowConfig.getParallelMaxWorkers(),
liteflowConfig.getParallelQueueLimit(), "customer-loop-thead-");
}
}

View File

@ -0,0 +1,118 @@
package com.yomahub.liteflow.test.parallelLoop;
import cn.hutool.core.collection.ListUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.exception.LiteFlowException;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import javax.annotation.Resource;
import java.util.List;
import java.util.regex.Pattern;
/**
* springboot环境EL异步循环测试
*
* @author zhhhhy
* @since 2.11.0
*/
@ExtendWith(SpringExtension.class)
@TestPropertySource(value = "classpath:/parallelLoop/application.properties")
@SpringBootTest(classes = ParallelLoopELDeclSpringbootTest.class)
@EnableAutoConfiguration
@ComponentScan({ "com.yomahub.liteflow.test.parallelLoop.cmp" })
public class ParallelLoopELDeclSpringbootTest extends BaseTest {
@Resource
private FlowExecutor flowExecutor;
//测试并行FOR循环循环次数直接在el中定义
@Test
public void testParallelLoop1() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环循环次数由For组件定义
@Test
public void testParallelLoop2() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain2", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中的BREAK组件能够正常发挥作用
@Test
public void testParallelLoop3() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain3", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中主线程是否会正常等待所有并行子项完成后再继续执行
@Test
public void testParallelLoop4() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain4", "arg");
Assertions.assertTrue(response.isSuccess());
}
@Test
//测试并行FOR循环中某个并行子项抛出异常
public void testParallelLoop5() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain5", "arg");
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals("300", response.getCode());
Assertions.assertNotNull(response.getCause());
Assertions.assertTrue(response.getCause() instanceof LiteFlowException);
Assertions.assertNotNull(response.getSlot());
}
//并行的条件循环
@Test
public void testParallelLoop6() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain6", "arg");
Assertions.assertTrue(response.isSuccess());
}
//并行的迭代循环
@Test
public void testParallelLoop7() throws Exception {
List<String> list = ListUtil.toList("1", "2", "3", "4", "5");
LiteflowResponse response = flowExecutor.execute2Resp("chain7", list);
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中的index
@Test
public void testParallelLoop8() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain8", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
String regex = "(?!.*(.).*\\1)[0-4]{5}"; //匹配不包含重复数字的0-4的5位数字
Pattern pattern = Pattern.compile(regex);
//e1,e2,e3分别并行执行5次因此单个循环的顺序可以是任意的
Assertions.assertTrue(pattern.matcher(context.getData("loop_e1")).matches());
Assertions.assertTrue(pattern.matcher(context.getData("loop_e2")).matches());
Assertions.assertTrue(pattern.matcher(context.getData("loop_e3")).matches());
}
//测试自定义线程池配置是否生效
@Test
public void testParallelLoop9() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain9", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-loop-thead"));
}
}

View File

@ -0,0 +1,23 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import org.springframework.stereotype.Component;
@Component("a")
public class ACmp {
@LiteflowMethod(LiteFlowMethodEnum.PROCESS)
public void process(NodeComponent bindCmp) {
System.out.println("ACmp executed!");
}
}

View File

@ -0,0 +1,23 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import org.springframework.stereotype.Component;
@Component("b")
public class BCmp {
@LiteflowMethod(LiteFlowMethodEnum.PROCESS)
public void process(NodeComponent bindCmp) {
System.out.println("BCmp executed!");
}
}

View File

@ -0,0 +1,23 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import org.springframework.stereotype.Component;
@Component("c")
public class CCmp {
@LiteflowMethod(LiteFlowMethodEnum.PROCESS)
public void process(NodeComponent bindCmp) {
System.out.println("CCmp executed!");
}
}

View File

@ -0,0 +1,32 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.slot.DefaultContext;
import org.springframework.stereotype.Component;
@Component("d")
public class DCmp {
@LiteflowMethod(LiteFlowMethodEnum.PROCESS)
public void process(NodeComponent bindCmp) {
DefaultContext context = bindCmp.getFirstContextBean();
String key = "test";
if (context.hasData(key)) {
int count = context.getData(key);
context.setData(key, ++count);
}
else {
context.setData(key, 1);
}
}
}

View File

@ -0,0 +1,35 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.slot.DefaultContext;
import org.springframework.stereotype.Component;
@Component("e")
public class ECmp {
//注意与串行的ECmp相比,并行的ECmp的process方法必须保证线程安全
@LiteflowMethod(LiteFlowMethodEnum.PROCESS)
public synchronized void process(NodeComponent bindCmp) {
DefaultContext context = bindCmp.getFirstContextBean();
String key = StrUtil.format("{}_{}", "loop", bindCmp.getTag());
if (context.hasData(key)) {
String loopStr = context.getData(key);
String loopStrReturn = StrUtil.format("{}{}", loopStr, bindCmp.getLoopIndex());
context.setData(key, loopStrReturn);
}
else {
context.setData(key, bindCmp.getLoopIndex().toString());
}
}
}

View File

@ -0,0 +1,22 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import org.springframework.stereotype.Component;
@Component("f")
public class FCmp {
@LiteflowMethod(LiteFlowMethodEnum.PROCESS)
public void process(NodeComponent bindCmp) {
try {
System.out.println("FCmp start to sleep 5s");
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("FCmp executed!");
}
}

View File

@ -0,0 +1,20 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.test.exception.CustomStatefulException;
import org.springframework.stereotype.Component;
@Component("g")
public class GCmp {
@LiteflowMethod(LiteFlowMethodEnum.PROCESS)
public void process(NodeComponent bindCmp) {
if(bindCmp.getLoopIndex()==1){
throw new CustomStatefulException("300", "chain execute custom stateful execption");
}
System.out.println("GCmp executed!");
}
}

View File

@ -0,0 +1,21 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.exception.CustomStatefulException;
import org.springframework.stereotype.Component;
@Component("h")
public class HCmp {
@LiteflowMethod(LiteFlowMethodEnum.PROCESS)
public void process(NodeComponent bindCmp) {
DefaultContext context = bindCmp.getFirstContextBean();
context.setData("threadName", Thread.currentThread().getName());
System.out.println("HCmp executed!");
}
}

View File

@ -0,0 +1,24 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowCmpDefine;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.core.NodeIteratorComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import org.springframework.stereotype.Component;
import java.util.Iterator;
import java.util.List;
@Component("it")
@LiteflowCmpDefine(NodeTypeEnum.ITERATOR)
public class ITCmp {
@LiteflowMethod(LiteFlowMethodEnum.PROCESS_ITERATOR)
public Iterator<?> processIterator(NodeComponent bindCmp) throws Exception {
List<String> list = bindCmp.getRequestData();
return list.iterator();
}
}

View File

@ -0,0 +1,17 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
@LiteflowComponent("x")
public class XCmp {
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_FOR, nodeType = NodeTypeEnum.FOR)
public int processFor(NodeComponent bindCmp) throws Exception {
return 3;
}
}

View File

@ -0,0 +1,23 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.slot.DefaultContext;
@LiteflowComponent("y")
public class YCmp {
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_BREAK, nodeType = NodeTypeEnum.BREAK)
public boolean processBreak(NodeComponent bindCmp) throws Exception {
DefaultContext context = bindCmp.getFirstContextBean();
int count = 0;
if(context.hasData("test")) {
count = context.getData("test");
}
return count > 3;
}
}

View File

@ -0,0 +1,26 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.slot.DefaultContext;
@LiteflowComponent("z")
public class ZCmp {
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS_WHILE, nodeType = NodeTypeEnum.WHILE)
public boolean processWhile(NodeComponent bindCmp) throws Exception {
DefaultContext context = bindCmp.getFirstContextBean();
String key = "test";
if (context.hasData(key)) {
int count = context.getData("test");
return count < 5;
}
else {
return true;
}
}
}

View File

@ -0,0 +1,4 @@
liteflow.rule-source=parallelLoop/flow.xml
liteflow.parallel-max-workers = 10
liteflow.parallel-queue-limit = 1024
liteflow.parallel-loop-executor-class =com.yomahub.liteflow.test.parallelLoop.CustomThreadExecutor

View File

@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain1">
FOR(2).parallel(true).DO(THEN(a,b,c));
</chain>
<chain name="chain2">
FOR(x).parallel(true).DO(THEN(a,b,c));
</chain>
<chain name="chain3">
FOR(100).parallel(true).DO(THEN(a,b,d)).BREAK(y);
</chain>
<chain name="chain4">
FOR(x).parallel(true).DO(THEN(a,b,f));
</chain>
<chain name="chain5">
FOR(x).parallel(true).DO(THEN(a,b,g));
</chain>
<chain name="chain6">
WHILE(z).parallel(true).DO(THEN(a,d));
</chain>
<chain name="chain7">
ITERATOR(it).parallel(true).DO(THEN(a,b));
</chain>
<chain name="chain8">
FOR(5).parallel(true).DO(
WHEN(
THEN(a,e.tag("e1")),
THEN(c,e.tag("e2")),
THEN(b,e.tag("e3"))
)
);
</chain>
<chain name="chain9">
FOR(x).parallel(true).DO(THEN(a,b,h));
</chain>
</flow>

View File

@ -0,0 +1,14 @@
package com.yomahub.liteflow.test.parallelLoop;
import com.yomahub.liteflow.exception.LiteFlowException;
/**
* 用户自定义带状态码的异常
*/
public class CustomStatefulException extends LiteFlowException {
public CustomStatefulException(String code, String message) {
super(code, message);
}
}

View File

@ -0,0 +1,23 @@
package com.yomahub.liteflow.test.parallelLoop;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.thread.ExecutorBuilder;
import java.util.concurrent.ExecutorService;
public class CustomThreadExecutor implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(liteflowConfig.getParallelMaxWorkers(), liteflowConfig.getParallelMaxWorkers(),
liteflowConfig.getParallelQueueLimit(), "customer-loop-thead-");
}
}

View File

@ -0,0 +1,120 @@
package com.yomahub.liteflow.test.parallelLoop;
import cn.hutool.core.collection.ListUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.core.FlowExecutorHolder;
import com.yomahub.liteflow.exception.LiteFlowException;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.regex.Pattern;
/**
* nospring环境EL异步循环测试
*
* @author zhhhhy
* @since 2.11.0
*/
public class ParallelLoopTest extends BaseTest {
private static FlowExecutor flowExecutor;
@BeforeAll
public static void init() {
LiteflowConfig config = new LiteflowConfig();
config.setRuleSource("parallelLoop/flow.xml");
config.setParallelMaxWorkers(10);
config.setParallelQueueLimit(1024);
config.setParallelLoopExecutorClass("com.yomahub.liteflow.test.parallelLoop.CustomThreadExecutor");
flowExecutor = FlowExecutorHolder.loadInstance(config);
}
//测试并行FOR循环循环次数直接在el中定义
@Test
public void testParallelLoop1() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环循环次数由For组件定义
@Test
public void testParallelLoop2() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain2", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中的BREAK组件能够正常发挥作用
@Test
public void testParallelLoop3() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain3", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中主线程是否会正常等待所有并行子项完成后再继续执行
@Test
public void testParallelLoop4() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain4", "arg");
Assertions.assertTrue(response.isSuccess());
}
@Test
//测试并行FOR循环中某个并行子项抛出异常
public void testParallelLoop5() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain5", "arg");
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals("300", response.getCode());
Assertions.assertNotNull(response.getCause());
Assertions.assertTrue(response.getCause() instanceof LiteFlowException);
Assertions.assertNotNull(response.getSlot());
}
//并行的条件循环
@Test
public void testParallelLoop6() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain6", "arg");
Assertions.assertTrue(response.isSuccess());
}
//并行的迭代循环
@Test
public void testParallelLoop7() throws Exception {
List<String> list = ListUtil.toList("1", "2", "3", "4", "5");
LiteflowResponse response = flowExecutor.execute2Resp("chain7", list);
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中的index
@Test
public void testParallelLoop8() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain8", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
String regex = "(?!.*(.).*\\1)[0-4]{5}"; //匹配不包含重复数字的0-4的5位数字
Pattern pattern = Pattern.compile(regex);
//e1,e2,e3分别并行执行5次因此单个循环的顺序可以是任意的
Assertions.assertTrue(pattern.matcher(context.getData("loop_e1")).matches());
Assertions.assertTrue(pattern.matcher(context.getData("loop_e2")).matches());
Assertions.assertTrue(pattern.matcher(context.getData("loop_e3")).matches());
}
//测试自定义线程池配置是否生效
@Test
public void testParallelLoop9() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain9", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-loop-thead"));
}
}

View File

@ -0,0 +1,19 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
public class ACmp extends NodeComponent{
@Override
public void process() {
System.out.println("ACmp executed!");
}
}

View File

@ -0,0 +1,19 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
public class BCmp extends NodeComponent{
@Override
public void process() {
System.out.println("BCmp executed!");
}
}

View File

@ -0,0 +1,19 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
public class CCmp extends NodeComponent{
@Override
public void process() {
System.out.println("CCmp executed!");
}
}

View File

@ -0,0 +1,28 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
public class DCmp extends NodeComponent{
@Override
public void process() {
DefaultContext context = this.getFirstContextBean();
String key = "test";
if (context.hasData(key)) {
int count = context.getData(key);
context.setData(key, ++count);
}
else {
context.setData(key, 1);
}
}
}

View File

@ -0,0 +1,31 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
public class ECmp extends NodeComponent{
//注意与串行的ECmp相比,并行的ECmp的process方法必须保证线程安全
@Override
public synchronized void process() {
DefaultContext context = this.getFirstContextBean();
String key = StrUtil.format("{}_{}", "loop", this.getTag());
if (context.hasData(key)) {
String loopStr = context.getData(key);
String loopStrReturn = StrUtil.format("{}{}", loopStr, this.getLoopIndex());
context.setData(key, loopStrReturn);
}
else {
context.setData(key, this.getLoopIndex().toString());
}
}
}

View File

@ -0,0 +1,18 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
public class FCmp extends NodeComponent{
@Override
public void process() {
try {
System.out.println("FCmp start to sleep 5s");
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("FCmp executed!");
}
}

View File

@ -0,0 +1,16 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.test.exception.CustomStatefulException;
public class GCmp extends NodeComponent{
@Override
public void process() {
if(this.getLoopIndex()==1){
throw new CustomStatefulException("300", "chain execute custom stateful execption");
}
System.out.println("GCmp executed!");
}
}

View File

@ -0,0 +1,16 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
public class HCmp extends NodeComponent{
@Override
public void process() {
DefaultContext context = this.getFirstContextBean();
context.setData("threadName", Thread.currentThread().getName());
System.out.println("HCmp executed!");
}
}

View File

@ -0,0 +1,15 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeIteratorComponent;
import java.util.Iterator;
import java.util.List;
public class ITCmp extends NodeIteratorComponent {
@Override
public Iterator<?> processIterator() throws Exception {
List<String> list = this.getRequestData();
return list.iterator();
}
}

View File

@ -0,0 +1,12 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeForComponent;
public class XCmp extends NodeForComponent {
@Override
public int processFor() throws Exception {
return 3;
}
}

View File

@ -0,0 +1,17 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeBreakComponent;
import com.yomahub.liteflow.slot.DefaultContext;
public class YCmp extends NodeBreakComponent {
@Override
public boolean processBreak() throws Exception {
DefaultContext context = this.getFirstContextBean();
int count = 0;
if(context.hasData("test")) {
count = context.getData("test");
}
return count > 3;
}
}

View File

@ -0,0 +1,19 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeWhileComponent;
import com.yomahub.liteflow.slot.DefaultContext;
public class ZCmp extends NodeWhileComponent {
@Override
public boolean processWhile() throws Exception {
DefaultContext context = this.getFirstContextBean();
String key = "test";
if (context.hasData(key)) {
int count = context.getData("test");
return count < 5;
} else {
return true;
}
}
}

View File

@ -0,0 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<nodes>
<node id="a" class="com.yomahub.liteflow.test.parallelLoop.cmp.ACmp"/>
<node id="b" class="com.yomahub.liteflow.test.parallelLoop.cmp.BCmp"/>
<node id="c" class="com.yomahub.liteflow.test.parallelLoop.cmp.CCmp"/>
<node id="d" class="com.yomahub.liteflow.test.parallelLoop.cmp.DCmp"/>
<node id="e" class="com.yomahub.liteflow.test.parallelLoop.cmp.ECmp"/>
<node id="f" class="com.yomahub.liteflow.test.parallelLoop.cmp.FCmp"/>
<node id="g" class="com.yomahub.liteflow.test.parallelLoop.cmp.GCmp"/>
<node id="h" class="com.yomahub.liteflow.test.parallelLoop.cmp.HCmp"/>
<node id="it" class="com.yomahub.liteflow.test.parallelLoop.cmp.ITCmp"/>
<node id="x" class="com.yomahub.liteflow.test.parallelLoop.cmp.XCmp"/>
<node id="y" class="com.yomahub.liteflow.test.parallelLoop.cmp.YCmp"/>
<node id="z" class="com.yomahub.liteflow.test.parallelLoop.cmp.ZCmp"/>
</nodes>
<chain name="chain1">
FOR(2).DO(THEN(a,b,c));
</chain>
<chain name="chain2">
FOR(x).parallel(true).DO(THEN(a,b,c));
</chain>
<chain name="chain3">
FOR(100).parallel(true).DO(THEN(a,b,d)).BREAK(y);
</chain>
<chain name="chain4">
FOR(x).parallel(true).DO(THEN(a,b,f));
</chain>
<chain name="chain5">
FOR(x).parallel(true).DO(THEN(a,b,g));
</chain>
<chain name="chain6">
WHILE(z).parallel(true).DO(THEN(a,d));
</chain>
<chain name="chain7">
ITERATOR(it).parallel(true).DO(THEN(a,b));
</chain>
<chain name="chain8">
FOR(5).parallel(true).DO(
WHEN(
THEN(a,e.tag("e1")),
THEN(c,e.tag("e2")),
THEN(b,e.tag("e3"))
)
);
</chain>
<chain name="chain9">
FOR(x).parallel(true).DO(THEN(a,b,h));
</chain>
</flow>

View File

@ -0,0 +1,14 @@
package com.yomahub.liteflow.test.parallelLoop;
import com.yomahub.liteflow.exception.LiteFlowException;
/**
* 用户自定义带状态码的异常
*/
public class CustomStatefulException extends LiteFlowException {
public CustomStatefulException(String code, String message) {
super(code, message);
}
}

View File

@ -0,0 +1,23 @@
package com.yomahub.liteflow.test.parallelLoop;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.thread.ExecutorBuilder;
import java.util.concurrent.ExecutorService;
public class CustomThreadExecutor implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(liteflowConfig.getParallelMaxWorkers(), liteflowConfig.getParallelMaxWorkers(),
liteflowConfig.getParallelQueueLimit(), "customer-loop-thead-");
}
}

View File

@ -0,0 +1,113 @@
package com.yomahub.liteflow.test.parallelLoop;
import cn.hutool.core.collection.ListUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.exception.LiteFlowException;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.noear.solon.annotation.Inject;
import org.noear.solon.test.SolonJUnit5Extension;
import org.noear.solon.test.annotation.TestPropertySource;
import javax.annotation.Resource;
import java.util.List;
import java.util.regex.Pattern;
/**
* springboot环境EL异步循环测试
*
* @author zhhhhy
* @since 2.11.0
*/
@ExtendWith(SolonJUnit5Extension.class)
@TestPropertySource("classpath:/parallelLoop/application.properties")
public class ParallelLoopELSpringbootTest extends BaseTest {
@Inject
private FlowExecutor flowExecutor;
//测试并行FOR循环循环次数直接在el中定义
@Test
public void testParallelLoop1() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环循环次数由For组件定义
@Test
public void testParallelLoop2() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain2", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中的BREAK组件能够正常发挥作用
@Test
public void testParallelLoop3() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain3", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中主线程是否会正常等待所有并行子项完成后再继续执行
@Test
public void testParallelLoop4() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain4", "arg");
Assertions.assertTrue(response.isSuccess());
}
@Test
//测试并行FOR循环中某个并行子项抛出异常
public void testParallelLoop5() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain5", "arg");
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals("300", response.getCode());
Assertions.assertNotNull(response.getCause());
Assertions.assertTrue(response.getCause() instanceof LiteFlowException);
Assertions.assertNotNull(response.getSlot());
}
//并行的条件循环
@Test
public void testParallelLoop6() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain6", "arg");
Assertions.assertTrue(response.isSuccess());
}
//并行的迭代循环
@Test
public void testParallelLoop7() throws Exception {
List<String> list = ListUtil.toList("1", "2", "3", "4", "5");
LiteflowResponse response = flowExecutor.execute2Resp("chain7", list);
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中的index
@Test
public void testParallelLoop8() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain8", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
String regex = "(?!.*(.).*\\1)[0-4]{5}"; //匹配不包含重复数字的0-4的5位数字
Pattern pattern = Pattern.compile(regex);
//e1,e2,e3分别并行执行5次因此单个循环的顺序可以是任意的
Assertions.assertTrue(pattern.matcher(context.getData("loop_e1")).matches());
Assertions.assertTrue(pattern.matcher(context.getData("loop_e2")).matches());
Assertions.assertTrue(pattern.matcher(context.getData("loop_e3")).matches());
}
//测试自定义线程池配置是否生效
@Test
public void testParallelLoop9() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain9", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-loop-thead"));
}
}

View File

@ -0,0 +1,21 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.noear.solon.annotation.Component;
@Component("a")
public class ACmp extends NodeComponent{
@Override
public void process() {
System.out.println("ACmp executed!");
}
}

View File

@ -0,0 +1,21 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.noear.solon.annotation.Component;
@Component("b")
public class BCmp extends NodeComponent{
@Override
public void process() {
System.out.println("BCmp executed!");
}
}

View File

@ -0,0 +1,21 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.noear.solon.annotation.Component;
@Component("c")
public class CCmp extends NodeComponent{
@Override
public void process() {
System.out.println("CCmp executed!");
}
}

View File

@ -0,0 +1,30 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.noear.solon.annotation.Component;
@Component("d")
public class DCmp extends NodeComponent{
@Override
public void process() {
DefaultContext context = this.getFirstContextBean();
String key = "test";
if (context.hasData(key)) {
int count = context.getData(key);
context.setData(key, ++count);
}
else {
context.setData(key, 1);
}
}
}

View File

@ -0,0 +1,33 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.noear.solon.annotation.Component;
@Component("e")
public class ECmp extends NodeComponent{
//注意与串行的ECmp相比,并行的ECmp的process方法必须保证线程安全
@Override
public synchronized void process() {
DefaultContext context = this.getFirstContextBean();
String key = StrUtil.format("{}_{}", "loop", this.getTag());
if (context.hasData(key)) {
String loopStr = context.getData(key);
String loopStrReturn = StrUtil.format("{}{}", loopStr, this.getLoopIndex());
context.setData(key, loopStrReturn);
}
else {
context.setData(key, this.getLoopIndex().toString());
}
}
}

View File

@ -0,0 +1,20 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.noear.solon.annotation.Component;
@Component("f")
public class FCmp extends NodeComponent{
@Override
public void process() {
try {
System.out.println("FCmp start to sleep 5s");
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("FCmp executed!");
}
}

View File

@ -0,0 +1,18 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.test.exception.CustomStatefulException;
import org.noear.solon.annotation.Component;
@Component("g")
public class GCmp extends NodeComponent{
@Override
public void process() {
if(this.getLoopIndex()==1){
throw new CustomStatefulException("300", "chain execute custom stateful execption");
}
System.out.println("GCmp executed!");
}
}

View File

@ -0,0 +1,18 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.noear.solon.annotation.Component;
@Component("h")
public class HCmp extends NodeComponent{
@Override
public void process() {
DefaultContext context = this.getFirstContextBean();
context.setData("threadName", Thread.currentThread().getName());
System.out.println("HCmp executed!");
}
}

View File

@ -0,0 +1,17 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeIteratorComponent;
import org.noear.solon.annotation.Component;
import java.util.Iterator;
import java.util.List;
@Component("it")
public class ITCmp extends NodeIteratorComponent {
@Override
public Iterator<?> processIterator() throws Exception {
List<String> list = this.getRequestData();
return list.iterator();
}
}

View File

@ -0,0 +1,14 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeForComponent;
import org.noear.solon.annotation.Component;
@Component("x")
public class XCmp extends NodeForComponent {
@Override
public int processFor() throws Exception {
return 3;
}
}

View File

@ -0,0 +1,19 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeBreakComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.noear.solon.annotation.Component;
@Component("y")
public class YCmp extends NodeBreakComponent {
@Override
public boolean processBreak() throws Exception {
DefaultContext context = this.getFirstContextBean();
int count = 0;
if(context.hasData("test")) {
count = context.getData("test");
}
return count > 3;
}
}

View File

@ -0,0 +1,21 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeWhileComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.noear.solon.annotation.Component;
@Component("z")
public class ZCmp extends NodeWhileComponent {
@Override
public boolean processWhile() throws Exception {
DefaultContext context = this.getFirstContextBean();
String key = "test";
if (context.hasData(key)) {
int count = context.getData("test");
return count < 5;
} else {
return true;
}
}
}

View File

@ -0,0 +1,4 @@
liteflow.rule-source=parallelLoop/flow.xml
liteflow.parallel-max-workers = 10
liteflow.parallel-queue-limit = 1024
liteflow.parallel-loop-executor-class =com.yomahub.liteflow.test.parallelLoop.CustomThreadExecutor

View File

@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain1">
FOR(2).parallel(true).DO(THEN(a,b,c));
</chain>
<chain name="chain2">
FOR(x).parallel(true).DO(THEN(a,b,c));
</chain>
<chain name="chain3">
FOR(100).parallel(true).DO(THEN(a,b,d)).BREAK(y);
</chain>
<chain name="chain4">
FOR(x).parallel(true).DO(THEN(a,b,f));
</chain>
<chain name="chain5">
FOR(x).parallel(true).DO(THEN(a,b,g));
</chain>
<chain name="chain6">
WHILE(z).parallel(true).DO(THEN(a,d));
</chain>
<chain name="chain7">
ITERATOR(it).parallel(true).DO(THEN(a,b));
</chain>
<chain name="chain8">
FOR(5).parallel(true).DO(
WHEN(
THEN(a,e.tag("e1")),
THEN(c,e.tag("e2")),
THEN(b,e.tag("e3"))
)
);
</chain>
<chain name="chain9">
FOR(x).parallel(true).DO(THEN(a,b,h));
</chain>
</flow>

View File

@ -0,0 +1,14 @@
package com.yomahub.liteflow.test.parallelLoop;
import com.yomahub.liteflow.exception.LiteFlowException;
/**
* 用户自定义带状态码的异常
*/
public class CustomStatefulException extends LiteFlowException {
public CustomStatefulException(String code, String message) {
super(code, message);
}
}

View File

@ -0,0 +1,23 @@
package com.yomahub.liteflow.test.parallelLoop;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.thread.ExecutorBuilder;
import java.util.concurrent.ExecutorService;
public class CustomThreadExecutor implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(liteflowConfig.getParallelMaxWorkers(), liteflowConfig.getParallelMaxWorkers(),
liteflowConfig.getParallelQueueLimit(), "customer-loop-thead-");
}
}

View File

@ -0,0 +1,118 @@
package com.yomahub.liteflow.test.parallelLoop;
import cn.hutool.core.collection.ListUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.exception.LiteFlowException;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import javax.annotation.Resource;
import java.util.List;
import java.util.regex.Pattern;
/**
* springboot环境EL异步循环测试
*
* @author zhhhhy
* @since 2.11.0
*/
@ExtendWith(SpringExtension.class)
@TestPropertySource(value = "classpath:/parallelLoop/application.properties")
@SpringBootTest(classes = ParallelLoopELSpringbootTest.class)
@EnableAutoConfiguration
@ComponentScan({ "com.yomahub.liteflow.test.parallelLoop.cmp" })
public class ParallelLoopELSpringbootTest extends BaseTest {
@Resource
private FlowExecutor flowExecutor;
//测试并行FOR循环循环次数直接在el中定义
@Test
public void testParallelLoop1() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环循环次数由For组件定义
@Test
public void testParallelLoop2() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain2", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中的BREAK组件能够正常发挥作用
@Test
public void testParallelLoop3() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain3", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中主线程是否会正常等待所有并行子项完成后再继续执行
@Test
public void testParallelLoop4() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain4", "arg");
Assertions.assertTrue(response.isSuccess());
}
@Test
//测试并行FOR循环中某个并行子项抛出异常
public void testParallelLoop5() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain5", "arg");
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals("300", response.getCode());
Assertions.assertNotNull(response.getCause());
Assertions.assertTrue(response.getCause() instanceof LiteFlowException);
Assertions.assertNotNull(response.getSlot());
}
//并行的条件循环
@Test
public void testParallelLoop6() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain6", "arg");
Assertions.assertTrue(response.isSuccess());
}
//并行的迭代循环
@Test
public void testParallelLoop7() throws Exception {
List<String> list = ListUtil.toList("1", "2", "3", "4", "5");
LiteflowResponse response = flowExecutor.execute2Resp("chain7", list);
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中的index
@Test
public void testParallelLoop8() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain8", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
String regex = "(?!.*(.).*\\1)[0-4]{5}"; //匹配不包含重复数字的0-4的5位数字
Pattern pattern = Pattern.compile(regex);
//e1,e2,e3分别并行执行5次因此单个循环的顺序可以是任意的
Assertions.assertTrue(pattern.matcher(context.getData("loop_e1")).matches());
Assertions.assertTrue(pattern.matcher(context.getData("loop_e2")).matches());
Assertions.assertTrue(pattern.matcher(context.getData("loop_e3")).matches());
}
//测试自定义线程池配置是否生效
@Test
public void testParallelLoop9() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain9", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-loop-thead"));
}
}

View File

@ -0,0 +1,23 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import org.springframework.stereotype.Component;
@Component("a")
public class ACmp extends NodeComponent{
@Override
public void process() {
System.out.println("ACmp executed!");
}
}

View File

@ -0,0 +1,23 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import org.springframework.stereotype.Component;
@Component("b")
public class BCmp extends NodeComponent{
@Override
public void process() {
System.out.println("BCmp executed!");
}
}

View File

@ -0,0 +1,23 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import org.springframework.stereotype.Component;
@Component("c")
public class CCmp extends NodeComponent{
@Override
public void process() {
System.out.println("CCmp executed!");
}
}

View File

@ -0,0 +1,32 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.slot.DefaultContext;
import org.springframework.stereotype.Component;
@Component("d")
public class DCmp extends NodeComponent{
@Override
public void process() {
DefaultContext context = this.getFirstContextBean();
String key = "test";
if (context.hasData(key)) {
int count = context.getData(key);
context.setData(key, ++count);
}
else {
context.setData(key, 1);
}
}
}

View File

@ -0,0 +1,35 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.slot.DefaultContext;
import org.springframework.stereotype.Component;
@Component("e")
public class ECmp extends NodeComponent{
//注意与串行的ECmp相比,并行的ECmp的process方法必须保证线程安全
@Override
public synchronized void process() {
DefaultContext context = this.getFirstContextBean();
String key = StrUtil.format("{}_{}", "loop", this.getTag());
if (context.hasData(key)) {
String loopStr = context.getData(key);
String loopStrReturn = StrUtil.format("{}{}", loopStr, this.getLoopIndex());
context.setData(key, loopStrReturn);
}
else {
context.setData(key, this.getLoopIndex().toString());
}
}
}

View File

@ -0,0 +1,22 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import org.springframework.stereotype.Component;
@Component("f")
public class FCmp extends NodeComponent{
@Override
public void process() {
try {
System.out.println("FCmp start to sleep 5s");
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("FCmp executed!");
}
}

View File

@ -0,0 +1,20 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.test.exception.CustomStatefulException;
import org.springframework.stereotype.Component;
@Component("g")
public class GCmp extends NodeComponent{
@Override
public void process() {
if(this.getLoopIndex()==1){
throw new CustomStatefulException("300", "chain execute custom stateful execption");
}
System.out.println("GCmp executed!");
}
}

View File

@ -0,0 +1,20 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.slot.DefaultContext;
import org.springframework.stereotype.Component;
@Component("h")
public class HCmp extends NodeComponent{
@Override
public void process() {
DefaultContext context = this.getFirstContextBean();
context.setData("threadName", Thread.currentThread().getName());
System.out.println("HCmp executed!");
}
}

View File

@ -0,0 +1,22 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowCmpDefine;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.core.NodeIteratorComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import org.springframework.stereotype.Component;
import java.util.Iterator;
import java.util.List;
@Component("it")
public class ITCmp extends NodeIteratorComponent {
@Override
public Iterator<?> processIterator() throws Exception {
List<String> list = this.getRequestData();
return list.iterator();
}
}

View File

@ -0,0 +1,19 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.core.NodeForComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import org.springframework.stereotype.Component;
@Component("x")
public class XCmp extends NodeForComponent {
@Override
public int processFor() throws Exception {
return 3;
}
}

View File

@ -0,0 +1,24 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeBreakComponent;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.slot.DefaultContext;
import org.springframework.stereotype.Component;
@Component("y")
public class YCmp extends NodeBreakComponent {
@Override
public boolean processBreak() throws Exception {
DefaultContext context = this.getFirstContextBean();
int count = 0;
if(context.hasData("test")) {
count = context.getData("test");
}
return count > 3;
}
}

View File

@ -0,0 +1,26 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.core.NodeWhileComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.slot.DefaultContext;
import org.springframework.stereotype.Component;
@Component("z")
public class ZCmp extends NodeWhileComponent {
@Override
public boolean processWhile() throws Exception {
DefaultContext context = this.getFirstContextBean();
String key = "test";
if (context.hasData(key)) {
int count = context.getData("test");
return count < 5;
} else {
return true;
}
}
}

View File

@ -0,0 +1,4 @@
liteflow.rule-source=parallelLoop/flow.xml
liteflow.parallel-max-workers = 10
liteflow.parallel-queue-limit = 1024
liteflow.parallel-loop-executor-class =com.yomahub.liteflow.test.parallelLoop.CustomThreadExecutor

View File

@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain1">
FOR(2).parallel(true).DO(THEN(a,b,c));
</chain>
<chain name="chain2">
FOR(x).parallel(true).DO(THEN(a,b,c));
</chain>
<chain name="chain3">
FOR(100).parallel(true).DO(THEN(a,b,d)).BREAK(y);
</chain>
<chain name="chain4">
FOR(x).parallel(true).DO(THEN(a,b,f));
</chain>
<chain name="chain5">
FOR(x).parallel(true).DO(THEN(a,b,g));
</chain>
<chain name="chain6">
WHILE(z).parallel(true).DO(THEN(a,d));
</chain>
<chain name="chain7">
ITERATOR(it).parallel(true).DO(THEN(a,b));
</chain>
<chain name="chain8">
FOR(5).parallel(true).DO(
WHEN(
THEN(a,e.tag("e1")),
THEN(c,e.tag("e2")),
THEN(b,e.tag("e3"))
)
);
</chain>
<chain name="chain9">
FOR(x).parallel(true).DO(THEN(a,b,h));
</chain>
</flow>

View File

@ -0,0 +1,14 @@
package com.yomahub.liteflow.test.parallelLoop;
import com.yomahub.liteflow.exception.LiteFlowException;
/**
* 用户自定义带状态码的异常
*/
public class CustomStatefulException extends LiteFlowException {
public CustomStatefulException(String code, String message) {
super(code, message);
}
}

View File

@ -0,0 +1,23 @@
package com.yomahub.liteflow.test.parallelLoop;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.thread.ExecutorBuilder;
import java.util.concurrent.ExecutorService;
public class CustomThreadExecutor implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(liteflowConfig.getParallelMaxWorkers(), liteflowConfig.getParallelMaxWorkers(),
liteflowConfig.getParallelQueueLimit(), "customer-loop-thead-");
}
}

View File

@ -0,0 +1,114 @@
package com.yomahub.liteflow.test.parallelLoop;
import cn.hutool.core.collection.ListUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.exception.LiteFlowException;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import javax.annotation.Resource;
import java.util.List;
import java.util.regex.Pattern;
/**
* springboot环境EL异步循环测试
*
* @author zhhhhy
* @since 2.11.0
*/
@ExtendWith(SpringExtension.class)
@ContextConfiguration("classpath:/parallelLoop/application.xml")
public class ParallelLoopELSpringTest extends BaseTest {
@Resource
private FlowExecutor flowExecutor;
//测试并行FOR循环循环次数直接在el中定义
@Test
public void testParallelLoop1() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环循环次数由For组件定义
@Test
public void testParallelLoop2() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain2", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中的BREAK组件能够正常发挥作用
@Test
public void testParallelLoop3() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain3", "arg");
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中主线程是否会正常等待所有并行子项完成后再继续执行
@Test
public void testParallelLoop4() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain4", "arg");
Assertions.assertTrue(response.isSuccess());
}
@Test
//测试并行FOR循环中某个并行子项抛出异常
public void testParallelLoop5() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain5", "arg");
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals("300", response.getCode());
Assertions.assertNotNull(response.getCause());
Assertions.assertTrue(response.getCause() instanceof LiteFlowException);
Assertions.assertNotNull(response.getSlot());
}
//并行的条件循环
@Test
public void testParallelLoop6() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain6", "arg");
Assertions.assertTrue(response.isSuccess());
}
//并行的迭代循环
@Test
public void testParallelLoop7() throws Exception {
List<String> list = ListUtil.toList("1", "2", "3", "4", "5");
LiteflowResponse response = flowExecutor.execute2Resp("chain7", list);
Assertions.assertTrue(response.isSuccess());
}
//测试并行FOR循环中的index
@Test
public void testParallelLoop8() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain8", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
String regex = "(?!.*(.).*\\1)[0-4]{5}"; //匹配不包含重复数字的0-4的5位数字
Pattern pattern = Pattern.compile(regex);
//e1,e2,e3分别并行执行5次因此单个循环的顺序可以是任意的
Assertions.assertTrue(pattern.matcher(context.getData("loop_e1")).matches());
Assertions.assertTrue(pattern.matcher(context.getData("loop_e2")).matches());
Assertions.assertTrue(pattern.matcher(context.getData("loop_e3")).matches());
}
//测试自定义线程池配置是否生效
@Test
public void testParallelLoop9() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain9", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-loop-thead"));
}
}

View File

@ -0,0 +1,21 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("a")
public class ACmp extends NodeComponent{
@Override
public void process() {
System.out.println("ACmp executed!");
}
}

View File

@ -0,0 +1,21 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("b")
public class BCmp extends NodeComponent{
@Override
public void process() {
System.out.println("BCmp executed!");
}
}

View File

@ -0,0 +1,21 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("c")
public class CCmp extends NodeComponent{
@Override
public void process() {
System.out.println("CCmp executed!");
}
}

View File

@ -0,0 +1,30 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.springframework.stereotype.Component;
@Component("d")
public class DCmp extends NodeComponent{
@Override
public void process() {
DefaultContext context = this.getFirstContextBean();
String key = "test";
if (context.hasData(key)) {
int count = context.getData(key);
context.setData(key, ++count);
}
else {
context.setData(key, 1);
}
}
}

View File

@ -0,0 +1,33 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.parallelLoop.cmp;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.springframework.stereotype.Component;
@Component("e")
public class ECmp extends NodeComponent{
//注意与串行的ECmp相比,并行的ECmp的process方法必须保证线程安全
@Override
public synchronized void process() {
DefaultContext context = this.getFirstContextBean();
String key = StrUtil.format("{}_{}", "loop", this.getTag());
if (context.hasData(key)) {
String loopStr = context.getData(key);
String loopStrReturn = StrUtil.format("{}{}", loopStr, this.getLoopIndex());
context.setData(key, loopStrReturn);
}
else {
context.setData(key, this.getLoopIndex().toString());
}
}
}

View File

@ -0,0 +1,20 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("f")
public class FCmp extends NodeComponent{
@Override
public void process() {
try {
System.out.println("FCmp start to sleep 5s");
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("FCmp executed!");
}
}

View File

@ -0,0 +1,18 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.test.exception.CustomStatefulException;
import org.springframework.stereotype.Component;
@Component("g")
public class GCmp extends NodeComponent{
@Override
public void process() {
if(this.getLoopIndex()==1){
throw new CustomStatefulException("300", "chain execute custom stateful execption");
}
System.out.println("GCmp executed!");
}
}

View File

@ -0,0 +1,18 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.springframework.stereotype.Component;
@Component("h")
public class HCmp extends NodeComponent{
@Override
public void process() {
DefaultContext context = this.getFirstContextBean();
context.setData("threadName", Thread.currentThread().getName());
System.out.println("HCmp executed!");
}
}

View File

@ -0,0 +1,17 @@
package com.yomahub.liteflow.test.parallelLoop.cmp;
import com.yomahub.liteflow.core.NodeIteratorComponent;
import org.springframework.stereotype.Component;
import java.util.Iterator;
import java.util.List;
@Component("it")
public class ITCmp extends NodeIteratorComponent {
@Override
public Iterator<?> processIterator() throws Exception {
List<String> list = this.getRequestData();
return list.iterator();
}
}

Some files were not shown because too many files have changed in this diff Show More