From d672a1e0b86210bb3c9bcd9c60f38939f7a6ee13 Mon Sep 17 00:00:00 2001 From: jagger Date: Fri, 25 Jul 2025 19:35:14 +0800 Subject: [PATCH 1/6] add UserId field to resource spec requests and handlers; enhance user tracking in resource synchronization Signed-off-by: jagger --- desc/core/pcm-core.api | 2 ++ .../handler/core/compareresourcespechandler.go | 10 +++++++++- internal/handler/core/syncresourcespechandler.go | 9 +++++++++ internal/logic/core/compareresourcespeclogic.go | 14 ++++++++------ internal/logic/core/syncresourcespeclogic.go | 2 +- internal/storeLink/modelarts.go | 10 ++++++++-- internal/types/types.go | 4 +++- 7 files changed, 40 insertions(+), 11 deletions(-) diff --git a/desc/core/pcm-core.api b/desc/core/pcm-core.api index 644221397..e826fdbb5 100644 --- a/desc/core/pcm-core.api +++ b/desc/core/pcm-core.api @@ -1416,6 +1416,7 @@ type ResourceSpecReq { type FetchResourceSpecReq { ClusterId string `form:"clusterId,optional"` Tag string `form:"tag,optional"` + UserId int64 `form:"userId,optional"` } type IdReq { @@ -1483,4 +1484,5 @@ type EditResourceReq { type SyncResourceReq { Id string `json:"id"` + UserId int64 `json:"userId"` } \ No newline at end of file diff --git a/internal/handler/core/compareresourcespechandler.go b/internal/handler/core/compareresourcespechandler.go index 787c9ce8d..3cc571352 100644 --- a/internal/handler/core/compareresourcespechandler.go +++ b/internal/handler/core/compareresourcespechandler.go @@ -6,6 +6,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "net/http" ) @@ -16,7 +17,14 @@ func CompareResourceSpecHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { result.ParamErrorResult(r, w, err) return } - + token := r.Header.Get("Authorization") + // 获取用户信息 + jccUserInfo, err := utils.ParseTokenWithoutVerify(token) + if err != nil { + result.ParamErrorResult(r, w, err) + return + } + req.UserId = jccUserInfo.Id l := core.NewCompareResourceSpecLogic(r.Context(), svcCtx) resp, err := l.CompareResourceSpec(&req) result.HttpResult(r, w, resp, err) diff --git a/internal/handler/core/syncresourcespechandler.go b/internal/handler/core/syncresourcespechandler.go index 610428ce6..0461e2a1d 100644 --- a/internal/handler/core/syncresourcespechandler.go +++ b/internal/handler/core/syncresourcespechandler.go @@ -6,6 +6,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "net/http" ) @@ -17,6 +18,14 @@ func SyncResourceSpecHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { return } + token := r.Header.Get("Authorization") + // 获取用户信息 + jccUserInfo, err := utils.ParseTokenWithoutVerify(token) + if err != nil { + result.ParamErrorResult(r, w, err) + return + } + req.UserId = jccUserInfo.Id l := core.NewSyncResourceSpecLogic(r.Context(), svcCtx) resp, err := l.SyncResourceSpec(&req) result.HttpResult(r, w, resp, err) diff --git a/internal/logic/core/compareresourcespeclogic.go b/internal/logic/core/compareresourcespeclogic.go index ebff0a545..ae51358a6 100644 --- a/internal/logic/core/compareresourcespeclogic.go +++ b/internal/logic/core/compareresourcespeclogic.go @@ -79,7 +79,7 @@ func (l *CompareResourceSpecLogic) CompareResourceSpec(req *types.FetchResourceS } // 同步资源到数据库 - if err := l.syncResourcesToDB(apiResources); err != nil { + if err := l.syncResourcesToDB(apiResources, req.UserId); err != nil { return nil, fmt.Errorf("failed to sync resources: %w", err) } @@ -135,10 +135,10 @@ func decodeAPIResponse(input interface{}, output *[]APIResponse) error { return nil } -func (l *CompareResourceSpecLogic) syncResourcesToDB(apiResponses []APIResponse) error { +func (l *CompareResourceSpecLogic) syncResourcesToDB(apiResponses []APIResponse, userId int64) error { for _, response := range apiResponses { // 转换API响应到数据库模型 - dbSpecs, apiSpecs, err := l.processAPIResponse(response) + dbSpecs, apiSpecs, err := l.processAPIResponse(response, userId) if err != nil { return err } @@ -151,7 +151,7 @@ func (l *CompareResourceSpecLogic) syncResourcesToDB(apiResponses []APIResponse) return nil } -func (l *CompareResourceSpecLogic) processAPIResponse(response APIResponse) ([]models.TResourceSpec, []models.TResourceSpec, error) { +func (l *CompareResourceSpecLogic) processAPIResponse(response APIResponse, userId int64) ([]models.TResourceSpec, []models.TResourceSpec, error) { ClusterId := utils.StringToInt64(response.ClusterId) var dbSpecs []models.TResourceSpec if err := l.svcCtx.DbEngin.Model(models.TResourceSpec{}).Preload("BaseResourceSpecs"). @@ -167,7 +167,7 @@ func (l *CompareResourceSpecLogic) processAPIResponse(response APIResponse) ([]m if res.Resource.Name == "" || res.Resource.Type == "" { continue } - spec := l.convertToResourceSpec(ClusterId, res, response.Tag) + spec := l.convertToResourceSpec(ClusterId, res, response.Tag, userId) apiSpecs = append(apiSpecs, spec) } @@ -333,7 +333,7 @@ func (l *CompareResourceSpecLogic) isSpecChanged(old, new models.TResourceSpec) return len(oldBaseMap) > 0 } -func (l *CompareResourceSpecLogic) convertToResourceSpec(ClusterId int64, res Resource, tag string) models.TResourceSpec { +func (l *CompareResourceSpecLogic) convertToResourceSpec(ClusterId int64, res Resource, tag string, userId int64) models.TResourceSpec { spec := models.TResourceSpec{ SourceKey: resourceKey(res.Resource.Type, res.Resource.Name, tag), Type: res.Resource.Type, @@ -344,6 +344,7 @@ func (l *CompareResourceSpecLogic) convertToResourceSpec(ClusterId int64, res Re ClusterId: ClusterId, CreateTime: time.Now(), UpdateTime: time.Now(), + UserId: userId, ChangeType: ChangeTypeNormal, } @@ -355,6 +356,7 @@ func (l *CompareResourceSpecLogic) convertToResourceSpec(ClusterId int64, res Re TotalUnit: br.Total.Unit, AvailableValue: br.Available.Value, AvailableUnit: br.Available.Unit, + UserId: userId, CreateTime: time.Now(), UpdateTime: time.Now(), }) diff --git a/internal/logic/core/syncresourcespeclogic.go b/internal/logic/core/syncresourcespeclogic.go index ceada07d9..da7d51161 100644 --- a/internal/logic/core/syncresourcespeclogic.go +++ b/internal/logic/core/syncresourcespeclogic.go @@ -51,7 +51,7 @@ func (l *SyncResourceSpecLogic) SyncResourceSpec(req *types.SyncResourceReq) (re } for _, response := range apiResources { // 转换API响应到数据库模型 - _, apiSpecs, err := compareLogic.processAPIResponse(response) + _, apiSpecs, err := compareLogic.processAPIResponse(response, req.UserId) if err != nil { return nil, err } diff --git a/internal/storeLink/modelarts.go b/internal/storeLink/modelarts.go index 7a3d0b2c2..af3c517c7 100644 --- a/internal/storeLink/modelarts.go +++ b/internal/storeLink/modelarts.go @@ -162,8 +162,14 @@ func (m *ModelArtsLink) SubmitTask(ctx context.Context, imageId string, cmd stri outputs := make([]*modelarts.OutputTraining, 0) outputValue := "" for _, env := range envs { - s := strings.Split(env, COMMA) - environments[s[0]] = s[1] + // 找到第一个逗号位置 + idx := strings.Index(env, COMMA) + if idx == -1 { + continue + } + key := strings.TrimSpace(env[:idx]) + value := strings.TrimSpace(env[idx+1:]) + environments[key] = value } for _, param := range params { s := strings.Split(param, COMMA) diff --git a/internal/types/types.go b/internal/types/types.go index cb131df27..4d106b80c 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -2283,6 +2283,7 @@ type Fault struct { type FetchResourceSpecReq struct { ClusterId string `form:"clusterId,optional"` Tag string `form:"tag,optional"` + UserId int64 `form:"userId,optional"` } type Fields struct { @@ -5566,7 +5567,8 @@ type SyncClusterAlertReq struct { } type SyncResourceReq struct { - Id string `json:"id"` + Id string `json:"id"` + UserId int64 `json:"userId"` } type Tags struct { From a7f90f139acaa4b6296affdbc6249dcd26cb223d Mon Sep 17 00:00:00 2001 From: jagger Date: Fri, 25 Jul 2025 20:25:02 +0800 Subject: [PATCH 2/6] add optional UserId field to resource specifications; enhance user tracking in resource updates Signed-off-by: jagger --- desc/core/pcm-core.api | 3 ++- internal/handler/core/editresourcespechandler.go | 10 ++++++++++ internal/logic/core/editresourcespeclogic.go | 5 +++-- internal/types/types.go | 4 +++- 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/desc/core/pcm-core.api b/desc/core/pcm-core.api index e826fdbb5..7a4f68d26 100644 --- a/desc/core/pcm-core.api +++ b/desc/core/pcm-core.api @@ -1480,9 +1480,10 @@ type EditResourceReq { CpuUnit string `json:"cpuUnit,optional"` MemoryValue string `json:"memoryValue,optional"` MemoryUnit string `json:"memoryUnit,optional"` + UserId int64 `json:"userId,optional"` } type SyncResourceReq { Id string `json:"id"` - UserId int64 `json:"userId"` + UserId int64 `json:"userId,optional"` } \ No newline at end of file diff --git a/internal/handler/core/editresourcespechandler.go b/internal/handler/core/editresourcespechandler.go index 2f6d55a9a..fad117277 100644 --- a/internal/handler/core/editresourcespechandler.go +++ b/internal/handler/core/editresourcespechandler.go @@ -6,6 +6,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "net/http" ) @@ -17,6 +18,15 @@ func EditResourceSpecHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { return } + token := r.Header.Get("Authorization") + // 获取用户信息 + jccUserInfo, err := utils.ParseTokenWithoutVerify(token) + if err != nil { + result.ParamErrorResult(r, w, err) + return + } + req.UserId = jccUserInfo.Id + l := core.NewEditResourceSpecLogic(r.Context(), svcCtx) resp, err := l.EditResourceSpec(&req) result.HttpResult(r, w, resp, err) diff --git a/internal/logic/core/editresourcespeclogic.go b/internal/logic/core/editresourcespeclogic.go index e6ef8a5b7..c3893defc 100644 --- a/internal/logic/core/editresourcespeclogic.go +++ b/internal/logic/core/editresourcespeclogic.go @@ -58,7 +58,7 @@ func (l *EditResourceSpecLogic) EditResourceSpec(req *types.EditResourceReq) (re costPerUnit := utils.StringToFloat64(req.CostPerUnit) // 4. 更新主资源规格 - if err = updateMainResourceSpec(tx, req.Id, statusInt, req.CostType, costPerUnit); err != nil { + if err = updateMainResourceSpec(tx, req.Id, statusInt, req.CostType, costPerUnit, req.UserId); err != nil { return nil, err } @@ -98,13 +98,14 @@ func validateRequestParams(req *types.EditResourceReq) error { } // updateMainResourceSpec 更新主资源规格 -func updateMainResourceSpec(tx *gorm.DB, id int64, status int64, costType string, costPerUnit float64) error { +func updateMainResourceSpec(tx *gorm.DB, id int64, status int64, costType string, costPerUnit float64, userId int64) error { return tx.Model(&models.TResourceSpec{}). Where("id = ?", id). Updates(map[string]interface{}{ "status": status, "cost_type": costType, "cost_per_unit": costPerUnit, + "user_id": userId, }). Error } diff --git a/internal/types/types.go b/internal/types/types.go index 4d106b80c..319afae9e 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -2145,6 +2145,8 @@ type EditResourceReq struct { CpuUnit string `json:"cpuUnit,optional"` MemoryValue string `json:"memoryValue,optional"` MemoryUnit string `json:"memoryUnit,optional"` + + UserId int64 `json:"userId,optional"` } type EndpointsReq struct { @@ -5568,7 +5570,7 @@ type SyncClusterAlertReq struct { type SyncResourceReq struct { Id string `json:"id"` - UserId int64 `json:"userId"` + UserId int64 `json:"userId,optional"` } type Tags struct { From b574a805165e4f403f02edcfdc9e38a8baa8d3ff Mon Sep 17 00:00:00 2001 From: jagger Date: Tue, 29 Jul 2025 18:20:26 +0800 Subject: [PATCH 3/6] add GetClusterBaseInfo handler and logic; implement API for retrieving cluster base information Signed-off-by: jagger --- desc/core/pcm-core.api | 10 +++ desc/pcm.api | 3 + .../adapters/getclusterbaseinfohandler.go | 24 ++++++ internal/handler/routes.go | 40 ++------- .../logic/adapters/getclusterbaseinfologic.go | 84 +++++++++++++++++++ internal/types/types.go | 21 +++-- 6 files changed, 142 insertions(+), 40 deletions(-) create mode 100644 internal/handler/adapters/getclusterbaseinfohandler.go create mode 100644 internal/logic/adapters/getclusterbaseinfologic.go diff --git a/desc/core/pcm-core.api b/desc/core/pcm-core.api index 7a4f68d26..58894986d 100644 --- a/desc/core/pcm-core.api +++ b/desc/core/pcm-core.api @@ -947,6 +947,16 @@ type ( ProxyEnable string `json:"proxyEnable,omitempty" db:"proxy_enable"` Driver string `json:"driver,omitempty" db:"driver"` } + + ClusterBaseInfo { + Id string `json:"id,omitempty" db:"id"` + AdapterId int64 `json:"adapterId,omitempty,string" db:"adapter_id"` + Name string `json:"name,omitempty" db:"name"` + Nickname string `json:"nickname,omitempty" db:"nickname"` + Description string `json:"description,omitempty" db:"description"` + Server string `json:"server,omitempty" db:"server"` + Driver string `json:"driver,omitempty" db:"driver"` + } ) type ClusterDelReq { diff --git a/desc/pcm.api b/desc/pcm.api index 41a9737a1..9df89a97d 100644 --- a/desc/pcm.api +++ b/desc/pcm.api @@ -948,6 +948,9 @@ service pcm { @handler GetAdapterInfoHandler get /adapter/getAdapterInfo (adapterInfoNameReq) returns (adapterInfoNameReqResp) + + @handler GetClusterBaseInfoHandler + get /adapter/cluster/getClusterBaseInfo (ClusterReq) returns (PageResult) } @server ( diff --git a/internal/handler/adapters/getclusterbaseinfohandler.go b/internal/handler/adapters/getclusterbaseinfohandler.go new file mode 100644 index 000000000..747faf149 --- /dev/null +++ b/internal/handler/adapters/getclusterbaseinfohandler.go @@ -0,0 +1,24 @@ +package adapters + +import ( + "github.com/zeromicro/go-zero/rest/httpx" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/adapters" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" + "net/http" +) + +func GetClusterBaseInfoHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.ClusterReq + if err := httpx.Parse(r, &req); err != nil { + result.ParamErrorResult(r, w, err) + return + } + + l := adapters.NewGetClusterBaseInfoLogic(r.Context(), svcCtx) + resp, err := l.GetClusterBaseInfo(&req) + result.HttpResult(r, w, resp, err) + } +} diff --git a/internal/handler/routes.go b/internal/handler/routes.go index 03cdfbeed..3a303022e 100644 --- a/internal/handler/routes.go +++ b/internal/handler/routes.go @@ -39,6 +39,11 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { Path: "/adapter/cluster/get", Handler: adapters.GetClusterHandler(serverCtx), }, + { + Method: http.MethodGet, + Path: "/adapter/cluster/getClusterBaseInfo", + Handler: adapters.GetClusterBaseInfoHandler(serverCtx), + }, { Method: http.MethodGet, Path: "/adapter/cluster/list", @@ -98,7 +103,7 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { { // 创建算法 Method: http.MethodPost, - Path: "/ai/createAlgorithm", + Path: "/ai/CreateAlgorithm/:projectId", Handler: ai.CreateAlgorithmHandler(serverCtx), }, { @@ -218,22 +223,9 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { { // 创建数据集 Method: http.MethodPost, - Path: "/ai/createDataSet", + Path: "/ai/createDataSet/:projectId", Handler: ai.CreateDataSetHandler(serverCtx), }, - { - // 创建模型 - Method: http.MethodPost, - Path: "/ai/createModel", - Handler: ai.CreateModelHandler(serverCtx), - }, - - { - // 创建模型 - Method: http.MethodPost, - Path: "/ai/task/sync", - Handler: ai.TaskResultSyncHandler(serverCtx), - }, { // 创建notebook Method: http.MethodPost, @@ -376,24 +368,6 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { Path: "/task/list", Handler: cloud.CloudListHandler(serverCtx), }, - { - // 创建容器 - Method: http.MethodPost, - Path: "/cloud/container/create", - Handler: cloud.ContainerCreateHandler(serverCtx), - }, - { - // 删除容器 - Method: http.MethodDelete, - Path: "/cloud/container/delete", - Handler: cloud.ContainerDeleteHandler(serverCtx), - }, - { - // 获取容器 - Method: http.MethodGet, - Path: "/cloud/container/get", - Handler: cloud.ContainerGetHandler(serverCtx), - }, }, rest.WithPrefix("/pcm/v1"), ) diff --git a/internal/logic/adapters/getclusterbaseinfologic.go b/internal/logic/adapters/getclusterbaseinfologic.go new file mode 100644 index 000000000..24392937d --- /dev/null +++ b/internal/logic/adapters/getclusterbaseinfologic.go @@ -0,0 +1,84 @@ +package adapters + +import ( + "context" + + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type GetClusterBaseInfoLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewGetClusterBaseInfoLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetClusterBaseInfoLogic { + return &GetClusterBaseInfoLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *GetClusterBaseInfoLogic) GetClusterBaseInfo(req *types.ClusterReq) (resp *types.PageResult, err error) { + limit := req.PageSize + offset := req.PageSize * (req.PageNum - 1) + resp = &types.PageResult{} + var list []types.ClusterBaseInfo + db := l.svcCtx.DbEngin.Model(&types.AdapterInfo{}).Table("t_cluster") + + db = db.Joins("left join t_adapter on t_adapter.id = t_cluster.adapter_id"). + Where("t_cluster.deleted_at is null") + if req.Name != "" { + db = db.Where("t_cluster.name LIKE ?", "%"+req.Name+"%") + } + if req.AdapterId != "" { + db = db.Where("t_cluster.adapter_id = ?", req.AdapterId) + } + if req.Nickname != "" { + db = db.Where("t_cluster.nickname LIKE ?", "%"+req.Nickname+"%") + } + if req.Label != "" { + db = db.Where("t_cluster.label = ?", req.Label) + } + if req.Version != "" { + db = db.Where("t_cluster.version = ?", req.Version) + } + if req.ProducerDict != "" { + db = db.Where("t_cluster.producer_dict = ?", req.ProducerDict) + } + if req.RegionDict != "" { + db = db.Where("t_cluster.region_dict = ?", req.RegionDict) + } + if req.Type != "" { + db = db.Where("t_adapter.type = ?", req.Type) + } + if req.ResourceType != "" { + db = db.Where("t_adapter.resource_type = ?", req.ResourceType) + } + if req.StorageSchedule != "" { + db = db.Where("t_cluster.storage_schedule = ?", req.StorageSchedule) + } + + //count total + var total int64 + err = db.Select("*").Count(&total).Error + if err != nil { + return resp, err + } + + db = db.Limit(limit).Offset(offset) + err = db.Select("t_cluster.*").Order("t_cluster.create_time desc").Scan(&list).Error + if err != nil { + return resp, err + } + resp.List = list + resp.PageSize = req.PageSize + resp.PageNum = req.PageNum + resp.Total = total + + return resp, nil +} diff --git a/internal/types/types.go b/internal/types/types.go index 319afae9e..fb522be35 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -700,6 +700,16 @@ type ClusterAvail struct { ClusterName string `json:"clusterName"` } +type ClusterBaseInfo struct { + Id string `json:"id,omitempty" db:"id"` + AdapterId int64 `json:"adapterId,omitempty,string" db:"adapter_id"` + Name string `json:"name,omitempty" db:"name"` + Nickname string `json:"nickname,omitempty" db:"nickname"` + Description string `json:"description,omitempty" db:"description"` + Server string `json:"server,omitempty" db:"server"` + Driver string `json:"driver,omitempty" db:"driver"` +} + type ClusterCreateReq struct { Id string `json:"id,optional"` AdapterId string `json:"adapterId,optional"` @@ -718,14 +728,13 @@ type ClusterCreateReq struct { Version string `json:"version,optional"` Label string `json:"label,optional"` OwnerId string `json:"ownerId,omitempty,optional"` - AuthType int32 `json:"authType,optional"` + AuthType string `json:"authType,optional"` ProducerDict string `json:"producerDict,optional"` RegionDict string `json:"regionDict,optional"` RegionName string `json:"regionName,optional"` Environment map[string]string `json:"environment,optional"` CostType string `json:"costType,optional"` Price int `json:"price,optional"` - Status string `json:"status,optional"` } type ClusterData struct { @@ -757,7 +766,7 @@ type ClusterInfo struct { Version string `json:"version,omitempty" db:"version"` Label string `json:"label,omitempty" db:"label"` OwnerId string `json:"ownerId,omitempty" db:"owner_id"` - AuthType int32 `json:"authType,omitempty" db:"auth_type"` + AuthType string `json:"authType,omitempty" db:"auth_type"` ProducerDict string `json:"producerDict,omitempty" db:"producer_dict"` RegionDict string `json:"regionDict,omitempty" db:"region_dict"` Location string `json:"location,omitempty" db:"location"` @@ -769,7 +778,6 @@ type ClusterInfo struct { ProxyAddress string `json:"proxyAddress,omitempty" db:"proxy_address"` ProxyEnable string `json:"proxyEnable,omitempty" db:"proxy_enable"` Driver string `json:"driver,omitempty" db:"driver"` - Status string `json:"status,omitempty" db:"status"` } type ClusterListResp struct { @@ -825,7 +833,7 @@ type ClusterRelationInfo struct { CVersion string `json:"cVersion,omitempty" db:"version"` CLabel string `json:"cLabel,omitempty" db:"label"` COwnerId string `json:"cOwnerId,omitempty" db:"owner_id"` - CAuthType int32 `json:"cAuthType,omitempty" db:"auth_type"` + CAuthType string `json:"cAuthType,omitempty" db:"auth_type"` CRegionDict string `json:"cRegionDict,omitempty" db:"-"` CProducerDict string `json:"cProducerDict,omitempty" db:"-"` CCreateTime string `json:"cCreateTime,omitempty" db:"created_time" gorm:"autoCreateTime"` @@ -2145,8 +2153,7 @@ type EditResourceReq struct { CpuUnit string `json:"cpuUnit,optional"` MemoryValue string `json:"memoryValue,optional"` MemoryUnit string `json:"memoryUnit,optional"` - - UserId int64 `json:"userId,optional"` + UserId int64 `json:"userId,optional"` } type EndpointsReq struct { From 31859814389ed66c33a4041a3a2d0d13eff2f4fd Mon Sep 17 00:00:00 2001 From: jagger Date: Wed, 30 Jul 2025 09:35:56 +0800 Subject: [PATCH 4/6] refactor API endpoints; remove projectId from createDataSet and createAlgorithm paths, add new endpoints for model and container operations Signed-off-by: jagger --- desc/pcm.api | 4 ++-- internal/handler/routes.go | 35 +++++++++++++++++++++++++++++++++-- internal/types/types.go | 11 +++++++---- 3 files changed, 42 insertions(+), 8 deletions(-) diff --git a/desc/pcm.api b/desc/pcm.api index 9df89a97d..3463bc41d 100644 --- a/desc/pcm.api +++ b/desc/pcm.api @@ -338,7 +338,7 @@ service pcm { @doc "创建数据集" @handler CreateDataSetHandler - post /ai/createDataSet/:projectId (CreateDataSetReq) returns (CreateDataSetResp) + post /ai/createDataSet (CreateDataSetReq) returns (CreateDataSetResp) @doc "删除数据集" @handler DeleteDataSetHandler @@ -362,7 +362,7 @@ service pcm { @doc "创建算法" @handler CreateAlgorithmHandler - post /ai/CreateAlgorithm/:projectId (CreateAlgorithmReq) returns (CreateAlgorithmResp) + post /ai/createAlgorithm (CreateAlgorithmReq) returns (CreateAlgorithmResp) @doc "查询创建算法列表" @handler ListAlgorithms diff --git a/internal/handler/routes.go b/internal/handler/routes.go index 3a303022e..4a7dec33a 100644 --- a/internal/handler/routes.go +++ b/internal/handler/routes.go @@ -103,7 +103,7 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { { // 创建算法 Method: http.MethodPost, - Path: "/ai/CreateAlgorithm/:projectId", + Path: "/ai/createAlgorithm", Handler: ai.CreateAlgorithmHandler(serverCtx), }, { @@ -223,9 +223,22 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { { // 创建数据集 Method: http.MethodPost, - Path: "/ai/createDataSet/:projectId", + Path: "/ai/createDataSet", Handler: ai.CreateDataSetHandler(serverCtx), }, + { + // 创建模型 + Method: http.MethodPost, + Path: "/ai/createModel", + Handler: ai.CreateModelHandler(serverCtx), + }, + + { + // 创建模型 + Method: http.MethodPost, + Path: "/ai/task/sync", + Handler: ai.TaskResultSyncHandler(serverCtx), + }, { // 创建notebook Method: http.MethodPost, @@ -368,6 +381,24 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { Path: "/task/list", Handler: cloud.CloudListHandler(serverCtx), }, + { + // 创建容器 + Method: http.MethodPost, + Path: "/cloud/container/create", + Handler: cloud.ContainerCreateHandler(serverCtx), + }, + { + // 删除容器 + Method: http.MethodDelete, + Path: "/cloud/container/delete", + Handler: cloud.ContainerDeleteHandler(serverCtx), + }, + { + // 获取容器 + Method: http.MethodGet, + Path: "/cloud/container/get", + Handler: cloud.ContainerGetHandler(serverCtx), + }, }, rest.WithPrefix("/pcm/v1"), ) diff --git a/internal/types/types.go b/internal/types/types.go index fb522be35..1aec0096f 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -728,13 +728,14 @@ type ClusterCreateReq struct { Version string `json:"version,optional"` Label string `json:"label,optional"` OwnerId string `json:"ownerId,omitempty,optional"` - AuthType string `json:"authType,optional"` + AuthType int32 `json:"authType,optional"` ProducerDict string `json:"producerDict,optional"` RegionDict string `json:"regionDict,optional"` RegionName string `json:"regionName,optional"` Environment map[string]string `json:"environment,optional"` CostType string `json:"costType,optional"` Price int `json:"price,optional"` + Status string `json:"status,optional"` } type ClusterData struct { @@ -766,7 +767,7 @@ type ClusterInfo struct { Version string `json:"version,omitempty" db:"version"` Label string `json:"label,omitempty" db:"label"` OwnerId string `json:"ownerId,omitempty" db:"owner_id"` - AuthType string `json:"authType,omitempty" db:"auth_type"` + AuthType int32 `json:"authType,omitempty" db:"auth_type"` ProducerDict string `json:"producerDict,omitempty" db:"producer_dict"` RegionDict string `json:"regionDict,omitempty" db:"region_dict"` Location string `json:"location,omitempty" db:"location"` @@ -778,6 +779,7 @@ type ClusterInfo struct { ProxyAddress string `json:"proxyAddress,omitempty" db:"proxy_address"` ProxyEnable string `json:"proxyEnable,omitempty" db:"proxy_enable"` Driver string `json:"driver,omitempty" db:"driver"` + Status string `json:"status,omitempty" db:"status"` } type ClusterListResp struct { @@ -833,7 +835,7 @@ type ClusterRelationInfo struct { CVersion string `json:"cVersion,omitempty" db:"version"` CLabel string `json:"cLabel,omitempty" db:"label"` COwnerId string `json:"cOwnerId,omitempty" db:"owner_id"` - CAuthType string `json:"cAuthType,omitempty" db:"auth_type"` + CAuthType int32 `json:"cAuthType,omitempty" db:"auth_type"` CRegionDict string `json:"cRegionDict,omitempty" db:"-"` CProducerDict string `json:"cProducerDict,omitempty" db:"-"` CCreateTime string `json:"cCreateTime,omitempty" db:"created_time" gorm:"autoCreateTime"` @@ -2153,7 +2155,8 @@ type EditResourceReq struct { CpuUnit string `json:"cpuUnit,optional"` MemoryValue string `json:"memoryValue,optional"` MemoryUnit string `json:"memoryUnit,optional"` - UserId int64 `json:"userId,optional"` + + UserId int64 `json:"userId,optional"` } type EndpointsReq struct { From 80f07495d52cd0a956069e4d1c50cfbc6a2566a3 Mon Sep 17 00:00:00 2001 From: jagger Date: Wed, 30 Jul 2025 11:24:40 +0800 Subject: [PATCH 5/6] refactor UpdateHpcTaskStatus; streamline task synchronization logic, improve error handling, and enhance status reporting Signed-off-by: jagger --- .../service/utils/status/hpc_task_sync.go | 284 +++++++++++++----- 1 file changed, 214 insertions(+), 70 deletions(-) diff --git a/internal/scheduler/service/utils/status/hpc_task_sync.go b/internal/scheduler/service/utils/status/hpc_task_sync.go index 084e25a2d..2e4d1539d 100644 --- a/internal/scheduler/service/utils/status/hpc_task_sync.go +++ b/internal/scheduler/service/utils/status/hpc_task_sync.go @@ -1,6 +1,8 @@ package status import ( + "context" + "fmt" jsoniter "github.com/json-iterator/go" "github.com/rs/zerolog/log" "github.com/zeromicro/go-zero/core/logx" @@ -10,8 +12,9 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" - "net/http" + "gorm.io/gorm" "strconv" + "time" ) func reportHpcStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, hpcTask *models.TaskHpc, status bool, message string) error { @@ -38,84 +41,225 @@ func reportHpcStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, hpc } // UpdateHpcTaskStatus 更新超算任务状态,并通知中间件 +//func UpdateHpcTaskStatus(svc *svc.ServiceContext) { +// svc.Scheduler.HpcService.TaskSyncLock.Lock() +// defer svc.Scheduler.HpcService.TaskSyncLock.Unlock() +// taskHpcs := make([]*models.TaskHpc, 0) +// sqlStr := `SELECT * +// FROM task_hpc +// WHERE +// job_id != '' +// AND ( +// status NOT IN ('Failed', 'Completed', 'Cancelled') +// OR start_time < created_time +// ) +// ORDER BY created_time DESC +// LIMIT 10` +// db := svc.DbEngin.Raw(sqlStr).Scan(&taskHpcs) +// if db.Error != nil { +// logx.Errorf(db.Error.Error()) +// return +// } +// for _, hpc := range taskHpcs { +// //更新task表的超算任务状态 +// task := &types.TaskModel{} +// tx := svc.DbEngin.Model(models.Task{}).Where("id", hpc.TaskId).Scan(&task) +// if tx.Error != nil { +// logx.Errorf(tx.Error.Error()) +// break +// } +// clusterId := utils.Int64ToString(hpc.ClusterId) +// h := http.Request{} +// hpcTask, err := svc.Scheduler.HpcService.HpcExecutorAdapterMap[strconv.FormatInt(hpc.AdapterId, 10)].GetTask(h.Context(), hpc.JobId, clusterId) +// if err != nil { +// logx.Errorf(err.Error()) +// break +// } +// switch hpcTask.Status { +// case constants.Running: +// if hpc.Status != hpcTask.Status { +// svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "running", "任务运行中") +// hpc.Status = hpcTask.Status +// task.Status = hpcTask.Status +// } +// case constants.Failed: +// if hpc.Status != hpcTask.Status { +// svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "failed", "任务失败") +// hpc.Status = hpcTask.Status +// task.Status = hpcTask.Status +// logx.Infof("[%v]:任务执行失败,发送通知, 任务状态: [%v]", hpcTask, hpcTask.Status) +// _ = reportHpcStatusMessages(svc, task, hpc, false, "任务失败") +// } +// case constants.Completed: +// if hpc.Status != hpcTask.Status { +// svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "completed", "任务完成") +// hpc.Status = hpcTask.Status +// task.Status = hpcTask.Status +// logx.Infof("[%v]:任务执行完成,发送通知, 任务状态: [%v]", hpcTask, hpcTask.Status) +// _ = reportHpcStatusMessages(svc, task, hpc, true, "任务完成") +// } +// default: +// if hpc.Status != hpcTask.Status { +// svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "pending", "任务pending") +// hpc.Status = hpcTask.Status +// task.Status = hpcTask.Status +// } +// } +// //task.Id=hpcTask. +// task.StartTime = hpcTask.Start +// task.EndTime = hpcTask.End +// hpc.StartTime = hpcTask.Start +// hpc.EndTime = hpcTask.End +// logx.Info("# task 开始时间: %v, 结束时间: %v", task.StartTime, task.EndTime) +// err = svc.Scheduler.HpcStorages.UpdateTask(task) +// if err != nil { +// logx.Errorf(err.Error()) +// break +// } +// err = svc.Scheduler.HpcStorages.UpdateHpcTask(hpc) +// if err != nil { +// logx.Errorf(err.Error()) +// break +// } +// } +//} + +// UpdateHpcTaskStatus HPC 任务状态同步函数 func UpdateHpcTaskStatus(svc *svc.ServiceContext) { - svc.Scheduler.HpcService.TaskSyncLock.Lock() - defer svc.Scheduler.HpcService.TaskSyncLock.Unlock() - taskList := make([]*models.TaskHpc, 0) - sqlStr := `SELECT * - FROM task_hpc - WHERE - job_id != '' - AND ( - status NOT IN ('Failed', 'Completed', 'Cancelled') - OR start_time < created_time - ) - ORDER BY created_time DESC - LIMIT 10` - db := svc.DbEngin.Raw(sqlStr).Scan(&taskList) - if db.Error != nil { - logx.Errorf(db.Error.Error()) + // 1. 查询需要同步的 HPC 任务 + var hpcTasks []*models.TaskHpc + sqlStr := `SELECT * FROM task_hpc WHERE job_id != '' AND status NOT IN ('Failed', 'Completed', 'Cancelled') OR start_time < created_time ORDER BY created_time DESC LIMIT 10` + if err := svc.DbEngin.Raw(sqlStr).Scan(&hpcTasks).Error; err != nil { + logx.Errorf("Failed to query HPC tasks for sync: %v", err) return } - for _, hpc := range taskList { - //更新task表的超算任务状态 - task := &types.TaskModel{} - tx := svc.DbEngin.Model(models.Task{}).Where("id", hpc.TaskId).Scan(&task) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - break + + if len(hpcTasks) == 0 { + return + } + + // 2. 批量获取关联的 Task 模型 + taskIDs := make([]int64, len(hpcTasks)) + for i, hpc := range hpcTasks { + taskIDs[i] = hpc.TaskId + } + + taskMap := make(map[int64]*types.TaskModel) + var tasks []*types.TaskModel + if err := svc.DbEngin.Model(&models.Task{}).Where("id IN ?", taskIDs).Find(&tasks).Error; err != nil { + logx.Errorf("Failed to batch query tasks: %v", err) + return + } + for _, task := range tasks { + taskMap[task.Id] = task + } + + // 3. 遍历 HPC 任务并更新状态 + for _, hpc := range hpcTasks { + task, ok := taskMap[hpc.TaskId] + if !ok { + logx.Errorf("Task with ID %d not found for HPC task %d, skipping", hpc.TaskId, hpc.Id) + continue } - clusterId := utils.Int64ToString(hpc.ClusterId) - h := http.Request{} - hpcTask, err := svc.Scheduler.HpcService.HpcExecutorAdapterMap[strconv.FormatInt(hpc.AdapterId, 10)].GetTask(h.Context(), hpc.JobId, clusterId) + + // 使用带超时的 Context,防止 API 调用阻塞 + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + adapterIDStr := strconv.FormatInt(hpc.AdapterId, 10) + adapter, adapterExists := svc.Scheduler.HpcService.HpcExecutorAdapterMap[adapterIDStr] + if !adapterExists { + logx.Errorf("HPC adapter with ID %s not found, skipping task %s", adapterIDStr, hpc.Name) + continue + } + + // 4. 从 HPC 集群获取最新状态 + hpcTaskInfo, err := adapter.GetTask(ctx, hpc.JobId, utils.Int64ToString(hpc.ClusterId)) if err != nil { - logx.Errorf(err.Error()) - break + logx.Errorf("Failed to get task status from HPC executor for job %s: %v", hpc.JobId, err) + continue // 继续处理下一个任务 } - switch hpcTask.Status { - case constants.Running: - if hpc.Status != hpcTask.Status { - svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "running", "任务运行中") - hpc.Status = hpcTask.Status - task.Status = hpcTask.Status - } - case constants.Failed: - if hpc.Status != hpcTask.Status { - svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "failed", "任务失败") - hpc.Status = hpcTask.Status - task.Status = hpcTask.Status - logx.Infof("[%v]:任务执行失败,发送通知, 任务状态: [%v]", hpcTask, hpcTask.Status) - _ = reportHpcStatusMessages(svc, task, hpc, false, "任务失败") - } - case constants.Completed: - if hpc.Status != hpcTask.Status { - svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "completed", "任务完成") - hpc.Status = hpcTask.Status - task.Status = hpcTask.Status - logx.Infof("[%v]:任务执行完成,发送通知, 任务状态: [%v]", hpcTask, hpcTask.Status) - _ = reportHpcStatusMessages(svc, task, hpc, true, "任务完成") - } - default: - if hpc.Status != hpcTask.Status { - svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "pending", "任务pending") - hpc.Status = hpcTask.Status - task.Status = hpcTask.Status - } + + // 如果状态没有变化,则跳过 + if hpc.Status == hpcTaskInfo.Status { + continue } - task.StartTime = hpcTask.Start - task.EndTime = hpcTask.End - hpc.StartTime = hpcTask.Start - hpc.EndTime = hpcTask.End - logx.Info("# task 开始时间: %v, 结束时间: %v", task.StartTime, task.EndTime) - err = svc.Scheduler.HpcStorages.UpdateTask(task) + + // 5. 准备更新 + previousStatus := hpc.Status + hpc.Status = hpcTaskInfo.Status + hpc.StartTime = hpcTaskInfo.Start + hpc.EndTime = hpcTaskInfo.End + + task.Status = hpcTaskInfo.Status + task.StartTime = hpcTaskInfo.Start + task.EndTime = hpcTaskInfo.End + + logx.Infof("HPC task status change detected for job %s: %s -> %s", hpc.JobId, previousStatus, hpc.Status) + + // 6. 在事务中更新数据库 + err = svc.DbEngin.Transaction(func(tx *gorm.DB) error { + if err := tx.Save(task).Error; err != nil { + return fmt.Errorf("failed to update task table: %w", err) + } + if err := tx.Save(hpc).Error; err != nil { + return fmt.Errorf("failed to update hpc_task table: %w", err) + } + return nil + }) + if err != nil { - logx.Errorf(err.Error()) - break + logx.Errorf("Failed to update database in transaction for job %s: %v", hpc.JobId, err) + // 事务失败,回滚状态,继续处理下一个任务 + hpc.Status = previousStatus + task.Status = previousStatus + continue } - err = svc.Scheduler.HpcStorages.UpdateHpcTask(hpc) - if err != nil { - logx.Errorf(err.Error()) - break + + // 7. 根据新状态执行后续操作 (通知、报告等) + handleStatusChange(svc, task, hpc, hpcTaskInfo.Status) + } +} + +// handleStatusChange 根据新状态执行后续操作 +func handleStatusChange(svc *svc.ServiceContext, task *types.TaskModel, hpc *models.TaskHpc, newStatus string) { + adapterIDStr := strconv.FormatInt(hpc.AdapterId, 10) + clusterIDStr := strconv.FormatInt(hpc.ClusterId, 10) + var noticeType, noticeMessage string + var reportSuccess bool + var shouldReport bool + + switch newStatus { + case constants.Running: + noticeType = "running" + noticeMessage = "任务运行中" + case constants.Failed: + noticeType = "failed" + noticeMessage = "任务失败" + reportSuccess = false + shouldReport = true + case constants.Completed: + noticeType = "completed" + noticeMessage = "任务完成" + reportSuccess = true + shouldReport = true + case constants.Pending: + noticeType = "pending" + noticeMessage = "任务pending" + default: + // 对于其他未知状态,可以选择记录日志并返回 + logx.Errorf("Unhandled HPC task status '%s' for job %s", newStatus, hpc.JobId) + return + } + + // 发送通知 + svc.Scheduler.HpcStorages.AddNoticeInfo(adapterIDStr, hpc.AdapterName, clusterIDStr, hpc.ClusterName, hpc.Name, noticeType, noticeMessage) + logx.Infof("[%s]: 任务状态变更为 [%s],发送通知。", hpc.Name, newStatus) + + // 上报状态 + if shouldReport { + if err := reportHpcStatusMessages(svc, task, hpc, reportSuccess, noticeMessage); err != nil { + logx.Errorf("Failed to report HPC status for job %s: %v", hpc.JobId, err) } } } From 855d02da5c157147c31422c83068ba190d2d4669 Mon Sep 17 00:00:00 2001 From: jagger Date: Wed, 30 Jul 2025 11:44:05 +0800 Subject: [PATCH 6/6] refactor UpdateHpcTaskStatus; streamline task synchronization logic, improve error handling, and enhance status reporting Signed-off-by: jagger --- internal/scheduler/service/utils/status/hpc_task_sync.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/scheduler/service/utils/status/hpc_task_sync.go b/internal/scheduler/service/utils/status/hpc_task_sync.go index 2e4d1539d..3ceef8956 100644 --- a/internal/scheduler/service/utils/status/hpc_task_sync.go +++ b/internal/scheduler/service/utils/status/hpc_task_sync.go @@ -128,7 +128,7 @@ func reportHpcStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, hpc func UpdateHpcTaskStatus(svc *svc.ServiceContext) { // 1. 查询需要同步的 HPC 任务 var hpcTasks []*models.TaskHpc - sqlStr := `SELECT * FROM task_hpc WHERE job_id != '' AND status NOT IN ('Failed', 'Completed', 'Cancelled') OR start_time < created_time ORDER BY created_time DESC LIMIT 10` + sqlStr := `SELECT * FROM task_hpc WHERE job_id != '' AND status NOT IN ('Failed', 'Completed', 'Cancelled') ORDER BY created_time DESC LIMIT 10` if err := svc.DbEngin.Raw(sqlStr).Scan(&hpcTasks).Error; err != nil { logx.Errorf("Failed to query HPC tasks for sync: %v", err) return @@ -199,10 +199,11 @@ func UpdateHpcTaskStatus(svc *svc.ServiceContext) { // 6. 在事务中更新数据库 err = svc.DbEngin.Transaction(func(tx *gorm.DB) error { - if err := tx.Save(task).Error; err != nil { + task.UpdatedTime = time.Now().Format(constants.Layout) + if err := tx.Table("task").Updates(task).Error; err != nil { return fmt.Errorf("failed to update task table: %w", err) } - if err := tx.Save(hpc).Error; err != nil { + if err := tx.Table("task_hpc").Updates(hpc).Error; err != nil { return fmt.Errorf("failed to update hpc_task table: %w", err) } return nil