pcm-participant/client/initialize/aiCluster.go

150 lines
5.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package initialize
import (
"fmt"
"github.com/go-resty/resty/v2"
"gitlink.org.cn/JointCloud/pcm-participant-ai/platform"
"gitlink.org.cn/JointCloud/pcm-participant-ai/service"
"gitlink.org.cn/JointCloud/pcm-participant-client/common/types"
"gitlink.org.cn/JointCloud/pcm-participant-client/common/utils"
"gitlink.org.cn/JointCloud/pcm-participant-client/config"
octopus "gitlink.org.cn/JointCloud/pcm-participant-octopus"
"strconv"
openI "gitlink.org.cn/JointCloud/pcm-participant-openi"
"gitlink.org.cn/JointCloud/pcm-participant-openi/common"
"go.uber.org/zap"
)
// GetAllAIClusterInfos 获取所有集群信息
func GetAllAIClusterInfos() map[string]types.ClusterInfo {
result := make(map[string]types.ClusterInfo)
clusterInfos.Range(func(key, value interface{}) bool {
result[key.(string)] = value.(types.ClusterInfo)
return true
})
return result
}
func InitAICluster(cfg *config.Server) (*service.Service, error) {
client := utils.InitClient(cfg.PcmCore.CoordinatorHost, "")
return initAISvcs(client, cfg.PcmCore)
}
// 初始化智算集群连接池
func initAISvcs(client *utils.RestyClient, core config.PcmCore) (*service.Service, error) {
resp := types.ResultResp{}
_, err := client.Request(core.CoordinatorHost+core.AIClusterList, "GET", func(req *resty.Request) {
req.SetResult(&resp)
})
if err != nil {
return nil, fmt.Errorf("获取集群列表失败: %w", err)
}
if resp.Code != 200 {
return nil, fmt.Errorf("API返回错误: %d, 消息: %s", resp.Code, resp.Msg)
}
var platforms []platform.IPlatform
var svc *service.Service
//var octopus octopus.Octopus
for _, cluster := range resp.Data.List {
//理论上集群的认证以及地址等详细信息应该在系统界面中来进行维护服务启动时需要拿到C端数据库中的集群ID
//在服务启动时根据配置文件中的ID列表 查询C端数据库中的集群信息
//拿到集群列表之后遍历集群
/*判断集群的状态,
如果该集群的状态为离线:
修改集群的server地址为本服务的地址修改状态为在线刷新修改时间
如果该集群的状态为在线且集群的P端地址信息不是本服务的地址
跳过代理该集群服务的初始化,提示该集群已经被其他服务代理
如果该集群的状态为在线且集群的P端地址信息为本服务的地址
跳过代理该集群服务的初始化,更新集群信息的修改时间
*/
if cluster.Status == "offline" {
// 修改集群的server地址为本服务的地址修改状态为在线
cluster.Server = core.ParticipantHost
cluster.Status = "online"
updateClusterReq := types.ClusterCreateReq{
Id: cluster.Id,
AdapterId: strconv.FormatInt(cluster.AdapterId, 10),
Name: cluster.Name,
Nickname: cluster.Nickname,
Description: cluster.Description,
Server: cluster.Server,
MonitorServer: cluster.MonitorServer,
Username: cluster.Username,
Password: cluster.Password,
Token: cluster.Token,
Ak: cluster.Ak,
Sk: cluster.Sk,
RegionName: cluster.Region,
ProjectId: cluster.ProjectId,
Version: cluster.Version,
Label: cluster.Label,
OwnerId: cluster.OwnerId,
AuthType: cluster.AuthType,
ProducerDict: cluster.ProducerDict,
RegionDict: cluster.RegionDict,
Status: cluster.Status,
}
updateResp := types.ResultResp{}
_, err := client.Request(core.CoordinatorHost+"/pcm/v1/adapter/cluster/update", "PUT", func(req *resty.Request) {
req.SetBody(updateClusterReq).SetResult(&updateResp) // 添加请求体
})
if err != nil {
zap.L().Error("更新集群状态失败", zap.Error(err))
continue
}
if updateResp.Code != 200 {
zap.L().Error("更新集群状态API返回错误",
zap.Int("code", updateResp.Code),
zap.String("message", updateResp.Msg))
continue
}
} else if cluster.Status == "online" {
if cluster.Server != core.ParticipantHost {
zap.L().Warn("集群已被其他服务代理",
zap.String("cluster_id", cluster.Id),
zap.String("当前服务地址", core.ParticipantHost),
zap.String("集群记录地址", cluster.Server))
continue
} else {
// 更新集群信息的修改时间
_, err := client.Request(core.CoordinatorHost+"/pcm/v1/adapter/cluster/update", "PUT", func(req *resty.Request) {
req.SetBody(cluster)
})
if err != nil {
zap.L().Error("刷新集群时间失败", zap.Error(err))
}
}
}
if cluster.Id == "" {
zap.L().Warn("跳过无效集群条目: 缺少集群ID")
continue
}
clusterId, _ := strconv.ParseInt(cluster.Id, 10, 64)
switch cluster.Label {
case "openI":
oi, _ := openI.New(cluster.Username, cluster.Password, cluster.Token, platform.Id(clusterId), "data")
platforms = append(platforms, oi)
//更新C端集群状态
case "octopus":
oct, _ := octopus.New(cluster.Address, cluster.Username, cluster.Password, platform.Id(clusterId))
platforms = append(platforms, oct)
}
common.InitClient()
}
svc, _ = service.NewService(platforms...)
return svc, nil
}