嵌套异步迭代测试分支

This commit is contained in:
everywhere.z 2025-05-06 12:24:35 +08:00
parent 51c863def6
commit c7419ec56c
8 changed files with 82 additions and 45 deletions

View File

@ -81,10 +81,38 @@ public class Node implements Executable, Cloneable, Rollbackable{
private TransmittableThreadLocal<Boolean> accessResult = new TransmittableThreadLocal<>();
// 循环下标
private TransmittableThreadLocal<Stack<TupleOf2<Integer, Integer>>> loopIndexTL = new TransmittableThreadLocal<>();
private TransmittableThreadLocal<Stack<Integer>> loopIndexTL = new TransmittableThreadLocal<Stack<Integer>>() {
/**
* 在你提供的这个 TTL 版本中我们重写 public T copy(T parentValue) 方法
* 来实现 Stack 的克隆以确保线程隔离
*/
@Override
public Stack<Integer> copy(Stack<Integer> parentValue) {
if (parentValue == null) {
return null;
}
// 克隆 Stack
return (Stack<Integer>) parentValue.clone();
}
};
// 迭代对象
private TransmittableThreadLocal<Stack<TupleOf2<Integer, Object>>> loopObjectTL = new TransmittableThreadLocal<>();
private TransmittableThreadLocal<Stack<Object>> loopObjectTL = new TransmittableThreadLocal<Stack<Object>>() {
/**
* 在你提供的这个 TTL 版本中我们重写 public T copy(T parentValue) 方法
* 来实现 Stack 的克隆以确保线程隔离
*/
@Override
public Stack<Object> copy(Stack<Object> parentValue) {
if (parentValue == null) {
return null;
}
// 克隆 Stack
return (Stack<Object>) parentValue.clone();
}
};
// 当前slot的index
private TransmittableThreadLocal<Integer> slotIndexTL = new TransmittableThreadLocal<>();
@ -355,19 +383,12 @@ public class Node implements Executable, Cloneable, Rollbackable{
try{
lock4LoopIndex.lock();
if (this.loopIndexTL.get() == null){
Stack<TupleOf2<Integer, Integer>> stack = new Stack<>();
TupleOf2<Integer, Integer> tuple = new TupleOf2<>(condition.hashCode(), index);
stack.push(tuple);
Stack<Integer> stack = new Stack<>();
stack.push(index);
this.loopIndexTL.set(stack);
}else{
Stack<TupleOf2<Integer, Integer>> stack = this.loopIndexTL.get();
TupleOf2<Integer, Integer> thisConditionTuple = stack.stream().filter(tuple -> tuple.getA().equals(condition.hashCode())).findFirst().orElse(null);
if (thisConditionTuple != null){
thisConditionTuple.setB(index);
}else{
TupleOf2<Integer, Integer> tuple = new TupleOf2<>(condition.hashCode(), index);
stack.push(tuple);
}
Stack<Integer> stack = this.loopIndexTL.get();
stack.push(index);
}
}finally {
lock4LoopIndex.unlock();
@ -376,9 +397,9 @@ public class Node implements Executable, Cloneable, Rollbackable{
}
public Integer getLoopIndex() {
Stack<TupleOf2<Integer, Integer>> stack = this.loopIndexTL.get();
Stack<Integer> stack = this.loopIndexTL.get();
if (stack != null){
return stack.peek().getB();
return stack.peek();
}else{
return null;
}
@ -389,9 +410,9 @@ public class Node implements Executable, Cloneable, Rollbackable{
}
public Integer getPreNLoopIndex(int n){
Stack<TupleOf2<Integer, Integer>> stack = this.loopIndexTL.get();
Stack<Integer> stack = this.loopIndexTL.get();
if (stack != null && stack.size() > n){
return stack.elementAt(stack.size() - (n + 1)).getB();
return stack.elementAt(stack.size() - (n + 1));
}else{
return null;
}
@ -400,7 +421,7 @@ public class Node implements Executable, Cloneable, Rollbackable{
public void removeLoopIndex() {
try{
lock4LoopIndex.lock();
Stack<TupleOf2<Integer, Integer>> stack = this.loopIndexTL.get();
Stack<Integer> stack = this.loopIndexTL.get();
if (stack != null){
if (stack.size() > 1){
stack.pop();
@ -420,19 +441,12 @@ public class Node implements Executable, Cloneable, Rollbackable{
try{
lock4LoopObj.lock();
if (this.loopObjectTL.get() == null){
Stack<TupleOf2<Integer, Object>> stack = new Stack<>();
TupleOf2<Integer, Object> tuple = new TupleOf2<>(condition.hashCode(), obj);
stack.push(tuple);
Stack<Object> stack = new Stack<>();
stack.push(obj);
this.loopObjectTL.set(stack);
}else{
Stack<TupleOf2<Integer, Object>> stack = this.loopObjectTL.get();
TupleOf2<Integer, Object> thisConditionTuple = stack.stream().filter(tuple -> tuple.getA().equals(condition.hashCode())).findFirst().orElse(null);
if (thisConditionTuple != null){
thisConditionTuple.setB(obj);
}else{
TupleOf2<Integer, Object> tuple = new TupleOf2<>(condition.hashCode(), obj);
stack.push(tuple);
}
Stack<Object> stack = this.loopObjectTL.get();
stack.push(obj);
}
}finally {
lock4LoopObj.unlock();
@ -440,9 +454,9 @@ public class Node implements Executable, Cloneable, Rollbackable{
}
public <T> T getCurrLoopObject() {
Stack<TupleOf2<Integer, Object>> stack = this.loopObjectTL.get();
Stack<Object> stack = this.loopObjectTL.get();
if (stack != null){
return (T) stack.peek().getB();
return (T) stack.peek();
}else{
return null;
}
@ -453,9 +467,9 @@ public class Node implements Executable, Cloneable, Rollbackable{
}
public <T> T getPreNLoopObject(int n){
Stack<TupleOf2<Integer, Object>> stack = this.loopObjectTL.get();
Stack<Object> stack = this.loopObjectTL.get();
if (stack != null && stack.size() > n){
return (T) stack.elementAt(stack.size() - (n + 1)).getB();
return (T) stack.elementAt(stack.size() - (n + 1));
}else{
return null;
}
@ -464,7 +478,7 @@ public class Node implements Executable, Cloneable, Rollbackable{
public void removeCurrLoopObject() {
try{
lock4LoopObj.lock();
Stack<TupleOf2<Integer, Object>> stack = this.loopObjectTL.get();
Stack<Object> stack = this.loopObjectTL.get();
if (stack != null){
if (stack.size() > 1){
stack.pop();

View File

@ -87,8 +87,7 @@ public class IteratorCondition extends LoopCondition {
//存储所有的并行执行子项的CompletableFuture
List<CompletableFuture<LoopFutureObj>> futureList = new ArrayList<>();
//获取并行循环的线程池
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutorService(this, slotIndex
, this.getConditionType());
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutorService(this, slotIndex , this.getConditionType());
while (it.hasNext()) {
Object itObj = it.next();
//提交异步任务

View File

@ -1,6 +1,6 @@
package com.yomahub.liteflow.util;
public class TupleOf2<A, B> {
public class TupleOf2<A, B> implements Cloneable{
private A a;
@ -26,4 +26,13 @@ public class TupleOf2<A, B> {
public void setB(B b) {
this.b = b;
}
@Override
@SuppressWarnings("unchecked")
public TupleOf2<A, B> clone() throws CloneNotSupportedException {
TupleOf2<A, B> newObject = (TupleOf2<A, B>)super.clone();
newObject.setA(this.getA());
newObject.setB(this.getB());
return newObject;
}
}

View File

@ -12,7 +12,9 @@ import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import javax.annotation.Resource;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* springboot环境EL常规的例子测试
@ -72,9 +74,13 @@ public class IteratorELSpringbootTest extends BaseTest {
//测试多层迭代异步循环的正确性
@Test
public void testIt5() throws Exception {
for (int i = 0; i < 100; i++) {
LiteflowResponse response = flowExecutor.execute2Resp("chain5");
Assertions.assertTrue(response.isSuccess());
}
DefaultContext context = new DefaultContext();
context.setData("set", new HashSet<>());
context.setData("list1", ListUtil.toList("a", "b", "c"));
context.setData("list2", ListUtil.toList("1", "2", "3", "4"));
LiteflowResponse response = flowExecutor.execute2Resp("chain5",null, context);
Assertions.assertTrue(response.isSuccess());
Set<String> set = context.getData("set");
Assertions.assertEquals(12, set.size());
}
}

View File

@ -6,6 +6,8 @@ import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.springframework.stereotype.Component;
import java.util.Set;
@Component("f")
public class FCmp extends NodeComponent {
@ -16,6 +18,9 @@ public class FCmp extends NodeComponent {
if (obj1 == null || obj2 == null) {
throw new RuntimeException("");
}
System.out.println(StrUtil.format("{}{}", obj1, obj2));
String str = StrUtil.format("{}{}", obj1, obj2);
DefaultContext context = this.getFirstContextBean();
Set<String> set = context.getData("set");
set.add(str);
}
}

View File

@ -3,6 +3,7 @@ package com.yomahub.liteflow.test.iterator.cmp;
import cn.hutool.core.collection.ListUtil;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.core.NodeIteratorComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import java.util.Iterator;
import java.util.List;
@ -11,7 +12,8 @@ import java.util.List;
public class X1Cmp extends NodeIteratorComponent {
@Override
public Iterator<?> processIterator() throws Exception {
List<String> list = ListUtil.toList("a", "b", "c");
DefaultContext context = this.getFirstContextBean();
List<String> list = context.getData("list1");
return list.iterator();
}
}

View File

@ -3,6 +3,7 @@ package com.yomahub.liteflow.test.iterator.cmp;
import cn.hutool.core.collection.ListUtil;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.core.NodeIteratorComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import java.util.Iterator;
import java.util.List;
@ -11,7 +12,8 @@ import java.util.List;
public class X2Cmp extends NodeIteratorComponent {
@Override
public Iterator<?> processIterator() throws Exception {
List<String> list = ListUtil.toList("11", "22");
DefaultContext context = this.getFirstContextBean();
List<String> list = context.getData("list2");
return list.iterator();
}
}

View File

@ -53,7 +53,7 @@
<curator.version>5.3.0</curator.version>
<junit.version>5.8.2</junit.version>
<hutool.version>5.8.26</hutool.version>
<transmittable-thread-local.version>2.12.3</transmittable-thread-local.version>
<transmittable-thread-local.version>2.14.5</transmittable-thread-local.version>
<curator-test.version>5.1.0</curator-test.version>
<zkclient.version>0.10</zkclient.version>
<jetcd.version>0.7.3</jetcd.version>