增加Job的结构体定义;优化TypeUnion的描述方式

This commit is contained in:
Sydonian 2023-09-06 10:43:49 +08:00
parent 4da3fc9042
commit cf756c3cd6
6 changed files with 102 additions and 42 deletions

View File

@ -46,7 +46,7 @@ func (svc *JobSetService) SetLocalFile(jobSetID string, localPath string, packag
for i := 0; i < len(info.Jobs); i++ {
switch rjob := info.Jobs[i].(type) {
case models.NormalJobInfo:
localFileInfo, ok := rjob.Files.Dataset.(models.LocalFileInfo)
localFileInfo, ok := rjob.Files.Dataset.(models.LocalJobFileInfo)
if ok && localFileInfo.LocalPath != "" {
needUpload = true
@ -56,7 +56,7 @@ func (svc *JobSetService) SetLocalFile(jobSetID string, localPath string, packag
}
}
localFileInfo, ok = rjob.Files.Code.(models.LocalFileInfo)
localFileInfo, ok = rjob.Files.Code.(models.LocalJobFileInfo)
if ok && localFileInfo.LocalPath != "" {
needUpload = true
@ -66,7 +66,7 @@ func (svc *JobSetService) SetLocalFile(jobSetID string, localPath string, packag
}
}
localFileInfo, ok = rjob.Files.Image.(models.LocalFileInfo)
localFileInfo, ok = rjob.Files.Image.(models.LocalJobFileInfo)
if ok && localFileInfo.LocalPath != "" {
needUpload = true

View File

@ -6,7 +6,6 @@ import (
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/common/utils/serder"
"gitlink.org.cn/cloudream/scheduler/common/globals"
colmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
)
@ -90,28 +89,5 @@ func (svc *Service) GetAllResourceData(msg *colmq.GetAllResourceData) (*colmq.Ge
return mq.ReplyFailed[colmq.GetAllResourceDataResp](errorcode.OperationFailed, "get all resource data failed")
}
var resourceTypeToModel = map[string]models.ResourceData{
models.ResourceTypeCPU: &models.CPUResourceData{},
models.ResourceTypeNPU: &models.NPUResourceData{},
models.ResourceTypeGPU: &models.GPUResourceData{},
models.ResourceTypeMLU: &models.MLUResourceData{},
models.ResourceTypeStorage: &models.StorageResourceData{},
models.ResourceTypeMemory: &models.MemoryResourceData{},
}
var datas []models.ResourceData
for _, resp := range *resps {
data, exists := resourceTypeToModel[resp.Name]
if !exists {
return mq.ReplyFailed[colmq.GetAllResourceDataResp](errorcode.OperationFailed, "invalid resource type")
}
if err = serder.AnyToAny(resp, data); err != nil {
logger.Warnf("get all resource data failed, err: %s", err.Error())
return mq.ReplyFailed[colmq.GetAllResourceDataResp](errorcode.OperationFailed, "get all resource data failed")
}
datas = append(datas, data)
}
return mq.ReplyOK(colmq.NewGetAllResourceDataResp(datas))
return mq.ReplyOK(colmq.NewGetAllResourceDataResp(*resps))
}

46
common/models/job/job.go Normal file
View File

@ -0,0 +1,46 @@
package job
import "gitlink.org.cn/cloudream/common/models"
type JobSet struct {
JobSetID string // 全局唯一的任务集ID
Jobs []JobSetJobRef // 任务集中包含的任务,只是一个引用
}
type JobSetJobRef struct {
JobID string // 任务ID
LocalJobID string // 在当前任务集内的任务ID
}
type NormalJob struct {
JobSetID string // 任务集ID
JobID string // 全局唯一任务ID
Info models.NormalJobInfo // 提交任务时提供的任务描述信息
State JobState // 任务当前的状态。包含当前在状态下执行操作所需的数据
Files JobFiles // 任务需要的文件
}
type JobFiles struct {
Dataset PackageJobFile
Code PackageJobFile
Image ImageJobFile
}
type PackageJobFile struct {
PackageID int64
}
type ImageJobFile struct {
ImageID string
}
type ResourceJob struct {
JobSetID string
JobID string
Info models.ResourceJobInfo
State JobState
TargetJobID string // 要进行回源操作的任务ID
ResourcePackageID int64 // 回源之后得到的PackageID
}

View File

@ -0,0 +1,31 @@
package job
type JobState interface{}
type StateSubmitted struct {
}
type StateReadyToAdjust struct {
}
type StateMakingAdjustPlan struct {
}
type StateAdjusting struct {
}
type StateReadyToExecute struct {
}
type StateRunning struct {
}
type StateResourcing struct {
}
type StateFailed struct {
LastState JobState
}
type StateFinished struct {
}

View File

@ -1,11 +1,27 @@
package task
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/common/pkgs/types"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
)
type TaskStatus interface{}
// 增加了新类型后需要在这里也同步添加
type TaskStatusConst interface {
TaskStatus | ScheduleTaskStatus | UploadImageTaskStatus
TaskStatus | ScheduleTaskStatus | UploadImageTaskStatus | CacheMovePackageTaskStatus | CreatePackageTaskStatus | LoadPackageTaskStatus
}
// 增加了新类型后需要在这里也同步添加
var TaskStatusTypeUnion = types.NewTypeUnion[TaskStatus](
myreflect.TypeOf[ScheduleTaskStatus](),
myreflect.TypeOf[UploadImageTaskStatus](),
myreflect.TypeOf[CacheMovePackageTaskStatus](),
myreflect.TypeOf[CreatePackageTaskStatus](),
myreflect.TypeOf[LoadPackageTaskStatus](),
)
type ScheduleTaskStatus struct {
Status string `json:"status"`
Error string `json:"error"`
@ -71,3 +87,7 @@ func NewLoadPackageTaskStatus(status string, err string) LoadPackageTaskStatus {
Error: err,
}
}
func init() {
mq.RegisterUnionType(TaskStatusTypeUnion)
}

View File

@ -6,12 +6,9 @@ import (
"time"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/scheduler/common/globals"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
)
type Reporter struct {
@ -85,13 +82,3 @@ func (r *Reporter) Serve() error {
}
}
}
func init() {
mq.RegisterTypeSet[exectsk.TaskStatus](
myreflect.TypeOf[exectsk.ScheduleTaskStatus](),
myreflect.TypeOf[exectsk.UploadImageTaskStatus](),
myreflect.TypeOf[exectsk.CacheMovePackageTaskStatus](),
myreflect.TypeOf[exectsk.CreatePackageTaskStatus](),
myreflect.TypeOf[exectsk.LoadPackageTaskStatus](),
)
}