Compare commits

...

9 Commits

Author SHA1 Message Date
zhangwei 594dec4538 Merge branch 'master' of https://gitlink.org.cn/JointCloud/pcm-coordinator 2025-07-30 14:40:47 +08:00
zhangwei c98ab4a888 容器接口更新 2025-07-30 14:40:32 +08:00
jagger 855d02da5c refactor UpdateHpcTaskStatus; streamline task synchronization logic, improve error handling, and enhance status reporting
Signed-off-by: jagger <cossjie@foxmail.com>
2025-07-30 11:44:05 +08:00
jagger 80f07495d5 refactor UpdateHpcTaskStatus; streamline task synchronization logic, improve error handling, and enhance status reporting
Signed-off-by: jagger <cossjie@foxmail.com>
2025-07-30 11:24:40 +08:00
jagger 3185981438 refactor API endpoints; remove projectId from createDataSet and createAlgorithm paths, add new endpoints for model and container operations
Signed-off-by: jagger <cossjie@foxmail.com>
2025-07-30 09:35:56 +08:00
jagger b574a80516 add GetClusterBaseInfo handler and logic; implement API for retrieving cluster base information
Signed-off-by: jagger <cossjie@foxmail.com>
2025-07-29 18:20:26 +08:00
jagger a7f90f139a add optional UserId field to resource specifications; enhance user tracking in resource updates
Signed-off-by: jagger <cossjie@foxmail.com>
2025-07-25 20:25:02 +08:00
jagger d672a1e0b8 add UserId field to resource spec requests and handlers; enhance user tracking in resource synchronization
Signed-off-by: jagger <cossjie@foxmail.com>
2025-07-25 19:35:14 +08:00
zhangweiii e702c27355 Merge pull request '容器相关接口' (#528) from zhangweiii/pcm-coordinator:master into master 2025-07-24 15:48:32 +08:00
22 changed files with 473 additions and 101 deletions

View File

@ -947,6 +947,16 @@ type (
ProxyEnable string `json:"proxyEnable,omitempty" db:"proxy_enable"`
Driver string `json:"driver,omitempty" db:"driver"`
}
ClusterBaseInfo {
Id string `json:"id,omitempty" db:"id"`
AdapterId int64 `json:"adapterId,omitempty,string" db:"adapter_id"`
Name string `json:"name,omitempty" db:"name"`
Nickname string `json:"nickname,omitempty" db:"nickname"`
Description string `json:"description,omitempty" db:"description"`
Server string `json:"server,omitempty" db:"server"`
Driver string `json:"driver,omitempty" db:"driver"`
}
)
type ClusterDelReq {
@ -1416,6 +1426,7 @@ type ResourceSpecReq {
type FetchResourceSpecReq {
ClusterId string `form:"clusterId,optional"`
Tag string `form:"tag,optional"`
UserId int64 `form:"userId,optional"`
}
type IdReq {
@ -1479,8 +1490,10 @@ type EditResourceReq {
CpuUnit string `json:"cpuUnit,optional"`
MemoryValue string `json:"memoryValue,optional"`
MemoryUnit string `json:"memoryUnit,optional"`
UserId int64 `json:"userId,optional"`
}
type SyncResourceReq {
Id string `json:"id"`
UserId int64 `json:"userId,optional"`
}

View File

@ -338,7 +338,7 @@ service pcm {
@doc "创建数据集"
@handler CreateDataSetHandler
post /ai/createDataSet/:projectId (CreateDataSetReq) returns (CreateDataSetResp)
post /ai/createDataSet (CreateDataSetReq) returns (CreateDataSetResp)
@doc "删除数据集"
@handler DeleteDataSetHandler
@ -362,7 +362,7 @@ service pcm {
@doc "创建算法"
@handler CreateAlgorithmHandler
post /ai/CreateAlgorithm/:projectId (CreateAlgorithmReq) returns (CreateAlgorithmResp)
post /ai/createAlgorithm (CreateAlgorithmReq) returns (CreateAlgorithmResp)
@doc "查询创建算法列表"
@handler ListAlgorithms
@ -948,6 +948,9 @@ service pcm {
@handler GetAdapterInfoHandler
get /adapter/getAdapterInfo (adapterInfoNameReq) returns (adapterInfoNameReqResp)
@handler GetClusterBaseInfoHandler
get /adapter/cluster/getClusterBaseInfo (ClusterReq) returns (PageResult)
}
@server (

View File

@ -0,0 +1,24 @@
package adapters
import (
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/adapters"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"net/http"
)
// GetClusterBaseInfoHandler builds the HTTP handler for the
// "get cluster base info" endpoint. It binds the incoming request into
// a types.ClusterReq, delegates the query to GetClusterBaseInfoLogic,
// and writes the result (or error) in the standard response envelope.
func GetClusterBaseInfoHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		var req types.ClusterReq
		// Reject malformed query/body parameters before touching the DB.
		if err := httpx.Parse(r, &req); err != nil {
			result.ParamErrorResult(r, w, err)
			return
		}

		logic := adapters.NewGetClusterBaseInfoLogic(r.Context(), svcCtx)
		resp, err := logic.GetClusterBaseInfo(&req)
		result.HttpResult(r, w, resp, err)
	}
}

View File

@ -2,6 +2,7 @@ package cloud
import (
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/cloud"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"io"
"k8s.io/apimachinery/pkg/util/json"
@ -24,7 +25,11 @@ func ContainerCreateHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
result.ParamErrorResult(r, w, err)
return
}
// 获取用户信息
userStr := r.Header.Get("User")
user := &models.JccUserInfo{}
json.Unmarshal([]byte(userStr), user)
req.UserId = user.Id
l := cloud.NewContainerCreateLogic(r.Context(), svcCtx)
resp, err := l.ContainerCreate(&req)
result.HttpResult(r, w, resp, err)

View File

@ -6,6 +6,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"net/http"
)
@ -16,7 +17,14 @@ func CompareResourceSpecHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
result.ParamErrorResult(r, w, err)
return
}
token := r.Header.Get("Authorization")
// 获取用户信息
jccUserInfo, err := utils.ParseTokenWithoutVerify(token)
if err != nil {
result.ParamErrorResult(r, w, err)
return
}
req.UserId = jccUserInfo.Id
l := core.NewCompareResourceSpecLogic(r.Context(), svcCtx)
resp, err := l.CompareResourceSpec(&req)
result.HttpResult(r, w, resp, err)

View File

@ -6,6 +6,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"net/http"
)
@ -17,6 +18,15 @@ func EditResourceSpecHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return
}
token := r.Header.Get("Authorization")
// 获取用户信息
jccUserInfo, err := utils.ParseTokenWithoutVerify(token)
if err != nil {
result.ParamErrorResult(r, w, err)
return
}
req.UserId = jccUserInfo.Id
l := core.NewEditResourceSpecLogic(r.Context(), svcCtx)
resp, err := l.EditResourceSpec(&req)
result.HttpResult(r, w, resp, err)

View File

@ -6,6 +6,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"net/http"
)
@ -17,6 +18,14 @@ func SyncResourceSpecHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return
}
token := r.Header.Get("Authorization")
// 获取用户信息
jccUserInfo, err := utils.ParseTokenWithoutVerify(token)
if err != nil {
result.ParamErrorResult(r, w, err)
return
}
req.UserId = jccUserInfo.Id
l := core.NewSyncResourceSpecLogic(r.Context(), svcCtx)
resp, err := l.SyncResourceSpec(&req)
result.HttpResult(r, w, resp, err)

View File

@ -39,6 +39,11 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
Path: "/adapter/cluster/get",
Handler: adapters.GetClusterHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/adapter/cluster/getClusterBaseInfo",
Handler: adapters.GetClusterBaseInfoHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/adapter/cluster/list",

View File

@ -0,0 +1,84 @@
package adapters
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
// GetClusterBaseInfoLogic carries the per-request context and shared
// service dependencies used to query cluster base information.
type GetClusterBaseInfoLogic struct {
	logx.Logger
	ctx    context.Context     // request-scoped context
	svcCtx *svc.ServiceContext // shared dependencies (DB engine, etc.)
}
// NewGetClusterBaseInfoLogic wires a GetClusterBaseInfoLogic to the
// given request context and service context.
func NewGetClusterBaseInfoLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetClusterBaseInfoLogic {
	logic := &GetClusterBaseInfoLogic{
		Logger: logx.WithContext(ctx),
		ctx:    ctx,
		svcCtx: svcCtx,
	}
	return logic
}
// GetClusterBaseInfo returns a paginated list of cluster base
// information, optionally filtered by the cluster/adapter attributes
// carried in req (name, nickname, label, version, producer, region,
// adapter type, resource type, storage schedule).
//
// It returns a PageResult whose List holds []types.ClusterBaseInfo and
// whose Total is the unpaginated match count.
func (l *GetClusterBaseInfoLogic) GetClusterBaseInfo(req *types.ClusterReq) (resp *types.PageResult, err error) {
	resp = &types.PageResult{}

	// Normalize paging input so LIMIT/OFFSET are always valid: a
	// non-positive PageNum would otherwise yield a negative offset, and
	// a non-positive PageSize would yield LIMIT 0 (no rows at all).
	pageSize := req.PageSize
	if pageSize <= 0 {
		pageSize = 10
	}
	pageNum := req.PageNum
	if pageNum <= 0 {
		pageNum = 1
	}
	offset := pageSize * (pageNum - 1)

	var list []types.ClusterBaseInfo

	// Base query: live clusters joined to their adapter so adapter-side
	// filters (type / resource_type) can be applied.
	db := l.svcCtx.DbEngin.Model(&types.AdapterInfo{}).Table("t_cluster")
	db = db.Joins("left join t_adapter on t_adapter.id = t_cluster.adapter_id").
		Where("t_cluster.deleted_at is null")

	// Optional filters; each one is applied only when the caller set it.
	if req.Name != "" {
		db = db.Where("t_cluster.name LIKE ?", "%"+req.Name+"%")
	}
	if req.AdapterId != "" {
		db = db.Where("t_cluster.adapter_id = ?", req.AdapterId)
	}
	if req.Nickname != "" {
		db = db.Where("t_cluster.nickname LIKE ?", "%"+req.Nickname+"%")
	}
	if req.Label != "" {
		db = db.Where("t_cluster.label = ?", req.Label)
	}
	if req.Version != "" {
		db = db.Where("t_cluster.version = ?", req.Version)
	}
	if req.ProducerDict != "" {
		db = db.Where("t_cluster.producer_dict = ?", req.ProducerDict)
	}
	if req.RegionDict != "" {
		db = db.Where("t_cluster.region_dict = ?", req.RegionDict)
	}
	if req.Type != "" {
		db = db.Where("t_adapter.type = ?", req.Type)
	}
	if req.ResourceType != "" {
		db = db.Where("t_adapter.resource_type = ?", req.ResourceType)
	}
	if req.StorageSchedule != "" {
		db = db.Where("t_cluster.storage_schedule = ?", req.StorageSchedule)
	}

	// Count the total matching rows before applying pagination.
	// NOTE(review): with the LEFT JOIN, a cluster matched by multiple
	// t_adapter rows would be counted more than once — confirm that
	// t_adapter.id is unique.
	var total int64
	if err = db.Count(&total).Error; err != nil {
		return resp, err
	}

	// Fetch the requested page, newest clusters first.
	err = db.Limit(pageSize).Offset(offset).
		Select("t_cluster.*").
		Order("t_cluster.create_time desc").
		Scan(&list).Error
	if err != nil {
		return resp, err
	}

	resp.List = list
	resp.PageSize = pageSize
	resp.PageNum = pageNum
	resp.Total = total
	return resp, nil
}

View File

@ -37,11 +37,11 @@ func (l *UpdateClusterLogic) UpdateCluster(req *types.ClusterCreateReq) (resp *t
}
utils.Convert(req, &cluster)
// 获取集群经纬度
location, err := GeoMap(req.RegionName)
if err != nil {
return nil, err
}
cluster.Location = location
//location, err := GeoMap(req.RegionName)
//if err != nil {
// return nil, err
//}
//cluster.Location = location
l.svcCtx.DbEngin.Table("t_cluster").Model(&cluster).Updates(&cluster)
// 更新资源价格表
clusterId, err := strconv.ParseInt(req.Id, 10, 64)

View File

@ -60,7 +60,7 @@ func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) (r
logx.Info("commit success")
}
}()
adapterId, _ := strconv.ParseUint(req.AdapterIds[0], 10, 64)
adapterId, _ := strconv.ParseInt(req.AdapterIds[0], 10, 64)
var clusters []*models.CloudModel
err = tx.Raw("SELECT * FROM `t_cluster` where adapter_id in ? and id in ?", req.AdapterIds, req.ClusterIds).Scan(&clusters).Error
if err != nil {
@ -111,11 +111,11 @@ func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) (r
for _, s := range req.ReqBody {
sStruct := UnMarshalK8sStruct(s, int64(r.Replica))
unString, _ := sStruct.MarshalJSON()
taskCloud.Id = utils.GenSnowflakeIDUint()
taskCloud.Id = utils.GenSnowflakeID()
taskCloud.Name = sStruct.GetName() + "-" + sStruct.GetKind()
taskCloud.TaskId = uint(taskModel.Id)
taskCloud.TaskId = taskModel.Id
clusterId, _ := strconv.ParseUint(r.ClusterId, 10, 64)
taskCloud.AdapterId = uint(adapterId)
taskCloud.AdapterId = adapterId
taskCloud.AdapterName = adapterName
taskCloud.UserId = req.UserId
taskCloud.ClusterId = uint(clusterId)

View File

@ -21,7 +21,12 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/participant/cloud"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
container "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types/cloud"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
cloud2 "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"net/http"
"time"
)
type ContainerCreateLogic struct {
@ -39,8 +44,10 @@ func NewContainerCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) *C
}
func (l *ContainerCreateLogic) ContainerCreate(req *container.CreateParam) (resp interface{}, err error) {
param := &cloud.CreateParam{
Name: req.Name,
Description: req.Description,
Port: req.Port,
Cpu: req.Cpu,
Memory: req.Memory,
@ -59,6 +66,38 @@ func (l *ContainerCreateLogic) ContainerCreate(req *container.CreateParam) (resp
if create.Code != http.StatusOK {
return nil, errors.New(create.Message)
}
resp = create.Data
// 构建主任务结构体
taskModel := models.Task{
Id: utils.GenSnowflakeID(),
Status: constants.Saved,
Description: req.Description,
Name: req.Name,
UserId: req.UserId,
AdapterTypeDict: "0",
CommitTime: time.Now(),
}
// 保存任务数据到数据库
tx := l.svcCtx.DbEngin.Create(&taskModel)
if tx.Error != nil {
}
var adapterId int64
tx.Table("t_cluster").Select("adapter_id").Where("id=?", req.ClusterId).Find(&adapterId)
// 构建cloud任务结构体
cloudTaskModel := cloud2.TaskCloudModel{
Id: utils.GenSnowflakeID(),
TaskId: taskModel.Id,
Name: req.Name,
AdapterId: adapterId,
Status: constants.Saved,
Namespace: "default",
UserId: req.UserId,
}
// 保存任务数据到数据库
tx = l.svcCtx.DbEngin.Create(&cloudTaskModel)
if tx.Error != nil {
}
resp = taskModel.Id
return
}

View File

@ -79,7 +79,7 @@ func (l *CompareResourceSpecLogic) CompareResourceSpec(req *types.FetchResourceS
}
// 同步资源到数据库
if err := l.syncResourcesToDB(apiResources); err != nil {
if err := l.syncResourcesToDB(apiResources, req.UserId); err != nil {
return nil, fmt.Errorf("failed to sync resources: %w", err)
}
@ -135,10 +135,10 @@ func decodeAPIResponse(input interface{}, output *[]APIResponse) error {
return nil
}
func (l *CompareResourceSpecLogic) syncResourcesToDB(apiResponses []APIResponse) error {
func (l *CompareResourceSpecLogic) syncResourcesToDB(apiResponses []APIResponse, userId int64) error {
for _, response := range apiResponses {
// 转换API响应到数据库模型
dbSpecs, apiSpecs, err := l.processAPIResponse(response)
dbSpecs, apiSpecs, err := l.processAPIResponse(response, userId)
if err != nil {
return err
}
@ -151,7 +151,7 @@ func (l *CompareResourceSpecLogic) syncResourcesToDB(apiResponses []APIResponse)
return nil
}
func (l *CompareResourceSpecLogic) processAPIResponse(response APIResponse) ([]models.TResourceSpec, []models.TResourceSpec, error) {
func (l *CompareResourceSpecLogic) processAPIResponse(response APIResponse, userId int64) ([]models.TResourceSpec, []models.TResourceSpec, error) {
ClusterId := utils.StringToInt64(response.ClusterId)
var dbSpecs []models.TResourceSpec
if err := l.svcCtx.DbEngin.Model(models.TResourceSpec{}).Preload("BaseResourceSpecs").
@ -167,7 +167,7 @@ func (l *CompareResourceSpecLogic) processAPIResponse(response APIResponse) ([]m
if res.Resource.Name == "" || res.Resource.Type == "" {
continue
}
spec := l.convertToResourceSpec(ClusterId, res, response.Tag)
spec := l.convertToResourceSpec(ClusterId, res, response.Tag, userId)
apiSpecs = append(apiSpecs, spec)
}
@ -333,7 +333,7 @@ func (l *CompareResourceSpecLogic) isSpecChanged(old, new models.TResourceSpec)
return len(oldBaseMap) > 0
}
func (l *CompareResourceSpecLogic) convertToResourceSpec(ClusterId int64, res Resource, tag string) models.TResourceSpec {
func (l *CompareResourceSpecLogic) convertToResourceSpec(ClusterId int64, res Resource, tag string, userId int64) models.TResourceSpec {
spec := models.TResourceSpec{
SourceKey: resourceKey(res.Resource.Type, res.Resource.Name, tag),
Type: res.Resource.Type,
@ -344,6 +344,7 @@ func (l *CompareResourceSpecLogic) convertToResourceSpec(ClusterId int64, res Re
ClusterId: ClusterId,
CreateTime: time.Now(),
UpdateTime: time.Now(),
UserId: userId,
ChangeType: ChangeTypeNormal,
}
@ -355,6 +356,7 @@ func (l *CompareResourceSpecLogic) convertToResourceSpec(ClusterId int64, res Re
TotalUnit: br.Total.Unit,
AvailableValue: br.Available.Value,
AvailableUnit: br.Available.Unit,
UserId: userId,
CreateTime: time.Now(),
UpdateTime: time.Now(),
})

View File

@ -58,7 +58,7 @@ func (l *EditResourceSpecLogic) EditResourceSpec(req *types.EditResourceReq) (re
costPerUnit := utils.StringToFloat64(req.CostPerUnit)
// 4. 更新主资源规格
if err = updateMainResourceSpec(tx, req.Id, statusInt, req.CostType, costPerUnit); err != nil {
if err = updateMainResourceSpec(tx, req.Id, statusInt, req.CostType, costPerUnit, req.UserId); err != nil {
return nil, err
}
@ -98,13 +98,14 @@ func validateRequestParams(req *types.EditResourceReq) error {
}
// updateMainResourceSpec 更新主资源规格
func updateMainResourceSpec(tx *gorm.DB, id int64, status int64, costType string, costPerUnit float64) error {
func updateMainResourceSpec(tx *gorm.DB, id int64, status int64, costType string, costPerUnit float64, userId int64) error {
return tx.Model(&models.TResourceSpec{}).
Where("id = ?", id).
Updates(map[string]interface{}{
"status": status,
"cost_type": costType,
"cost_per_unit": costPerUnit,
"user_id": userId,
}).
Error
}

View File

@ -51,7 +51,7 @@ func (l *SyncResourceSpecLogic) SyncResourceSpec(req *types.SyncResourceReq) (re
}
for _, response := range apiResources {
// 转换API响应到数据库模型
_, apiSpecs, err := compareLogic.processAPIResponse(response)
_, apiSpecs, err := compareLogic.processAPIResponse(response, req.UserId)
if err != nil {
return nil, err
}

View File

@ -13,6 +13,7 @@ type Resp struct {
type CreateParam struct {
ContainerGroupName string `json:"containerGroupName"`
Description string `json:"description,omitempty"`
Name string `json:"name"`
Image string `json:"image"`
Cpu string `json:"cpu,omitempty"`

View File

@ -59,8 +59,8 @@ func NewCloudScheduler(ctx context.Context, val string, scheduler *scheduler.Sch
func (as *CloudScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
c := cloud.TaskCloudModel{
AdapterId: uint(participantId),
TaskId: uint(task.TaskId),
AdapterId: (participantId),
TaskId: (task.TaskId),
Status: constants.Saved,
YamlString: as.yamlString,
}

View File

@ -1,6 +1,8 @@
package status
import (
"context"
"fmt"
jsoniter "github.com/json-iterator/go"
"github.com/rs/zerolog/log"
"github.com/zeromicro/go-zero/core/logx"
@ -10,8 +12,9 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"net/http"
"gorm.io/gorm"
"strconv"
"time"
)
func reportHpcStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, hpcTask *models.TaskHpc, status bool, message string) error {
@ -38,84 +41,226 @@ func reportHpcStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, hpc
}
// UpdateHpcTaskStatus 更新超算任务状态,并通知中间件
//func UpdateHpcTaskStatus(svc *svc.ServiceContext) {
// svc.Scheduler.HpcService.TaskSyncLock.Lock()
// defer svc.Scheduler.HpcService.TaskSyncLock.Unlock()
// taskHpcs := make([]*models.TaskHpc, 0)
// sqlStr := `SELECT *
// FROM task_hpc
// WHERE
// job_id != ''
// AND (
// status NOT IN ('Failed', 'Completed', 'Cancelled')
// OR start_time < created_time
// )
// ORDER BY created_time DESC
// LIMIT 10`
// db := svc.DbEngin.Raw(sqlStr).Scan(&taskHpcs)
// if db.Error != nil {
// logx.Errorf(db.Error.Error())
// return
// }
// for _, hpc := range taskHpcs {
// //更新task表的超算任务状态
// task := &types.TaskModel{}
// tx := svc.DbEngin.Model(models.Task{}).Where("id", hpc.TaskId).Scan(&task)
// if tx.Error != nil {
// logx.Errorf(tx.Error.Error())
// break
// }
// clusterId := utils.Int64ToString(hpc.ClusterId)
// h := http.Request{}
// hpcTask, err := svc.Scheduler.HpcService.HpcExecutorAdapterMap[strconv.FormatInt(hpc.AdapterId, 10)].GetTask(h.Context(), hpc.JobId, clusterId)
// if err != nil {
// logx.Errorf(err.Error())
// break
// }
// switch hpcTask.Status {
// case constants.Running:
// if hpc.Status != hpcTask.Status {
// svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "running", "任务运行中")
// hpc.Status = hpcTask.Status
// task.Status = hpcTask.Status
// }
// case constants.Failed:
// if hpc.Status != hpcTask.Status {
// svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "failed", "任务失败")
// hpc.Status = hpcTask.Status
// task.Status = hpcTask.Status
// logx.Infof("[%v]:任务执行失败,发送通知, 任务状态: [%v]", hpcTask, hpcTask.Status)
// _ = reportHpcStatusMessages(svc, task, hpc, false, "任务失败")
// }
// case constants.Completed:
// if hpc.Status != hpcTask.Status {
// svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "completed", "任务完成")
// hpc.Status = hpcTask.Status
// task.Status = hpcTask.Status
// logx.Infof("[%v]:任务执行完成,发送通知, 任务状态: [%v]", hpcTask, hpcTask.Status)
// _ = reportHpcStatusMessages(svc, task, hpc, true, "任务完成")
// }
// default:
// if hpc.Status != hpcTask.Status {
// svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "pending", "任务pending")
// hpc.Status = hpcTask.Status
// task.Status = hpcTask.Status
// }
// }
// //task.Id=hpcTask.
// task.StartTime = hpcTask.Start
// task.EndTime = hpcTask.End
// hpc.StartTime = hpcTask.Start
// hpc.EndTime = hpcTask.End
// logx.Info("# task 开始时间: %v, 结束时间: %v", task.StartTime, task.EndTime)
// err = svc.Scheduler.HpcStorages.UpdateTask(task)
// if err != nil {
// logx.Errorf(err.Error())
// break
// }
// err = svc.Scheduler.HpcStorages.UpdateHpcTask(hpc)
// if err != nil {
// logx.Errorf(err.Error())
// break
// }
// }
//}
// UpdateHpcTaskStatus HPC 任务状态同步函数
func UpdateHpcTaskStatus(svc *svc.ServiceContext) {
svc.Scheduler.HpcService.TaskSyncLock.Lock()
defer svc.Scheduler.HpcService.TaskSyncLock.Unlock()
taskList := make([]*models.TaskHpc, 0)
sqlStr := `SELECT *
FROM task_hpc
WHERE
job_id != ''
AND (
status NOT IN ('Failed', 'Completed', 'Cancelled')
OR start_time < created_time
)
ORDER BY created_time DESC
LIMIT 10`
db := svc.DbEngin.Raw(sqlStr).Scan(&taskList)
if db.Error != nil {
logx.Errorf(db.Error.Error())
// 1. 查询需要同步的 HPC 任务
var hpcTasks []*models.TaskHpc
sqlStr := `SELECT * FROM task_hpc WHERE job_id != '' AND status NOT IN ('Failed', 'Completed', 'Cancelled') ORDER BY created_time DESC LIMIT 10`
if err := svc.DbEngin.Raw(sqlStr).Scan(&hpcTasks).Error; err != nil {
logx.Errorf("Failed to query HPC tasks for sync: %v", err)
return
}
for _, hpc := range taskList {
//更新task表的超算任务状态
task := &types.TaskModel{}
tx := svc.DbEngin.Model(models.Task{}).Where("id", hpc.TaskId).Scan(&task)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
break
if len(hpcTasks) == 0 {
return
}
// 2. 批量获取关联的 Task 模型
taskIDs := make([]int64, len(hpcTasks))
for i, hpc := range hpcTasks {
taskIDs[i] = hpc.TaskId
}
taskMap := make(map[int64]*types.TaskModel)
var tasks []*types.TaskModel
if err := svc.DbEngin.Model(&models.Task{}).Where("id IN ?", taskIDs).Find(&tasks).Error; err != nil {
logx.Errorf("Failed to batch query tasks: %v", err)
return
}
for _, task := range tasks {
taskMap[task.Id] = task
}
// 3. 遍历 HPC 任务并更新状态
for _, hpc := range hpcTasks {
task, ok := taskMap[hpc.TaskId]
if !ok {
logx.Errorf("Task with ID %d not found for HPC task %d, skipping", hpc.TaskId, hpc.Id)
continue
}
clusterId := utils.Int64ToString(hpc.ClusterId)
h := http.Request{}
hpcTask, err := svc.Scheduler.HpcService.HpcExecutorAdapterMap[strconv.FormatInt(hpc.AdapterId, 10)].GetTask(h.Context(), hpc.JobId, clusterId)
// 使用带超时的 Context防止 API 调用阻塞
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
adapterIDStr := strconv.FormatInt(hpc.AdapterId, 10)
adapter, adapterExists := svc.Scheduler.HpcService.HpcExecutorAdapterMap[adapterIDStr]
if !adapterExists {
logx.Errorf("HPC adapter with ID %s not found, skipping task %s", adapterIDStr, hpc.Name)
continue
}
// 4. 从 HPC 集群获取最新状态
hpcTaskInfo, err := adapter.GetTask(ctx, hpc.JobId, utils.Int64ToString(hpc.ClusterId))
if err != nil {
logx.Errorf(err.Error())
break
logx.Errorf("Failed to get task status from HPC executor for job %s: %v", hpc.JobId, err)
continue // 继续处理下一个任务
}
switch hpcTask.Status {
case constants.Running:
if hpc.Status != hpcTask.Status {
svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "running", "任务运行中")
hpc.Status = hpcTask.Status
task.Status = hpcTask.Status
}
case constants.Failed:
if hpc.Status != hpcTask.Status {
svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "failed", "任务失败")
hpc.Status = hpcTask.Status
task.Status = hpcTask.Status
logx.Infof("[%v]:任务执行失败,发送通知, 任务状态: [%v]", hpcTask, hpcTask.Status)
_ = reportHpcStatusMessages(svc, task, hpc, false, "任务失败")
}
case constants.Completed:
if hpc.Status != hpcTask.Status {
svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "completed", "任务完成")
hpc.Status = hpcTask.Status
task.Status = hpcTask.Status
logx.Infof("[%v]:任务执行完成,发送通知, 任务状态: [%v]", hpcTask, hpcTask.Status)
_ = reportHpcStatusMessages(svc, task, hpc, true, "任务完成")
}
default:
if hpc.Status != hpcTask.Status {
svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "pending", "任务pending")
hpc.Status = hpcTask.Status
task.Status = hpcTask.Status
}
// 如果状态没有变化,则跳过
if hpc.Status == hpcTaskInfo.Status {
continue
}
task.StartTime = hpcTask.Start
task.EndTime = hpcTask.End
hpc.StartTime = hpcTask.Start
hpc.EndTime = hpcTask.End
logx.Info("# task 开始时间: %v, 结束时间: %v", task.StartTime, task.EndTime)
err = svc.Scheduler.HpcStorages.UpdateTask(task)
// 5. 准备更新
previousStatus := hpc.Status
hpc.Status = hpcTaskInfo.Status
hpc.StartTime = hpcTaskInfo.Start
hpc.EndTime = hpcTaskInfo.End
task.Status = hpcTaskInfo.Status
task.StartTime = hpcTaskInfo.Start
task.EndTime = hpcTaskInfo.End
logx.Infof("HPC task status change detected for job %s: %s -> %s", hpc.JobId, previousStatus, hpc.Status)
// 6. 在事务中更新数据库
err = svc.DbEngin.Transaction(func(tx *gorm.DB) error {
task.UpdatedTime = time.Now().Format(constants.Layout)
if err := tx.Table("task").Updates(task).Error; err != nil {
return fmt.Errorf("failed to update task table: %w", err)
}
if err := tx.Table("task_hpc").Updates(hpc).Error; err != nil {
return fmt.Errorf("failed to update hpc_task table: %w", err)
}
return nil
})
if err != nil {
logx.Errorf(err.Error())
break
logx.Errorf("Failed to update database in transaction for job %s: %v", hpc.JobId, err)
// 事务失败,回滚状态,继续处理下一个任务
hpc.Status = previousStatus
task.Status = previousStatus
continue
}
err = svc.Scheduler.HpcStorages.UpdateHpcTask(hpc)
if err != nil {
logx.Errorf(err.Error())
break
// 7. 根据新状态执行后续操作 (通知、报告等)
handleStatusChange(svc, task, hpc, hpcTaskInfo.Status)
}
}
// handleStatusChange 根据新状态执行后续操作
// handleStatusChange performs the follow-up actions for an HPC task
// whose status just changed: it pushes a user-facing notice for every
// recognized status, and for the terminal states (Failed / Completed)
// additionally reports the outcome via reportHpcStatusMessages.
// Unrecognized statuses are logged and otherwise ignored.
func handleStatusChange(svc *svc.ServiceContext, task *types.TaskModel, hpc *models.TaskHpc, newStatus string) {
	var (
		noticeType    string
		noticeMessage string
		reportSuccess bool
		shouldReport  bool
	)

	switch newStatus {
	case constants.Running:
		noticeType, noticeMessage = "running", "任务运行中"
	case constants.Failed:
		noticeType, noticeMessage = "failed", "任务失败"
		shouldReport, reportSuccess = true, false
	case constants.Completed:
		noticeType, noticeMessage = "completed", "任务完成"
		shouldReport, reportSuccess = true, true
	case constants.Pending:
		noticeType, noticeMessage = "pending", "任务pending"
	default:
		// Unknown status: log it and do nothing rather than emitting a
		// misleading notice.
		logx.Errorf("Unhandled HPC task status '%s' for job %s", newStatus, hpc.JobId)
		return
	}

	// Push the notice to the storage layer.
	adapterID := strconv.FormatInt(hpc.AdapterId, 10)
	clusterID := strconv.FormatInt(hpc.ClusterId, 10)
	svc.Scheduler.HpcStorages.AddNoticeInfo(adapterID, hpc.AdapterName, clusterID, hpc.ClusterName, hpc.Name, noticeType, noticeMessage)
	logx.Infof("[%s]: 任务状态变更为 [%s],发送通知。", hpc.Name, newStatus)

	// Report terminal outcomes to the middleware; a report failure is
	// logged but does not abort the caller's sync loop.
	if shouldReport {
		if err := reportHpcStatusMessages(svc, task, hpc, reportSuccess, noticeMessage); err != nil {
			logx.Errorf("Failed to report HPC status for job %s: %v", hpc.JobId, err)
		}
	}
}

View File

@ -162,8 +162,14 @@ func (m *ModelArtsLink) SubmitTask(ctx context.Context, imageId string, cmd stri
outputs := make([]*modelarts.OutputTraining, 0)
outputValue := ""
for _, env := range envs {
s := strings.Split(env, COMMA)
environments[s[0]] = s[1]
// 找到第一个逗号位置
idx := strings.Index(env, COMMA)
if idx == -1 {
continue
}
key := strings.TrimSpace(env[:idx])
value := strings.TrimSpace(env[idx+1:])
environments[key] = value
}
for _, param := range params {
s := strings.Split(param, COMMA)

View File

@ -12,6 +12,8 @@ type ContainerDeleteParameter interface {
type CreateParam struct {
ClusterId string `json:"clusterId,omitempty"`
ContainerGroupName string `json:"containerGroupName"`
Description string `json:"description,omitempty"`
UserId int64 `json:"userId"`
Name string `json:"name"`
Image string `json:"image"`
Cpu string `json:"cpu,omitempty"`
@ -61,6 +63,7 @@ type EciDeleteParam struct {
// 获取容器信息
type GetParam struct {
TaskId string `json:"taskId"`
ClusterId string `json:"clusterId,omitempty"`
Name string `json:"name,omitempty"`
GetParameter ContainerGetParameter `json:"getParameter,omitempty"`

View File

@ -700,6 +700,16 @@ type ClusterAvail struct {
ClusterName string `json:"clusterName"`
}
type ClusterBaseInfo struct {
Id string `json:"id,omitempty" db:"id"`
AdapterId int64 `json:"adapterId,omitempty,string" db:"adapter_id"`
Name string `json:"name,omitempty" db:"name"`
Nickname string `json:"nickname,omitempty" db:"nickname"`
Description string `json:"description,omitempty" db:"description"`
Server string `json:"server,omitempty" db:"server"`
Driver string `json:"driver,omitempty" db:"driver"`
}
type ClusterCreateReq struct {
Id string `json:"id,optional"`
AdapterId string `json:"adapterId,optional"`
@ -2145,6 +2155,8 @@ type EditResourceReq struct {
CpuUnit string `json:"cpuUnit,optional"`
MemoryValue string `json:"memoryValue,optional"`
MemoryUnit string `json:"memoryUnit,optional"`
UserId int64 `json:"userId,optional"`
}
type EndpointsReq struct {
@ -2283,6 +2295,7 @@ type Fault struct {
type FetchResourceSpecReq struct {
ClusterId string `form:"clusterId,optional"`
Tag string `form:"tag,optional"`
UserId int64 `form:"userId,optional"`
}
type Fields struct {
@ -5566,7 +5579,8 @@ type SyncClusterAlertReq struct {
}
type SyncResourceReq struct {
Id string `json:"id"`
Id string `json:"id"`
UserId int64 `json:"userId,optional"`
}
type Tags struct {

View File

@ -6,10 +6,10 @@ import (
)
type TaskCloudModel struct {
Id uint `json:"id" gorm:"primarykey;not null;comment:id"`
Id int64 `json:"id" gorm:"primarykey;not null;comment:id"`
Name string `json:"name" gorm:"null;comment:名称"`
TaskId uint `json:"taskId" gorm:"not null;comment:task表id"`
AdapterId uint `json:"adapterId" gorm:"not null;comment:适配器id"`
TaskId int64 `json:"taskId" gorm:"not null;comment:task表id"`
AdapterId int64 `json:"adapterId" gorm:"not null;comment:适配器id"`
AdapterName string `json:"adapterName" gorm:"not null;comment:适配器名称"`
ClusterId uint `json:"clusterId" gorm:"not null;comment:集群id"`
ClusterName string `json:"clusterName" gorm:"not null;comment:集群名称"`