enhancement #I61D1N 解析增加 enable 逻辑,完成 zk 改造
This commit is contained in:
parent
5ec298a3d0
commit
20c4aca0c3
|
@ -151,7 +151,9 @@ public class LiteFlowNodeBuilder {
|
|||
}
|
||||
|
||||
public LiteFlowNodeBuilder setLanguage(String language) {
|
||||
this.node.setLanguage(language);
|
||||
if (StrUtil.isNotBlank(language)){
|
||||
this.node.setLanguage(language);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -166,6 +166,7 @@ public class ApolloParseHelper {
|
|||
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
|
||||
.setName(nodeSimpleVO.getName())
|
||||
.setScript(nodeSimpleVO.getScript())
|
||||
.setLanguage(nodeSimpleVO.getLanguage())
|
||||
.build();
|
||||
}
|
||||
// 禁用就删除
|
||||
|
|
|
@ -174,6 +174,7 @@ public class EtcdParserHelper {
|
|||
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
|
||||
.setName(nodeSimpleVO.getName())
|
||||
.setScript(nodeSimpleVO.getScript())
|
||||
.setLanguage(nodeSimpleVO.getLanguage())
|
||||
.build();
|
||||
}
|
||||
// 禁用就删除
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
package com.yomahub.liteflow.parser.zk.util;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import cn.hutool.core.collection.ListUtil;
|
||||
import cn.hutool.core.io.file.FileNameUtil;
|
||||
import cn.hutool.core.lang.Pair;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.yomahub.liteflow.builder.LiteFlowNodeBuilder;
|
||||
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
|
||||
|
@ -12,6 +12,7 @@ import com.yomahub.liteflow.flow.FlowBus;
|
|||
import com.yomahub.liteflow.parser.helper.NodeConvertHelper;
|
||||
import com.yomahub.liteflow.parser.zk.exception.ZkException;
|
||||
import com.yomahub.liteflow.parser.zk.vo.ZkParserVO;
|
||||
import com.yomahub.liteflow.util.RuleParsePluginUtil;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||
import org.apache.curator.framework.recipes.cache.CuratorCache;
|
||||
|
@ -26,186 +27,178 @@ import java.util.Objects;
|
|||
|
||||
public class ZkParserHelper {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ZkParserHelper.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ZkParserHelper.class);
|
||||
|
||||
private final ZkParserVO zkParserVO;
|
||||
private final ZkParserVO zkParserVO;
|
||||
|
||||
private final CuratorFramework client;
|
||||
private final CuratorFramework client;
|
||||
|
||||
private final String CHAIN_XML_PATTERN = "<chain name=\"{}\">{}</chain>";
|
||||
private final String CHAIN_XML_PATTERN = "<chain name=\"{}\">{}</chain>";
|
||||
|
||||
private final String NODE_XML_PATTERN = "<nodes>{}</nodes>";
|
||||
private final String NODE_XML_PATTERN = "<nodes>{}</nodes>";
|
||||
|
||||
private final String NODE_ITEM_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\"><![CDATA[{}]]></node>";
|
||||
private final String NODE_ITEM_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\"><![CDATA[{}]]></node>";
|
||||
|
||||
private final String NODE_ITEM_XML_WITH_LANGUAGE_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\" language=\"{}\"><![CDATA[{}]]></node>";
|
||||
private final String NODE_ITEM_XML_WITH_LANGUAGE_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\" language=\"{}\"><![CDATA[{}]]></node>";
|
||||
|
||||
private final String XML_PATTERN = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow>{}{}</flow>";
|
||||
private final String XML_PATTERN = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow>{}{}</flow>";
|
||||
|
||||
public ZkParserHelper(ZkParserVO zkParserVO) {
|
||||
this.zkParserVO = zkParserVO;
|
||||
public ZkParserHelper(ZkParserVO zkParserVO) {
|
||||
this.zkParserVO = zkParserVO;
|
||||
|
||||
try {
|
||||
CuratorFramework client = CuratorFrameworkFactory.newClient(zkParserVO.getConnectStr(),
|
||||
new RetryNTimes(10, 5000));
|
||||
client.start();
|
||||
try {
|
||||
CuratorFramework client = CuratorFrameworkFactory.newClient(zkParserVO.getConnectStr(),
|
||||
new RetryNTimes(10, 5000));
|
||||
client.start();
|
||||
|
||||
this.client = client;
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new ZkException(e.getMessage());
|
||||
}
|
||||
}
|
||||
this.client = client;
|
||||
} catch (Exception e) {
|
||||
throw new ZkException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public String getContent() {
|
||||
try {
|
||||
// 检查zk上有没有chainPath节点
|
||||
if (client.checkExists().forPath(zkParserVO.getChainPath()) == null) {
|
||||
throw new ZkException(StrUtil.format("zk node[{}] is not exist", zkParserVO.getChainPath()));
|
||||
}
|
||||
public String getContent() {
|
||||
try {
|
||||
// 检查zk上有没有chainPath节点
|
||||
if (client.checkExists().forPath(zkParserVO.getChainPath()) == null) {
|
||||
throw new ZkException(StrUtil.format("zk node[{}] is not exist", zkParserVO.getChainPath()));
|
||||
}
|
||||
|
||||
// 检查chainPath路径下有没有子节点
|
||||
List<String> chainNameList = client.getChildren().forPath(zkParserVO.getChainPath());
|
||||
// 获取chainPath路径下的所有子节点内容List
|
||||
List<String> chainItemContentList = new ArrayList<>();
|
||||
for (String chainName : chainNameList) {
|
||||
String chainData = new String(
|
||||
client.getData().forPath(StrUtil.format("{}/{}", zkParserVO.getChainPath(), chainName)));
|
||||
if (StrUtil.isBlank(chainData)){
|
||||
continue;
|
||||
}
|
||||
chainItemContentList.add(StrUtil.format(CHAIN_XML_PATTERN, chainName, chainData));
|
||||
}
|
||||
// 合并成所有chain的xml内容
|
||||
String chainAllContent = CollUtil.join(chainItemContentList, StrUtil.EMPTY);
|
||||
// 检查chainPath路径下有没有子节点
|
||||
List<String> chainNameList = client.getChildren().forPath(zkParserVO.getChainPath());
|
||||
// 获取chainPath路径下的所有子节点内容List
|
||||
List<String> chainItemContentList = new ArrayList<>();
|
||||
for (String chainName : chainNameList) {
|
||||
RuleParsePluginUtil.ChainDto chainDto = RuleParsePluginUtil.parseChainKey(chainName);
|
||||
String chainData = new String(
|
||||
client.getData().forPath(StrUtil.format("{}/{}", zkParserVO.getChainPath(), chainName)));
|
||||
if (StrUtil.isNotBlank(chainData) && chainDto.isEnable()) {
|
||||
chainItemContentList.add(chainDto.toElXml(chainData));
|
||||
}
|
||||
}
|
||||
// 合并成所有chain的xml内容
|
||||
String chainAllContent = CollUtil.join(chainItemContentList, StrUtil.EMPTY);
|
||||
|
||||
// 检查是否有脚本内容,如果有,进行脚本内容的获取
|
||||
String scriptAllContent = StrUtil.EMPTY;
|
||||
if (hasScript()) {
|
||||
List<String> scriptNodeValueList = client.getChildren().forPath(zkParserVO.getScriptPath());
|
||||
// 检查是否有脚本内容,如果有,进行脚本内容的获取
|
||||
String scriptAllContent = StrUtil.EMPTY;
|
||||
if (hasScript()) {
|
||||
List<String> scriptNodeValueList = client.getChildren().forPath(zkParserVO.getScriptPath());
|
||||
|
||||
List<String> scriptItemContentList = new ArrayList<>();
|
||||
for (String scriptNodeValue : scriptNodeValueList) {
|
||||
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue);
|
||||
if (Objects.isNull(nodeSimpleVO)) {
|
||||
throw new ZkException(StrUtil.format("The name of the zk node is invalid:{}", scriptNodeValue));
|
||||
}
|
||||
String scriptData = new String(client.getData()
|
||||
.forPath(StrUtil.format("{}/{}", zkParserVO.getScriptPath(), scriptNodeValue)));
|
||||
List<String> scriptItemContentList = new ArrayList<>();
|
||||
for (String scriptNodeValue : scriptNodeValueList) {
|
||||
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue);
|
||||
if (Objects.isNull(nodeSimpleVO)) {
|
||||
throw new ZkException(StrUtil.format("The name of the zk node is invalid:{}", scriptNodeValue));
|
||||
}
|
||||
String scriptData = new String(client.getData()
|
||||
.forPath(StrUtil.format("{}/{}", zkParserVO.getScriptPath(), scriptNodeValue)));
|
||||
|
||||
// 有语言类型
|
||||
if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) {
|
||||
scriptItemContentList.add(StrUtil.format(NODE_ITEM_XML_WITH_LANGUAGE_PATTERN,
|
||||
nodeSimpleVO.getNodeId(), nodeSimpleVO.getName(), nodeSimpleVO.getType(),
|
||||
nodeSimpleVO.getLanguage(), scriptData));
|
||||
}
|
||||
// 没有语言类型
|
||||
else {
|
||||
scriptItemContentList.add(StrUtil.format(NODE_ITEM_XML_PATTERN, nodeSimpleVO.getNodeId(),
|
||||
nodeSimpleVO.getName(), nodeSimpleVO.getType(), scriptData));
|
||||
}
|
||||
}
|
||||
nodeSimpleVO.setScript(scriptData);
|
||||
scriptItemContentList.add(RuleParsePluginUtil.toScriptXml(nodeSimpleVO));
|
||||
}
|
||||
|
||||
scriptAllContent = StrUtil.format(NODE_XML_PATTERN,
|
||||
CollUtil.join(scriptItemContentList, StrUtil.EMPTY));
|
||||
}
|
||||
scriptAllContent = StrUtil.format(NODE_XML_PATTERN,
|
||||
CollUtil.join(scriptItemContentList, StrUtil.EMPTY));
|
||||
}
|
||||
|
||||
return StrUtil.format(XML_PATTERN, scriptAllContent, chainAllContent);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new ZkException(e.getMessage());
|
||||
}
|
||||
}
|
||||
return StrUtil.format(XML_PATTERN, scriptAllContent, chainAllContent);
|
||||
} catch (Exception e) {
|
||||
throw new ZkException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public boolean hasScript() {
|
||||
// 没有配置scriptPath
|
||||
if (StrUtil.isBlank(zkParserVO.getScriptPath())) {
|
||||
return false;
|
||||
}
|
||||
public boolean hasScript() {
|
||||
// 没有配置scriptPath
|
||||
if (StrUtil.isBlank(zkParserVO.getScriptPath())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
// 配置了,但是不存在这个节点
|
||||
if (client.checkExists().forPath(zkParserVO.getScriptPath()) == null) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
// 配置了,但是不存在这个节点
|
||||
if (client.checkExists().forPath(zkParserVO.getScriptPath()) == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 存在这个节点,但是子节点不存在
|
||||
List<String> chainNameList = client.getChildren().forPath(zkParserVO.getScriptPath());
|
||||
return !CollUtil.isEmpty(chainNameList);
|
||||
}
|
||||
catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
// 存在这个节点,但是子节点不存在
|
||||
List<String> chainNameList = client.getChildren().forPath(zkParserVO.getScriptPath());
|
||||
return !CollUtil.isEmpty(chainNameList);
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 监听 zk 节点
|
||||
*/
|
||||
public void listenZkNode() {
|
||||
// 监听chain
|
||||
CuratorCache cache1 = CuratorCache.build(client, zkParserVO.getChainPath());
|
||||
cache1.start();
|
||||
cache1.listenable().addListener((type, oldData, data) -> {
|
||||
String path = data.getPath();
|
||||
String value = new String(data.getData());
|
||||
if (StrUtil.isBlank(value)) {
|
||||
return;
|
||||
}
|
||||
if (ListUtil.toList(CuratorCacheListener.Type.NODE_CREATED, CuratorCacheListener.Type.NODE_CHANGED)
|
||||
.contains(type)) {
|
||||
LOG.info("starting reload flow config... {} path={} value={},", type.name(), path, value);
|
||||
String chainName = FileNameUtil.getName(path);
|
||||
LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(value).build();
|
||||
}
|
||||
else if (CuratorCacheListener.Type.NODE_DELETED.equals(type)) {
|
||||
LOG.info("starting reload flow config... delete path={}", path);
|
||||
String chainName = FileNameUtil.getName(path);
|
||||
FlowBus.removeChain(chainName);
|
||||
}
|
||||
});
|
||||
/**
|
||||
* 监听 zk 节点
|
||||
*/
|
||||
public void listenZkNode() {
|
||||
// 监听chain
|
||||
CuratorCache cache1 = CuratorCache.build(client, zkParserVO.getChainPath());
|
||||
cache1.start();
|
||||
cache1.listenable().addListener((type, oldData, data) -> {
|
||||
String path = data.getPath();
|
||||
String value = new String(data.getData());
|
||||
if (StrUtil.isBlank(value)) {
|
||||
return;
|
||||
}
|
||||
if (ListUtil.toList(CuratorCacheListener.Type.NODE_CREATED, CuratorCacheListener.Type.NODE_CHANGED)
|
||||
.contains(type)) {
|
||||
LOG.info("starting reload flow config... {} path={} value={},", type.name(), path, value);
|
||||
String chainName = FileNameUtil.getName(path);
|
||||
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(chainName);
|
||||
String id = pair.getValue();
|
||||
// 如果是启用,就正常更新
|
||||
if (pair.getKey()) {
|
||||
LiteFlowChainELBuilder.createChain().setChainId(id).setEL(value).build();
|
||||
}
|
||||
// 如果是禁用,就删除
|
||||
else {
|
||||
FlowBus.removeChain(id);
|
||||
}
|
||||
} else if (CuratorCacheListener.Type.NODE_DELETED.equals(type)) {
|
||||
LOG.info("starting reload flow config... delete path={}", path);
|
||||
String chainName = FileNameUtil.getName(path);
|
||||
Pair<Boolean/*启停*/, String/*id*/> pair = RuleParsePluginUtil.parseIdKey(chainName);
|
||||
FlowBus.removeChain(pair.getValue());
|
||||
}
|
||||
});
|
||||
|
||||
if (StrUtil.isNotBlank(zkParserVO.getScriptPath())) {
|
||||
// 监听script
|
||||
CuratorCache cache2 = CuratorCache.build(client, zkParserVO.getScriptPath());
|
||||
cache2.start();
|
||||
cache2.listenable().addListener((type, oldData, data) -> {
|
||||
String path = data.getPath();
|
||||
String value = new String(data.getData());
|
||||
if (StrUtil.isBlank(value)) {
|
||||
return;
|
||||
}
|
||||
if (ListUtil.toList(CuratorCacheListener.Type.NODE_CREATED, CuratorCacheListener.Type.NODE_CHANGED)
|
||||
.contains(type)) {
|
||||
LOG.info("starting reload flow config... {} path={} value={},", type.name(), path, value);
|
||||
String scriptNodeValue = FileNameUtil.getName(path);
|
||||
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue);
|
||||
// 有语言类型
|
||||
if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) {
|
||||
LiteFlowNodeBuilder.createScriptNode()
|
||||
.setId(nodeSimpleVO.getNodeId())
|
||||
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
|
||||
.setName(nodeSimpleVO.getName())
|
||||
.setScript(value)
|
||||
.setLanguage(nodeSimpleVO.getLanguage())
|
||||
.build();
|
||||
}
|
||||
// 没有语言类型
|
||||
else {
|
||||
LiteFlowNodeBuilder.createScriptNode()
|
||||
.setId(nodeSimpleVO.getNodeId())
|
||||
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
|
||||
.setName(nodeSimpleVO.getName())
|
||||
.setScript(value)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
else if (CuratorCacheListener.Type.NODE_DELETED.equals(type)) {
|
||||
LOG.info("starting reload flow config... delete path={}", path);
|
||||
String scriptNodeValue = FileNameUtil.getName(path);
|
||||
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue);
|
||||
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
if (StrUtil.isNotBlank(zkParserVO.getScriptPath())) {
|
||||
// 监听script
|
||||
CuratorCache cache2 = CuratorCache.build(client, zkParserVO.getScriptPath());
|
||||
cache2.start();
|
||||
cache2.listenable().addListener((type, oldData, data) -> {
|
||||
String path = data.getPath();
|
||||
String value = new String(data.getData());
|
||||
if (StrUtil.isBlank(value)) {
|
||||
return;
|
||||
}
|
||||
if (ListUtil.toList(CuratorCacheListener.Type.NODE_CREATED, CuratorCacheListener.Type.NODE_CHANGED)
|
||||
.contains(type)) {
|
||||
LOG.info("starting reload flow config... {} path={} value={},", type.name(), path, value);
|
||||
String scriptNodeValue = FileNameUtil.getName(path);
|
||||
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue);
|
||||
|
||||
// 启用就正常更新
|
||||
if (nodeSimpleVO.getEnable()) {
|
||||
LiteFlowNodeBuilder.createScriptNode()
|
||||
.setId(nodeSimpleVO.getNodeId())
|
||||
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
|
||||
.setName(nodeSimpleVO.getName())
|
||||
.setScript(nodeSimpleVO.getScript())
|
||||
.setLanguage(nodeSimpleVO.getLanguage())
|
||||
.build();
|
||||
}
|
||||
// 禁用就删除
|
||||
else {
|
||||
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
|
||||
}
|
||||
} else if (CuratorCacheListener.Type.NODE_DELETED.equals(type)) {
|
||||
LOG.info("starting reload flow config... delete path={}", path);
|
||||
String scriptNodeValue = FileNameUtil.getName(path);
|
||||
NodeConvertHelper.NodeSimpleVO nodeSimpleVO = NodeConvertHelper.convert(scriptNodeValue);
|
||||
FlowBus.getNodeMap().remove(nodeSimpleVO.getNodeId());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package com.yomahub.liteflow.test.zookeeper;
|
||||
|
||||
import com.yomahub.liteflow.core.FlowExecutor;
|
||||
import com.yomahub.liteflow.exception.ChainNotFoundException;
|
||||
import com.yomahub.liteflow.flow.FlowBus;
|
||||
import com.yomahub.liteflow.flow.LiteflowResponse;
|
||||
import com.yomahub.liteflow.slot.DefaultContext;
|
||||
import com.yomahub.liteflow.test.BaseTest;
|
||||
|
@ -66,6 +68,10 @@ public class ZkNodeWithXmlELSpringbootTest extends BaseTest {
|
|||
zkClient.createPersistent(chain2Path, true);
|
||||
zkClient.writeData(chain2Path, "THEN(a, b, c, s3);");
|
||||
|
||||
String chain3Path = ZK_CHAIN_PATH + "/chain3:false";
|
||||
zkClient.createPersistent(chain3Path, true);
|
||||
zkClient.writeData(chain3Path, "THEN(a, b, c, s3);");
|
||||
|
||||
String script1Path = ZK_SCRIPT_PATH + "/s1:script:脚本s1:groovy";
|
||||
zkClient.createPersistent(script1Path, true);
|
||||
zkClient.writeData(script1Path, "defaultContext.setData(\"test\",\"hello\");");
|
||||
|
@ -77,6 +83,10 @@ public class ZkNodeWithXmlELSpringbootTest extends BaseTest {
|
|||
String script3Path = ZK_SCRIPT_PATH + "/s3:script:脚本s3";
|
||||
zkClient.createPersistent(script3Path, true);
|
||||
zkClient.writeData(script3Path, "defaultContext.setData(\"test\",\"hello\");");
|
||||
|
||||
String script4Path = ZK_SCRIPT_PATH + "/s4:script:脚本s3:groovy:false";
|
||||
zkClient.createPersistent(script4Path, true);
|
||||
zkClient.writeData(script4Path, "defaultContext.setData(\"test\",\"hello\");");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -94,6 +104,14 @@ public class ZkNodeWithXmlELSpringbootTest extends BaseTest {
|
|||
DefaultContext context = response.getFirstContextBean();
|
||||
Assertions.assertTrue(response.isSuccess());
|
||||
Assertions.assertEquals("hello", context.getData("test"));
|
||||
|
||||
// 测试 chain 停用
|
||||
Assertions.assertThrows(ChainNotFoundException.class, () -> {
|
||||
throw flowExecutor.execute2Resp("chain3", "arg").getCause();
|
||||
});
|
||||
|
||||
// 测试 script 停用
|
||||
Assertions.assertTrue(!FlowBus.getNodeMap().containsKey("s4"));
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
|
|
Loading…
Reference in New Issue