diff --git a/advisor/internal/scheduler/scheduler.go b/advisor/internal/scheduler/scheduler.go index 4685ebe..313a71e 100644 --- a/advisor/internal/scheduler/scheduler.go +++ b/advisor/internal/scheduler/scheduler.go @@ -130,9 +130,23 @@ func NewDefaultSchedule() *DefaultScheduler { } func (s *DefaultScheduler) Schedule(dump *jobmod.JobDump) (*jobmod.JobScheduleScheme, error) { - norJob, ok := dump.Body.(*jobmod.NormalJobDump) - if !ok { - return nil, fmt.Errorf("only normal job can be scheduled, but got %T", dump.Body) + //norJob, ok := dump.Body.(*jobmod.NormalJobDump) + + var jobResourceInfo schsdk.JobResourcesInfo + var jobFiles *jobmod.JobFiles + var targetCCID schsdk.CCID + + switch jobDump := dump.Body.(type) { + case *jobmod.NormalJobDump: + normalJobInfo := dump.Info.(*schsdk.NormalJobInfo) + jobResourceInfo = normalJobInfo.Resources + jobFiles = &jobDump.Files + targetCCID = jobDump.TargetCCID + case *jobmod.InstanceJobDump: + instanceJobInfo := dump.Info.(*schsdk.InstanceJobInfo) + jobResourceInfo = instanceJobInfo.Resources + jobFiles = &jobDump.Files + targetCCID = jobDump.TargetCCID } mgrCli, err := schglb.ManagerMQPool.Acquire() @@ -156,17 +170,17 @@ func (s *DefaultScheduler) Schedule(dump *jobmod.JobDump) (*jobmod.JobScheduleSc for _, cc := range allCC.ComputingCenters { allCCs[cc.CCID] = &candidate{ CC: cc, - IsPreScheduled: cc.CCID == norJob.TargetCCID, + IsPreScheduled: cc.CCID == targetCCID, } } // 计算 - err = s.calcFileScore(norJob.Files, allCCs) + err = s.calcFileScore(*jobFiles, allCCs) if err != nil { return nil, err } - err = s.calcResourceScore(dump.Info.(*schsdk.NormalJobInfo), allCCs) + err = s.calcResourceScore(jobResourceInfo, allCCs) if err != nil { return nil, err } @@ -209,9 +223,9 @@ func (s *DefaultScheduler) makeSchemeForNode(targetCC *candidate) jobmod.JobSche return scheme } -func (s *DefaultScheduler) calcResourceScore(info *schsdk.NormalJobInfo, allCCs map[schsdk.CCID]*candidate) error { +func (s *DefaultScheduler) calcResourceScore(jobResource schsdk.JobResourcesInfo, allCCs map[schsdk.CCID]*candidate) error { for _, cc := range allCCs { - res, err := s.calcOneResourceScore(info.Resources, &cc.CC) + res, err := s.calcOneResourceScore(jobResource, &cc.CC) if err != nil { return err } diff --git a/client/internal/http/job.go b/client/internal/http/job.go index b429c44..489f9cc 100644 --- a/client/internal/http/job.go +++ b/client/internal/http/job.go @@ -20,8 +20,8 @@ type CreateInstanceResp struct { } type CreateInstanceReq struct { - LocalJobID string `json:"localJobID" binding:"required"` - LocalPath schsdk.JobFileInfo `json:"localPath" binding:"required"` + JobID schsdk.JobID `json:"jobID" binding:"required"` + LocalPath schsdk.JobFileInfo `json:"localPath" binding:"required"` } func (s *Server) JobSvc() *JobService { @@ -54,7 +54,7 @@ func (s *JobService) CreateInstance(ctx *gin.Context) { return } - jobID, filesUploadScheme, err := s.svc.JobSetSvc().CreateInstance(req.LocalJobID, req.LocalPath) + jobID, filesUploadScheme, err := s.svc.JobSetSvc().CreateInstance(req.JobID, req.LocalPath) if err != nil { log.Warnf("create job instance: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "create job instance failed")) diff --git a/client/internal/services/job.go b/client/internal/services/job.go index 688ca58..c164b72 100644 --- a/client/internal/services/job.go +++ b/client/internal/services/job.go @@ -8,7 +8,7 @@ import ( ) // Create 创建多实例任务中的实例任务 -func (svc *JobSetService) CreateInstance(LocalJobID string, LocalPath schsdk.JobFileInfo) (schsdk.JobID, schsdk.JobFilesUploadScheme, error) { +func (svc *JobSetService) CreateInstance(jobID schsdk.JobID, LocalPath schsdk.JobFileInfo) (schsdk.JobID, schsdk.JobFilesUploadScheme, error) { scheme := new(schsdk.JobFilesUploadScheme) @@ -18,7 +18,7 @@ func (svc *JobSetService) CreateInstance(LocalJobID string, LocalPath schsdk.Job } defer schglb.ManagerMQPool.Release(mgrCli) - resp, err := mgrCli.CreateInstance(mgrmq.NewCreateInstance(LocalJobID, LocalPath)) + resp, err := mgrCli.CreateInstance(mgrmq.NewCreateInstance(jobID, LocalPath)) if err != nil { return "", *scheme, fmt.Errorf("submitting job set to manager: %w", err) } diff --git a/client/internal/services/service.go b/client/internal/services/service.go index 06a73dd..7eb124a 100644 --- a/client/internal/services/service.go +++ b/client/internal/services/service.go @@ -1,7 +1,7 @@ package services import ( - "gitlink.org.cn/cloudream/common/pkgs/prescheduler" + "gitlink.org.cn/cloudream/scheduler/common/pkgs/prescheduler" ) type Service struct { diff --git a/client/main.go b/client/main.go index 7dfbdf6..34089fc 100644 --- a/client/main.go +++ b/client/main.go @@ -2,7 +2,7 @@ package main import ( "fmt" - "gitlink.org.cn/cloudream/common/pkgs/prescheduler" + "gitlink.org.cn/cloudream/scheduler/common/pkgs/prescheduler" "os" _ "google.golang.org/grpc/balancer/grpclb" diff --git a/common/models/job/body.go b/common/models/job/body.go index 1d91981..9ac5a6a 100644 --- a/common/models/job/body.go +++ b/common/models/job/body.go @@ -18,6 +18,8 @@ type JobBodyDump interface { var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[JobBodyDump]( (*NormalJobDump)(nil), (*DataReturnJobDump)(nil), + (*InstanceJobDump)(nil), + (*MultiInstanceJobDump)(nil), ))) type NormalJobDump struct { @@ -42,7 +44,7 @@ func (d *DataReturnJobDump) getType() JobBodyDumpType { } type InstanceJobDump struct { - serder.Metadata `union:"NormalJob"` + serder.Metadata `union:"InstanceJob"` Type JobBodyDumpType `json:"type"` TargetCCID schsdk.CCID `json:"targetCCID"` Files JobFiles `json:"files"` @@ -53,7 +55,7 @@ func (d *InstanceJobDump) getType() JobBodyDumpType { } type MultiInstanceJobDump struct { - serder.Metadata `union:"NormalJob"` + serder.Metadata `union:"MultiInstanceJob"` Type JobBodyDumpType `json:"type"` TargetCCID schsdk.CCID `json:"targetCCID"` Files JobFiles `json:"files"` diff --git a/common/models/job/state.go b/common/models/job/state.go index 68c827c..70f9038 100644 --- a/common/models/job/state.go +++ b/common/models/job/state.go @@ -49,6 +49,24 @@ func (dump *CompletedDump) getType() JobStateDumpType { return dump.Type } +type MultiInstCreateInitDump struct { + serder.Metadata `union:"MultiInstCreateInit"` + Type JobStateDumpType `json:"type"` +} + +func (dump *MultiInstCreateInitDump) getType() JobStateDumpType { + return dump.Type +} + +type MultiInstCreateRunningDump struct { + serder.Metadata `union:"MultiInstCreateRunning"` + Type JobStateDumpType `json:"type"` +} + +func (dump *MultiInstCreateRunningDump) getType() JobStateDumpType { + return dump.Type +} + // 普通任务执行中 type NormalJobExecutingDump struct { serder.Metadata `union:"NormalJobExecuting"` diff --git a/common/pkgs/mq/manager/job.go b/common/pkgs/mq/manager/job.go index d8394d6..dd60450 100644 --- a/common/pkgs/mq/manager/job.go +++ b/common/pkgs/mq/manager/job.go @@ -51,8 +51,8 @@ func (c *Client) SubmitJobSet(msg *SubmitJobSet, opts ...mq.RequestOption) (*Sub type CreateInstance struct { mq.MessageBodyBase - LocalJobID string - LocalPath schsdk.JobFileInfo + JobID schsdk.JobID + LocalPath schsdk.JobFileInfo } type CreateInstanceResp struct { @@ -61,10 +61,10 @@ type CreateInstanceResp struct { UploadScheme schsdk.JobFilesUploadScheme `json:"uploadScheme"` } -func NewCreateInstance(LocalJobID string, LocalPath schsdk.JobFileInfo) *CreateInstance { +func NewCreateInstance(jobID schsdk.JobID, LocalPath schsdk.JobFileInfo) *CreateInstance { return &CreateInstance{ - LocalJobID: LocalJobID, - LocalPath: LocalPath, + JobID: jobID, + LocalPath: LocalPath, } } diff --git a/common/pkgs/prescheduler/calc_score.go b/common/pkgs/prescheduler/calc_score.go new file mode 100644 index 0000000..68cc801 --- /dev/null +++ b/common/pkgs/prescheduler/calc_score.go @@ -0,0 +1,337 @@ +package prescheduler + +import ( + "fmt" + "github.com/inhies/go-bytesize" + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" + "gitlink.org.cn/cloudream/common/utils/math2" + schglb "gitlink.org.cn/cloudream/scheduler/common/globals" + schmod "gitlink.org.cn/cloudream/scheduler/common/models" + "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector" + mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager" +) + +func (s *DefaultPreScheduler) calcResourceScore(jobResource schsdk.JobResourcesInfo, allCCs map[schsdk.CCID]*candidate) error { + for _, cc := range allCCs { + res, err := s.calcOneResourceScore(jobResource, &cc.CC) + if err != nil { + return err + } + + cc.Resource = *res + } + + return nil +} + +// 划分节点资源等级,并计算资源得分 +func (s *DefaultPreScheduler) calcOneResourceScore(requires schsdk.JobResourcesInfo, cc *schmod.ComputingCenter) (*resourcesDetail, error) { + colCli, err := schglb.CollectorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new collector client: %w", err) + } + defer schglb.CollectorMQPool.Release(colCli) + + getResDataResp, err := colCli.GetAllResourceData(collector.NewGetAllResourceData(cc.UOPSlwNodeID)) + if err != nil { + return nil, err + } + + var resDetail resourcesDetail + + //计算资源得分 + totalScore := 0.0 + maxLevel := 0 + resKinds := 0 + + if requires.CPU > 0 { + res := findResuorce[*uopsdk.CPUResourceData](getResDataResp.Datas) + if res == nil { + resDetail.CPU.Level = ResourceLevel3 + resDetail.CPU.Score = 0 + } else { + resDetail.CPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.CPU) + resDetail.CPU.Score = (float64(res.Available.Value) / requires.CPU) * CpuResourceWeight + } + + maxLevel = math2.Max(maxLevel, resDetail.CPU.Level) + totalScore += resDetail.CPU.Score + resKinds++ + } + + if requires.GPU > 0 { + res := findResuorce[*uopsdk.GPUResourceData](getResDataResp.Datas) + if res == nil { + resDetail.GPU.Level = ResourceLevel3 + resDetail.GPU.Score = 0 + } else { + resDetail.GPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.GPU) + resDetail.GPU.Score = (float64(res.Available.Value) / requires.GPU) * CpuResourceWeight + } + + maxLevel = math2.Max(maxLevel, resDetail.GPU.Level) + totalScore += resDetail.GPU.Score + resKinds++ + } + + if requires.NPU > 0 { + res := findResuorce[*uopsdk.NPUResourceData](getResDataResp.Datas) + if res == nil { + resDetail.NPU.Level = ResourceLevel3 + resDetail.NPU.Score = 0 + } else { + resDetail.NPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.NPU) + resDetail.NPU.Score = (float64(res.Available.Value) / requires.NPU) * CpuResourceWeight + } + + maxLevel = math2.Max(maxLevel, resDetail.NPU.Level) + totalScore += resDetail.NPU.Score + resKinds++ + } + + if requires.MLU > 0 { + res := findResuorce[*uopsdk.MLUResourceData](getResDataResp.Datas) + if res == nil { + resDetail.MLU.Level = ResourceLevel3 + resDetail.MLU.Score = 0 + } else { + resDetail.MLU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.MLU) + resDetail.MLU.Score = (float64(res.Available.Value) / requires.MLU) * CpuResourceWeight + } + + maxLevel = math2.Max(maxLevel, resDetail.MLU.Level) + totalScore += resDetail.MLU.Score + resKinds++ + } + + if requires.Storage > 0 { + res := findResuorce[*uopsdk.StorageResourceData](getResDataResp.Datas) + if res == nil { + resDetail.Storage.Level = ResourceLevel3 + resDetail.Storage.Score = 0 + } else { + bytes, err := bytesize.Parse(fmt.Sprintf("%f%s", res.Available.Value, res.Available.Unit)) + if err != nil { + return nil, err + } + + resDetail.Storage.Level = s.calcResourceLevel(float64(bytes), float64(requires.Storage)) + resDetail.Storage.Score = (float64(bytes) / float64(requires.Storage)) * StgResourceWeight + } + + maxLevel = math2.Max(maxLevel, resDetail.Storage.Level) + totalScore += resDetail.Storage.Score + resKinds++ + } + + if requires.Memory > 0 { + res := findResuorce[*uopsdk.MemoryResourceData](getResDataResp.Datas) + if res == nil { + resDetail.Memory.Level = ResourceLevel3 + resDetail.Memory.Score = 0 + } else { + bytes, err := bytesize.Parse(fmt.Sprintf("%f%s", res.Available.Value, res.Available.Unit)) + if err != nil { + return nil, err + } + + resDetail.Memory.Level = s.calcResourceLevel(float64(bytes), float64(requires.Memory)) + resDetail.Memory.Score = (float64(bytes) / float64(requires.Memory)) * StgResourceWeight + } + + maxLevel = math2.Max(maxLevel, resDetail.Memory.Level) + totalScore += resDetail.Memory.Score + resKinds++ + } + + if resKinds == 0 { + return &resDetail, nil + } + + resDetail.TotalScore = totalScore + resDetail.AvgScore = resDetail.AvgScore / float64(resKinds) + resDetail.MaxLevel = maxLevel + + return &resDetail, nil +} + +func (s *DefaultPreScheduler) calcResourceLevel(avai float64, need float64) int { + if avai >= 1.5*need { + return ResourceLevel1 + } + + if avai >= need { + return ResourceLevel2 + } + + return ResourceLevel3 +} + +// 计算节点得分情况 +func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allCCs map[schsdk.CCID]*candidate) error { + // 只计算运控返回的可用计算中心上的存储服务的数据权重 + cdsNodeToCC := make(map[cdssdk.NodeID]*candidate) + for _, cc := range allCCs { + cdsNodeToCC[cc.CC.CDSNodeID] = cc + } + + //计算code相关得分 + if pkgFile, ok := files.Code.(*schsdk.PackageJobFileInfo); ok { + codeFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsNodeToCC) + if err != nil { + return fmt.Errorf("calc code file score: %w", err) + } + for id, score := range codeFileScores { + allCCs[id].Files.Code = *score + } + } + + //计算dataset相关得分 + if pkgFile, ok := files.Dataset.(*schsdk.PackageJobFileInfo); ok { + datasetFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsNodeToCC) + if err != nil { + return fmt.Errorf("calc dataset file score: %w", err) + } + for id, score := range datasetFileScores { + allCCs[id].Files.Dataset = *score + } + } + + //计算image相关得分 + if imgFile, ok := files.Image.(*schsdk.ImageJobFileInfo); ok { + //计算image相关得分 + imageFileScores, err := s.calcImageFileScore(imgFile.ImageID, allCCs, cdsNodeToCC) + if err != nil { + return fmt.Errorf("calc image file score: %w", err) + } + for id, score := range imageFileScores { + allCCs[id].Files.Image = *score + } + } + + for _, cc := range allCCs { + cc.Files.TotalScore = cc.Files.Code.CachingScore + + cc.Files.Code.LoadingScore + + cc.Files.Dataset.CachingScore + + cc.Files.Dataset.LoadingScore + + cc.Files.Image.CachingScore + + cc.Files.Image.LoadingScore + } + + return nil +} + +// 计算package在各节点的得分情况 +func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) { + colCli, err := schglb.CollectorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new collector client: %w", err) + } + defer schglb.CollectorMQPool.Release(colCli) + + ccFileScores := make(map[schsdk.CCID]*fileDetail) + + // TODO UserID + cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(1, packageID)) + if err != nil { + return nil, err + } + + for _, cdsNodeCacheInfo := range cachedResp.NodeInfos { + cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID] + if !ok { + continue + } + + ccFileScores[cc.CC.CCID] = &fileDetail{ + //TODO 根据缓存方式不同,可能会有不同的计算方式 + CachingScore: float64(cdsNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight, + } + } + + // TODO UserID + loadedResp, err := colCli.PackageGetLoadedStgNodes(collector.NewPackageGetLoadedStgNodes(1, packageID)) + if err != nil { + return nil, err + } + + for _, cdsNodeID := range loadedResp.StgNodeIDs { + cc, ok := cdsNodeToCC[cdsNodeID] + if !ok { + continue + } + + sfc, ok := ccFileScores[cc.CC.CCID] + if !ok { + sfc = &fileDetail{} + ccFileScores[cc.CC.CCID] = sfc + } + + sfc.LoadingScore = 1 * LoadedWeight + sfc.IsLoaded = true + } + + return ccFileScores, nil +} + +// 计算package在各节点的得分情况 +func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) { + colCli, err := schglb.CollectorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new collector client: %w", err) + } + defer schglb.CollectorMQPool.Release(colCli) + + magCli, err := schglb.ManagerMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new manager client: %w", err) + } + defer schglb.ManagerMQPool.Release(magCli) + + imageInfoResp, err := magCli.GetImageInfo(mgrmq.NewGetImageInfo(imageID)) + if err != nil { + return nil, fmt.Errorf("getting image info: %w", err) + } + + ccFileScores := make(map[schsdk.CCID]*fileDetail) + + if imageInfoResp.Image.CDSPackageID != nil { + cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(1, *imageInfoResp.Image.CDSPackageID)) + if err != nil { + return nil, err + } + + for _, cdsNodeCacheInfo := range cachedResp.NodeInfos { + cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID] + if !ok { + continue + } + + ccFileScores[cc.CC.CCID] = &fileDetail{ + //TODO 根据缓存方式不同,可能会有不同的计算方式 + CachingScore: float64(cdsNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight, + } + } + } + + // 镜像的LoadingScore是判断是否导入到算力中心 + for _, pcmImg := range imageInfoResp.PCMImages { + _, ok := allCCs[pcmImg.CCID] + if !ok { + continue + } + + fsc, ok := ccFileScores[pcmImg.CCID] + if !ok { + fsc = &fileDetail{} + ccFileScores[pcmImg.CCID] = fsc + } + + fsc.LoadingScore = 1 * LoadedWeight + fsc.IsLoaded = true + } + + return ccFileScores, nil +} diff --git a/common/pkgs/prescheduler/default_prescheduler.go b/common/pkgs/prescheduler/default_prescheduler.go new file mode 100644 index 0000000..98f6e03 --- /dev/null +++ b/common/pkgs/prescheduler/default_prescheduler.go @@ -0,0 +1,517 @@ +package prescheduler + +import ( + "fmt" + "sort" + + "github.com/samber/lo" + + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" + schglb "gitlink.org.cn/cloudream/scheduler/common/globals" + schmod "gitlink.org.cn/cloudream/scheduler/common/models" + jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" + mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager" +) + +const ( + //每个节点划分的资源等级: + // ResourceLevel1:表示所有资源类型均满足 大于等于1.5倍 + ResourceLevel1 = 1 + // ResourceLevel2:表示不满足Level1,但所有资源类型均满足 大于等于1倍 + ResourceLevel2 = 2 + // ResourceLevel3: 表示某些资源类型 小于一倍 + ResourceLevel3 = 3 + + CpuResourceWeight float64 = 1 + StgResourceWeight float64 = 1.2 + + CachingWeight float64 = 1 + LoadedWeight float64 = 2 +) + +var ErrNoAvailableScheme = fmt.Errorf("no appropriate scheduling node found, please wait") + +type candidate struct { + CC schmod.ComputingCenter + IsReferencedJobTarget bool // 这个节点是否是所依赖的任务所选择的节点 + Resource resourcesDetail + Files filesDetail +} + +type resourcesDetail struct { + CPU resourceDetail + GPU resourceDetail + NPU resourceDetail + MLU resourceDetail + Storage resourceDetail + Memory resourceDetail + + TotalScore float64 + AvgScore float64 + MaxLevel int +} +type resourceDetail struct { + Level int + Score float64 +} + +type filesDetail struct { + Dataset fileDetail + Code fileDetail + Image fileDetail + + TotalScore float64 +} +type fileDetail struct { + CachingScore float64 + LoadingScore float64 + IsLoaded bool //表示storage是否已经调度到该节点, image表示镜像是否已经加载到该算力中心 +} + +type schedulingJob struct { + Job schsdk.JobInfo + Afters []string +} + +type CandidateArr []*candidate + +func (a CandidateArr) Len() int { return len(a) } +func (a CandidateArr) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a CandidateArr) Less(i, j int) bool { + n1 := a[i] + n2 := a[j] + + // 优先与所依赖的任务放到一起,但要求那个节点的资源足够 + if n1.IsReferencedJobTarget && n1.Resource.MaxLevel < ResourceLevel3 { + return true + } + if n2.IsReferencedJobTarget && n2.Resource.MaxLevel < ResourceLevel3 { + return true + } + + // 优先判断资源等级,资源等级越低,代表越满足需求 + if n1.Resource.MaxLevel < n2.Resource.MaxLevel { + return true + } + if n1.Resource.MaxLevel > n2.Resource.MaxLevel { + return false + } + + // 等级相同时,根据单项分值比较 + switch n1.Resource.MaxLevel { + case ResourceLevel1: + // 数据文件总分越高,代表此节点上拥有的数据文件越完整,则越优先考虑 + return n1.Files.TotalScore > n2.Files.TotalScore + + case ResourceLevel2: + // 资源分的平均值越高,代表资源越空余,则越优先考虑 + return n1.Resource.AvgScore > n2.Resource.AvgScore + + case ResourceLevel3: + // 资源分的平均值越高,代表资源越空余,则越优先考虑 + return n1.Resource.AvgScore > n2.Resource.AvgScore + } + + return false +} + +type DefaultPreScheduler struct { +} + +func NewDefaultPreScheduler() *DefaultPreScheduler { + return &DefaultPreScheduler{} +} + +// ScheduleJobSet 任务集预调度 +func (s *DefaultPreScheduler) ScheduleJobSet(info *schsdk.JobSetInfo) (*jobmod.JobSetPreScheduleScheme, *schsdk.JobSetFilesUploadScheme, error) { + jobSetScheme := &jobmod.JobSetPreScheduleScheme{ + JobSchemes: make(map[string]jobmod.JobScheduleScheme), + } + filesUploadSchemes := make(map[string]schsdk.LocalFileUploadScheme) + + mgrCli, err := schglb.ManagerMQPool.Acquire() + if err != nil { + return nil, nil, fmt.Errorf("new collector client: %w", err) + } + defer schglb.ManagerMQPool.Release(mgrCli) + + // 查询有哪些算力中心可用 + + allCC, err := mgrCli.GetAllComputingCenter(mgrmq.NewGetAllComputingCenter()) + if err != nil { + return nil, nil, fmt.Errorf("getting all computing center info: %w", err) + } + + ccs := make(map[schsdk.CCID]schmod.ComputingCenter) + for _, node := range allCC.ComputingCenters { + ccs[node.CCID] = node + } + + if len(ccs) == 0 { + return nil, nil, ErrNoAvailableScheme + } + + // 先根据任务配置,收集它们依赖的任务的LocalID + var schJobs []*schedulingJob + for _, job := range info.Jobs { + j := &schedulingJob{ + Job: job, + } + + if norJob, ok := job.(*schsdk.NormalJobInfo); ok { + if resFile, ok := norJob.Files.Dataset.(*schsdk.DataReturnJobFileInfo); ok { + j.Afters = append(j.Afters, resFile.DataReturnLocalJobID) + } + + if resFile, ok := norJob.Files.Code.(*schsdk.DataReturnJobFileInfo); ok { + j.Afters = append(j.Afters, resFile.DataReturnLocalJobID) + } + } else if resJob, ok := job.(*schsdk.DataReturnJobInfo); ok { + j.Afters = append(j.Afters, resJob.TargetLocalJobID) + } + + schJobs = append(schJobs, j) + } + + // 然后根据依赖进行排序 + schJobs, ok := s.orderByAfters(schJobs) + if !ok { + return nil, nil, fmt.Errorf("circular reference detected between jobs in the job set") + } + + // 经过排序后,按顺序生成调度方案 + for _, job := range schJobs { + if norJob, ok := job.Job.(*schsdk.NormalJobInfo); ok { + scheme, err := s.scheduleForNormalOrMultiJob(info, job, ccs, jobSetScheme.JobSchemes) + if err != nil { + return nil, nil, err + } + + jobSetScheme.JobSchemes[job.Job.GetLocalJobID()] = *scheme + + // 检查数据文件的配置项,生成上传文件方案 + s.fillNormarlJobLocalUploadScheme(norJob.Files, scheme.TargetCCID, filesUploadSchemes, ccs) + } + + if mulJob, ok := job.Job.(*schsdk.MultiInstanceJobInfo); ok { + scheme, err := s.scheduleForNormalOrMultiJob(info, job, ccs, jobSetScheme.JobSchemes) + if err != nil { + return nil, nil, err + } + + jobSetScheme.JobSchemes[job.Job.GetLocalJobID()] = *scheme + + // 检查数据文件的配置项,生成上传文件方案 + s.fillNormarlJobLocalUploadScheme(mulJob.Files, scheme.TargetCCID, filesUploadSchemes, ccs) + } + + // 回源任务目前不需要生成调度方案 + } + + return jobSetScheme, &schsdk.JobSetFilesUploadScheme{ + LocalFileSchemes: lo.Values(filesUploadSchemes), + }, nil +} + +// ScheduleJob 单个任务预调度 +func (s *DefaultPreScheduler) ScheduleJob(instJobInfo *schsdk.InstanceJobInfo) (*jobmod.JobScheduleScheme, *schsdk.JobFilesUploadScheme, error) { + filesUploadSchemes := make(map[string]schsdk.LocalFileUploadScheme) + + mgrCli, err := schglb.ManagerMQPool.Acquire() + if err != nil { + return nil, nil, fmt.Errorf("new collector client: %w", err) + } + defer schglb.ManagerMQPool.Release(mgrCli) + + // 查询有哪些算力中心可用 + + allCC, err := mgrCli.GetAllComputingCenter(mgrmq.NewGetAllComputingCenter()) + if err != nil { + return nil, nil, fmt.Errorf("getting all computing center info: %w", err) + } + + ccs := make(map[schsdk.CCID]schmod.ComputingCenter) + for _, node := range allCC.ComputingCenters { + ccs[node.CCID] = node + } + + if len(ccs) == 0 { + return nil, nil, ErrNoAvailableScheme + } + + info := &schsdk.NormalJobInfo{ + Files: instJobInfo.Files, + Runtime: instJobInfo.Runtime, + Resources: instJobInfo.Resources, + } + + job := &schedulingJob{ + Job: info, + } + scheme, err := s.scheduleForSingleJob(job, ccs) + if err != nil { + return nil, nil, err + } + + // 检查数据文件的配置项,生成上传文件方案 + s.fillNormarlJobLocalUploadScheme(info.Files, scheme.TargetCCID, filesUploadSchemes, ccs) + + return scheme, &schsdk.JobFilesUploadScheme{ + LocalFileSchemes: lo.Values(filesUploadSchemes), + }, nil +} + +func (s *DefaultPreScheduler) orderByAfters(jobs []*schedulingJob) ([]*schedulingJob, bool) { + type jobOrder struct { + Job *schedulingJob + Afters []string + } + + var jobOrders []*jobOrder + for _, job := range jobs { + od := &jobOrder{ + Job: job, + Afters: make([]string, len(job.Afters)), + } + + copy(od.Afters, job.Afters) + + jobOrders = append(jobOrders, od) + } + + // 然后排序 + var orderedJob []*schedulingJob + for { + rm := 0 + for i, jo := range jobOrders { + // 找到没有依赖的任务,然后将其取出 + if len(jo.Afters) == 0 { + orderedJob = append(orderedJob, jo.Job) + + // 删除其他任务对它的引用 + for _, job2 := range jobOrders { + job2.Afters = lo.Reject(job2.Afters, func(item string, idx int) bool { return item == jo.Job.Job.GetLocalJobID() }) + } + + rm++ + continue + } + + jobOrders[i-rm] = jobOrders[i] + } + + jobOrders = jobOrders[:len(jobOrders)-rm] + if len(jobOrders) == 0 { + break + } + + // 遍历一轮后没有找到无依赖的任务,那么就是存在循环引用,排序失败 + if rm == 0 { + return nil, false + } + } + + return orderedJob, true +} + +func (s *DefaultPreScheduler) scheduleForNormalOrMultiJob(jobSet *schsdk.JobSetInfo, job *schedulingJob, ccs map[schsdk.CCID]schmod.ComputingCenter, jobSchemes map[string]jobmod.JobScheduleScheme) (*jobmod.JobScheduleScheme, error) { + allCCs := make(map[schsdk.CCID]*candidate) + + // 初始化备选节点信息 + for _, cc := range ccs { + caNode := &candidate{ + CC: cc, + } + + // 检查此节点是否是它所引用的任务所选的节点 + for _, af := range job.Afters { + resJob := findJobInfo[*schsdk.DataReturnJobInfo](jobSet.Jobs, af) + if resJob == nil { + return nil, fmt.Errorf("resource job %s not found in the job set", af) + } + + // 由于jobs已经按照引用排序,所以正常情况下这里肯定能取到值 + scheme, ok := jobSchemes[resJob.TargetLocalJobID] + if !ok { + continue + } + + if scheme.TargetCCID == cc.CCID { + caNode.IsReferencedJobTarget = true + break + } + } + + allCCs[cc.CCID] = caNode + } + + //norJob := job.Job.(*schsdk.NormalJobInfo) + + var jobFiles *schsdk.JobFilesInfo + var jobResource *schsdk.JobResourcesInfo + + switch runningJob := job.Job.(type) { + case *schsdk.NormalJobInfo: + jobFiles = &runningJob.Files + jobResource = &runningJob.Resources + case *schsdk.MultiInstanceJobInfo: + jobFiles = &runningJob.Files + jobResource = &runningJob.Resources + } + + // 计算文件占有量得分 + err := s.calcFileScore(*jobFiles, allCCs) + if err != nil { + return nil, err + } + + // 计算资源余量得分 + err = s.calcResourceScore(*jobResource, allCCs) + if err != nil { + return nil, err + } + + allCCsArr := lo.Values(allCCs) + sort.Sort(CandidateArr(allCCsArr)) + + targetNode := allCCsArr[0] + if targetNode.Resource.MaxLevel == ResourceLevel3 { + return nil, ErrNoAvailableScheme + } + + scheme := s.makeSchemeForNode(jobFiles, targetNode) + return &scheme, nil +} + +func (s *DefaultPreScheduler) scheduleForSingleJob(job *schedulingJob, ccs map[schsdk.CCID]schmod.ComputingCenter) (*jobmod.JobScheduleScheme, error) { + allCCs := make(map[schsdk.CCID]*candidate) + + // 初始化备选节点信息 + for _, cc := range ccs { + caNode := &candidate{ + CC: cc, + } + + allCCs[cc.CCID] = caNode + } + + //norJob := job.Job.(*schsdk.NormalJobInfo) + + var jobFiles *schsdk.JobFilesInfo + var jobResource *schsdk.JobResourcesInfo + + switch runningJob := job.Job.(type) { + case *schsdk.NormalJobInfo: + jobFiles = &runningJob.Files + jobResource = &runningJob.Resources + case *schsdk.MultiInstanceJobInfo: + jobFiles = &runningJob.Files + jobResource = &runningJob.Resources + } + + // 计算文件占有量得分 + err := s.calcFileScore(*jobFiles, allCCs) + if err != nil { + return nil, err + } + + // 计算资源余量得分 + err = s.calcResourceScore(*jobResource, allCCs) + if err != nil { + return nil, err + } + + allCCsArr := lo.Values(allCCs) + sort.Sort(CandidateArr(allCCsArr)) + + targetNode := allCCsArr[0] + if targetNode.Resource.MaxLevel == ResourceLevel3 { + return nil, ErrNoAvailableScheme + } + + scheme := s.makeSchemeForNode(jobFiles, targetNode) + return &scheme, nil +} + +func (s *DefaultPreScheduler) fillNormarlJobLocalUploadScheme(files schsdk.JobFilesInfo, targetCCID schsdk.CCID, schemes map[string]schsdk.LocalFileUploadScheme, ccs map[schsdk.CCID]schmod.ComputingCenter) { + if localFile, ok := files.Dataset.(*schsdk.LocalJobFileInfo); ok { + if _, ok := schemes[localFile.LocalPath]; !ok { + cdsNodeID := ccs[targetCCID].CDSNodeID + schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ + LocalPath: localFile.LocalPath, + UploadToCDSNodeID: &cdsNodeID, + } + } + } + + if localFile, ok := files.Code.(*schsdk.LocalJobFileInfo); ok { + if _, ok := schemes[localFile.LocalPath]; !ok { + cdsNodeID := ccs[targetCCID].CDSNodeID + schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ + LocalPath: localFile.LocalPath, + UploadToCDSNodeID: &cdsNodeID, + } + } + } + + if localFile, ok := files.Image.(*schsdk.LocalJobFileInfo); ok { + if _, ok := schemes[localFile.LocalPath]; !ok { + cdsNodeID := ccs[targetCCID].CDSNodeID + schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ + LocalPath: localFile.LocalPath, + UploadToCDSNodeID: &cdsNodeID, + } + } + } +} + +func (s *DefaultPreScheduler) makeSchemeForNode(jobFiles *schsdk.JobFilesInfo, targetCC *candidate) jobmod.JobScheduleScheme { + scheme := jobmod.JobScheduleScheme{ + TargetCCID: targetCC.CC.CCID, + } + + // TODO 根据实际情况选择Move或者Load + + if _, ok := jobFiles.Dataset.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Dataset.IsLoaded { + scheme.Dataset.Action = jobmod.ActionLoad + } else { + scheme.Dataset.Action = jobmod.ActionNo + } + + if _, ok := jobFiles.Code.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Code.IsLoaded { + scheme.Code.Action = jobmod.ActionLoad + } else { + scheme.Code.Action = jobmod.ActionNo + } + + if _, ok := jobFiles.Image.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Image.IsLoaded { + scheme.Image.Action = jobmod.ActionImportImage + } else { + scheme.Image.Action = jobmod.ActionNo + } + + return scheme +} + +func findResuorce[T uopsdk.ResourceData](all []uopsdk.ResourceData) T { + for _, data := range all { + if ret, ok := data.(T); ok { + return ret + } + } + + var def T + return def +} + +func findJobInfo[T schsdk.JobInfo](jobs []schsdk.JobInfo, localJobID string) T { + for _, job := range jobs { + if ret, ok := job.(T); ok && job.GetLocalJobID() == localJobID { + return ret + } + } + + var def T + return def +} diff --git a/common/pkgs/prescheduler/default_prescheduler_test.go b/common/pkgs/prescheduler/default_prescheduler_test.go new file mode 100644 index 0000000..e10ae82 --- /dev/null +++ b/common/pkgs/prescheduler/default_prescheduler_test.go @@ -0,0 +1,117 @@ +package prescheduler + +import ( + "testing" + + "github.com/samber/lo" + . "github.com/smartystreets/goconvey/convey" + + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" +) + +func TestOrderByAfters(t *testing.T) { + cases := []struct { + title string + jobs []*schedulingJob + wants []string + }{ + { + title: "所有Job都有直接或间接的依赖关系", + jobs: []*schedulingJob{ + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "1"}}, + Afters: []string{"2"}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "2"}}, + Afters: []string{}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "3"}}, + Afters: []string{"1"}, + }, + }, + wants: []string{"2", "1", "3"}, + }, + + { + title: "部分Job之间无依赖关系", + jobs: []*schedulingJob{ + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "1"}}, + Afters: []string{"2"}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "2"}}, + Afters: []string{}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "3"}}, + Afters: []string{"1"}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "4"}}, + Afters: []string{"5"}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "5"}}, + Afters: []string{}, + }, + }, + wants: []string{"2", "5", "1", "3", "4"}, + }, + + { + title: "存在循环依赖", + jobs: []*schedulingJob{ + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "1"}}, + Afters: []string{"2"}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "2"}}, + Afters: []string{"1"}, + }, + }, + wants: nil, + }, + + { + title: "完全不依赖", + jobs: []*schedulingJob{ + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "1"}}, + Afters: []string{}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "2"}}, + Afters: []string{}, + }, + }, + wants: []string{"1", "2"}, + }, + } + + sch := NewDefaultPreScheduler() + for _, c := range cases { + Convey(c.title, t, func() { + ordered, ok := sch.orderByAfters(c.jobs) + if c.wants == nil { + So(ok, ShouldBeFalse) + } else { + So(ok, ShouldBeTrue) + + ids := lo.Map(ordered, func(item *schedulingJob, idx int) string { return item.Job.GetLocalJobID() }) + So(ids, ShouldResemble, c.wants) + } + }) + } +} diff --git a/common/pkgs/prescheduler/prescheduler.go b/common/pkgs/prescheduler/prescheduler.go new file mode 100644 index 0000000..7aa3f18 --- /dev/null +++ b/common/pkgs/prescheduler/prescheduler.go @@ -0,0 +1,11 @@ +package prescheduler + +import ( + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" +) + +type PreScheduler interface { + ScheduleJobSet(info *schsdk.JobSetInfo) (*jobmod.JobSetPreScheduleScheme, *schsdk.JobSetFilesUploadScheme, error) + ScheduleJob(info *schsdk.InstanceJobInfo) (*jobmod.JobScheduleScheme, *schsdk.JobFilesUploadScheme, error) +} diff --git a/manager/internal/jobmgr/event.go b/manager/internal/jobmgr/event.go deleted file mode 100644 index 1b9f6d2..0000000 --- a/manager/internal/jobmgr/event.go +++ /dev/null @@ -1,62 +0,0 @@ -package jobmgr - -import ( - "errors" - - schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" -) - -// 不是所关注的Task上报的进度 -var ErrUnconcernedTask = errors.New("unconcerned task") - -var ErrTaskTimeout = errors.New("task timeout") - -var ErrJobCancelled = errors.New("job cancelled") - -type Event interface{} - -type BroadcastType string - -const ( - BroadcastAll BroadcastType = "All" - BroadcastJobSet BroadcastType = "JobSet" - BroadcastJob BroadcastType = "Job" -) - -type Broadcast struct { - Type BroadcastType - JobSetID schsdk.JobSetID - JobID schsdk.JobID -} - -func (b *Broadcast) ToAll() bool { - return b.Type == BroadcastAll -} - -func (b *Broadcast) ToJobSet() bool { - return b.Type == BroadcastJobSet -} - -func (b *Broadcast) ToJob() bool { - return b.Type == BroadcastJob -} - -func ToAll() Broadcast { - return Broadcast{ - Type: BroadcastAll, - } -} - -func ToJobSet(jobSetID schsdk.JobSetID) Broadcast { - return Broadcast{ - Type: BroadcastJobSet, - JobSetID: jobSetID, - } -} - -func ToJob(jobID schsdk.JobID) Broadcast { - return Broadcast{ - Type: BroadcastJob, - JobID: jobID, - } -} diff --git a/manager/internal/jobmgr/event/cancel.go b/manager/internal/jobmgr/event/cancel.go index 1eede5b..7341a7b 100644 --- a/manager/internal/jobmgr/event/cancel.go +++ b/manager/internal/jobmgr/event/cancel.go @@ -2,3 +2,6 @@ package event type Cancel struct { } + +func (s *Cancel) Noop() { +} diff --git a/manager/internal/jobmgr/event/instance_create.go b/manager/internal/jobmgr/event/instance_create.go index 4a2e59e..8242b91 100644 --- a/manager/internal/jobmgr/event/instance_create.go +++ b/manager/internal/jobmgr/event/instance_create.go @@ -23,3 +23,6 @@ func NewInstanceCreate(LocalPath schsdk.JobFileInfo, future CreateInstanceFuture Result: future, } } + +func (s *InstanceCreate) Noop() { +} diff --git a/manager/internal/jobmgr/event/job_completed.go b/manager/internal/jobmgr/event/job_completed.go index 2452134..e9231ad 100644 --- a/manager/internal/jobmgr/event/job_completed.go +++ b/manager/internal/jobmgr/event/job_completed.go @@ -16,3 +16,6 @@ func NewJobCompleted(job *jobmgr.Job, err error) *JobCompleted { Err: err, } } + +func (s *JobCompleted) Noop() { +} diff --git a/manager/internal/jobmgr/event/local_file_uploaded.go b/manager/internal/jobmgr/event/local_file_uploaded.go index 9b81ad3..6ecebb8 100644 --- a/manager/internal/jobmgr/event/local_file_uploaded.go +++ b/manager/internal/jobmgr/event/local_file_uploaded.go @@ -18,3 +18,6 @@ func NewLocalFileUploaded(localPath string, err error, packageID cdssdk.PackageI PackageID: packageID, } } + +func (s *LocalFileUploaded) Noop() { +} diff --git a/manager/internal/jobmgr/event_set.go b/manager/internal/jobmgr/event_set.go index a26ec55..1792365 100644 --- a/manager/internal/jobmgr/event_set.go +++ b/manager/internal/jobmgr/event_set.go @@ -2,6 +2,7 @@ package jobmgr import ( "context" + "errors" "sync" "gitlink.org.cn/cloudream/common/pkgs/future" @@ -10,6 +11,12 @@ import ( type EventWaitCondition func(evt Event) bool +var ErrJobCancelled = errors.New("job cancelled") + +type Event interface { + Noop() +} + type EventWaiter struct { condition EventWaitCondition future *future.SetValueFuture[Event] @@ -52,12 +59,13 @@ func (s *EventSet) Post(evt Event) { func (s *EventSet) Wait(ctx context.Context, cond EventWaitCondition) (Event, bool) { s.lock.Lock() - defer s.lock.Unlock() + //defer s.lock.Unlock() // 一个等待者只能等待一个事件 for i, evt := range s.events { if cond(evt) { s.events = lo2.RemoveAt(s.events, i) + s.lock.Unlock() return evt, true } } @@ -67,9 +75,13 @@ func (s *EventSet) Wait(ctx context.Context, cond EventWaitCondition) (Event, bo condition: cond, future: fut, } - s.events = append(s.events, waiter) + //s.events = append(s.events, waiter) + s.waiters = append(s.waiters, waiter) + + s.lock.Unlock() val, err := fut.WaitValue(ctx) + if err != nil { return nil, false } diff --git a/manager/internal/jobmgr/job/state/adjusting.go b/manager/internal/jobmgr/job/state/adjusting.go index 19d1ac1..8ff8462 100644 --- a/manager/internal/jobmgr/job/state/adjusting.go +++ b/manager/internal/jobmgr/job/state/adjusting.go @@ -49,21 +49,20 @@ func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error { //norJob := jo.Body.(*job.NormalJob) var jobFilesInfo schsdk.JobFilesInfo var jobFiles *jobmod.JobFiles - var targetCCID schsdk.CCID switch runningJob := jo.Body.(type) { case *job.NormalJob: jobFilesInfo = runningJob.Info.Files jobFiles = &runningJob.Files - targetCCID = runningJob.TargetCCID + runningJob.TargetCCID = s.scheme.TargetCCID case *job.MultiInstanceJob: jobFilesInfo = runningJob.Info.Files jobFiles = &runningJob.Files - targetCCID = runningJob.TargetCCID + runningJob.TargetCCID = s.scheme.TargetCCID case *job.InstanceJob: jobFilesInfo = runningJob.Info.Files jobFiles = &runningJob.Files - targetCCID = runningJob.TargetCCID + runningJob.TargetCCID = s.scheme.TargetCCID } ctx, cancel := context.WithCancel(context.Background()) @@ -71,7 +70,7 @@ func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error { // 监听取消事件 go func() { - event.WaitType[event.Cancel](ctx, rtx.EventSet) + event.WaitType[*event.Cancel](ctx, rtx.EventSet) cancel() }() @@ -121,19 +120,21 @@ func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error { go func() { defer wg.Done() - e3 = s.doImageScheduling(ctx, rtx, targetCCID, jobFilesInfo.Image, &jobFiles.Image, &s.scheme.Image) + e3 = s.doImageScheduling(ctx, rtx, s.scheme.TargetCCID, jobFilesInfo.Image, &jobFiles.Image, &s.scheme.Image) if e3 != nil { cancel() } }() + wg.Wait() + return errors.Join(e1, e2, e3) } func (s *Adjusting) doPackageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, fileInfo schsdk.JobFileInfo, file *jobmod.PackageJobFile, scheme *jobmod.FileScheduleScheme) error { switch info := fileInfo.(type) { case *schsdk.LocalJobFileInfo: - evt, ok := event.WaitTypeAnd[event.LocalFileUploaded](ctx, rtx.EventSet, func(e event.LocalFileUploaded) bool { + evt, ok := event.WaitTypeAnd[*event.LocalFileUploaded](ctx, rtx.EventSet, func(e *event.LocalFileUploaded) bool { return e.LocalPath == info.LocalPath }) if !ok { @@ -195,7 +196,7 @@ func (s *Adjusting) doPackageScheduling(ctx context.Context, rtx jobmgr.JobState func (s *Adjusting) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, fileInfo schsdk.JobFileInfo, file *jobmod.ImageJobFile, scheme *jobmod.FileScheduleScheme) error { switch info := fileInfo.(type) { case *schsdk.LocalJobFileInfo: - evt, ok := event.WaitTypeAnd[event.LocalFileUploaded](ctx, rtx.EventSet, func(e event.LocalFileUploaded) bool { + evt, ok := event.WaitTypeAnd[*event.LocalFileUploaded](ctx, rtx.EventSet, func(e *event.LocalFileUploaded) bool { return e.LocalPath == info.LocalPath }) if !ok { diff --git a/manager/internal/jobmgr/job/state/executing.go b/manager/internal/jobmgr/job/state/executing.go index f77a02c..218dca4 100644 --- a/manager/internal/jobmgr/job/state/executing.go +++ b/manager/internal/jobmgr/job/state/executing.go @@ -145,7 +145,7 @@ func (s *DataReturnJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Jo // 监听取消事件 go func() { - event.WaitType[event.Cancel](ctx, rtx.EventSet) + event.WaitType[*event.Cancel](ctx, rtx.EventSet) cancel() }() @@ -154,6 +154,8 @@ func (s *DataReturnJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Jo return fmt.Errorf("getting computing center info: %w", err) } + logger.Infof("submited computer center name: %s, id: %s", ccInfo.Name, ccInfo.CCID) + wt := rtx.Mgr.ExecMgr.StartTask(exetsk.NewStorageCreatePackage( 1, // TOOD 用户ID ccInfo.CDSStorageID, diff --git a/manager/internal/jobmgr/job/state/making_adjust_scheme.go b/manager/internal/jobmgr/job/state/making_adjust_scheme.go index 19ffada..c857feb 100644 --- a/manager/internal/jobmgr/job/state/making_adjust_scheme.go +++ b/manager/internal/jobmgr/job/state/making_adjust_scheme.go @@ -32,7 +32,7 @@ func (s *MakingAdjustScheme) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) ( // 监听取消事件 go func() { - event.WaitType[event.Cancel](ctx, rtx.EventSet) + event.WaitType[*event.Cancel](ctx, rtx.EventSet) cancel() }() diff --git a/manager/internal/jobmgr/job/state/multiInstance_init.go b/manager/internal/jobmgr/job/state/multiInstance_init.go index 99c8785..7bdee80 100644 --- a/manager/internal/jobmgr/job/state/multiInstance_init.go +++ b/manager/internal/jobmgr/job/state/multiInstance_init.go @@ -5,6 +5,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" + "gitlink.org.cn/cloudream/scheduler/common/pkgs/prescheduler" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job" @@ -13,6 +14,10 @@ import ( type MultiInstanceInit struct { } +func NewMultiInstanceInit() *MultiInstanceInit { + return &MultiInstanceInit{} +} + func (s *MultiInstanceInit) Run(rtx jobmgr.JobStateRunContext, job *jobmgr.Job) { s.do(rtx, job) } @@ -24,11 +29,12 @@ func (s *MultiInstanceInit) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { defer cancel() go func() { - event.WaitType[event.Cancel](ctx, rtx.EventSet) + event.WaitType[*event.Cancel](ctx, rtx.EventSet) cancel() }() instJobInfo := &schsdk.InstanceJobInfo{ + Type: schsdk.JobTypeInstance, LocalJobID: multInstJob.Info.LocalJobID, Files: multInstJob.Info.Files, Runtime: multInstJob.Info.Runtime, @@ -49,5 +55,16 @@ func (s *MultiInstanceInit) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { // 在多实例任务中新增这个实例的任务ID multInstJob.SubJobs = append(multInstJob.SubJobs, jobID) - rtx.Mgr.ChangeState(jo, NewMultiInstanceRunning()) + job := &jobmgr.Job{ + JobSetID: jo.JobSetID, + JobID: jo.JobID, + Body: multInstJob, + } + + rtx.Mgr.ChangeState(job, NewMultiInstanceRunning(prescheduler.NewDefaultPreScheduler())) + logger.Info("Create multiInstance job success, jobID: " + job.JobID) +} + +func (s *MultiInstanceInit) Dump(ctx jobmgr.JobStateRunContext, job *jobmgr.Job) jobmod.JobStateDump { + return &jobmod.MultiInstCreateInitDump{} } diff --git a/manager/internal/jobmgr/job/state/multiInstance_running.go b/manager/internal/jobmgr/job/state/multiInstance_running.go index 5786cac..dfad01e 100644 --- a/manager/internal/jobmgr/job/state/multiInstance_running.go +++ b/manager/internal/jobmgr/job/state/multiInstance_running.go @@ -3,9 +3,9 @@ package state import ( "context" "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/prescheduler" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" + "gitlink.org.cn/cloudream/scheduler/common/pkgs/prescheduler" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job" @@ -15,31 +15,28 @@ type MultiInstanceRunning struct { preScheduler prescheduler.PreScheduler } -func (s *MultiInstanceRunning) Dump(ctx jobmgr.JobStateRunContext, job *jobmgr.Job) jobmod.JobStateDump { - //TODO implement me - panic("implement me") +func NewMultiInstanceRunning(preScheduler prescheduler.PreScheduler) *MultiInstanceRunning { + return &MultiInstanceRunning{ + preScheduler: preScheduler, + } } -//func NewMultiInstanceRunning(preScheduler prescheduler.PreScheduler) *MultiInstanceRunning { -// return &MultiInstanceRunning{ -// preScheduler: preScheduler, -// } +//func NewMultiInstanceRunning() *MultiInstanceRunning { +// return &MultiInstanceRunning{} //} -func NewMultiInstanceRunning() *MultiInstanceRunning { - return &MultiInstanceRunning{} -} - func (s *MultiInstanceRunning) Run(rtx jobmgr.JobStateRunContext, job *jobmgr.Job) { s.do(rtx, job) } func (s *MultiInstanceRunning) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { + logger.Info("start run multiInstanceRunning, jobID: " + jo.JobID) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { - event.WaitType[event.Cancel](ctx, rtx.EventSet) + event.WaitType[*event.Cancel](ctx, rtx.EventSet) cancel() }() @@ -47,10 +44,12 @@ func (s *MultiInstanceRunning) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) for { // 监听创建实例事件 - ic, ok := event.WaitType[event.InstanceCreate](ctx, rtx.EventSet) + ic, ok := event.WaitType[*event.InstanceCreate](ctx, rtx.EventSet) if !ok { + logger.Info("MultiInstanceRunning canceled") break } + logger.Info("wait a event happened") // 构建InstanceJobInfo infoFiles := schsdk.JobFilesInfo{ @@ -60,6 +59,7 @@ func (s *MultiInstanceRunning) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) } instJobInfo := &schsdk.InstanceJobInfo{ + Type: schsdk.JobTypeInstance, LocalJobID: multInstJob.Info.LocalJobID, Files: infoFiles, Runtime: multInstJob.Info.Runtime, @@ -94,3 +94,7 @@ func (s *MultiInstanceRunning) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) } } + +func (s *MultiInstanceRunning) Dump(ctx jobmgr.JobStateRunContext, job *jobmgr.Job) jobmod.JobStateDump { + return &jobmod.MultiInstCreateRunningDump{} +} diff --git a/manager/internal/jobmgr/job/state/prescheduling.go b/manager/internal/jobmgr/job/state/prescheduling.go index efb246c..647c834 100644 --- a/manager/internal/jobmgr/job/state/prescheduling.go +++ b/manager/internal/jobmgr/job/state/prescheduling.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "gitlink.org.cn/cloudream/common/pkgs/logger" "sync" "time" @@ -30,23 +31,24 @@ func NewPreSchuduling(scheme jobmod.JobScheduleScheme) *PreScheduling { } func (s *PreScheduling) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { + logger.Info("start run preScheduling, jobID: " + jo.JobID) + var jobFilesInfo schsdk.JobFilesInfo var jobFiles *jobmod.JobFiles - var targetCCID schsdk.CCID switch runningJob := jo.Body.(type) { case *job.NormalJob: jobFilesInfo = runningJob.Info.Files jobFiles = &runningJob.Files - targetCCID = runningJob.TargetCCID + runningJob.TargetCCID = s.scheme.TargetCCID case *job.MultiInstanceJob: jobFilesInfo = runningJob.Info.Files jobFiles = &runningJob.Files - targetCCID = runningJob.TargetCCID + runningJob.TargetCCID = s.scheme.TargetCCID case *job.InstanceJob: jobFilesInfo = runningJob.Info.Files jobFiles = &runningJob.Files - targetCCID = runningJob.TargetCCID + runningJob.TargetCCID = s.scheme.TargetCCID } ctx, cancel := context.WithCancel(context.Background()) @@ -54,7 +56,7 @@ func (s *PreScheduling) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { // 监听取消事件 go func() { - event.WaitType[event.Cancel](ctx, rtx.EventSet) + event.WaitType[*event.Cancel](ctx, rtx.EventSet) cancel() }() @@ -88,12 +90,14 @@ func (s *PreScheduling) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { go func() { defer wg.Done() - e3 = s.doImageScheduling(ctx, rtx, targetCCID, jobFilesInfo.Image, &jobFiles.Image, &s.scheme.Image) + e3 = s.doImageScheduling(ctx, rtx, s.scheme.TargetCCID, jobFilesInfo.Image, &jobFiles.Image, &s.scheme.Image) if e3 != nil { cancel() } }() + wg.Wait() + allErr := errors.Join(e1, e2, e3) if allErr != nil { rtx.Mgr.ChangeState(jo, FailureComplete(err)) @@ -111,7 +115,7 @@ func (s *PreScheduling) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobm func (s *PreScheduling) doPackageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, fileInfo schsdk.JobFileInfo, file *jobmod.PackageJobFile, scheme *jobmod.FileScheduleScheme) error { switch info := fileInfo.(type) { case *schsdk.LocalJobFileInfo: - evt, ok := event.WaitTypeAnd[event.LocalFileUploaded](ctx, rtx.EventSet, func(e event.LocalFileUploaded) bool { + evt, ok := event.WaitTypeAnd[*event.LocalFileUploaded](ctx, rtx.EventSet, func(e *event.LocalFileUploaded) bool { return e.LocalPath == info.LocalPath }) if !ok { @@ -173,7 +177,7 @@ func (s *PreScheduling) doPackageScheduling(ctx context.Context, rtx jobmgr.JobS func (s *PreScheduling) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, fileInfo schsdk.JobFileInfo, file *jobmod.ImageJobFile, scheme *jobmod.FileScheduleScheme) error { switch info := fileInfo.(type) { case *schsdk.LocalJobFileInfo: - evt, ok := event.WaitTypeAnd[event.LocalFileUploaded](ctx, rtx.EventSet, func(e event.LocalFileUploaded) bool { + evt, ok := event.WaitTypeAnd[*event.LocalFileUploaded](ctx, rtx.EventSet, func(e *event.LocalFileUploaded) bool { return e.LocalPath == info.LocalPath }) if !ok { diff --git a/manager/internal/jobmgr/job/state/ready_to_adjust.go b/manager/internal/jobmgr/job/state/ready_to_adjust.go index d2914a1..25b6dc9 100644 --- a/manager/internal/jobmgr/job/state/ready_to_adjust.go +++ b/manager/internal/jobmgr/job/state/ready_to_adjust.go @@ -45,12 +45,12 @@ func (s *ReadyToAdjust) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error // 监听取消事件 go func() { - event.WaitType[event.Cancel](ctx, rtx.EventSet) + event.WaitType[*event.Cancel](ctx, rtx.EventSet) cancel() }() if rt, ok := jobFilesInfo.Dataset.(*schsdk.DataReturnJobFileInfo); ok { - evt, ok := event.WaitTypeAnd[event.JobCompleted](ctx, rtx.EventSet, func(val event.JobCompleted) bool { + evt, ok := event.WaitTypeAnd[*event.JobCompleted](ctx, rtx.EventSet, func(val *event.JobCompleted) bool { return val.Job.GetInfo().GetLocalJobID() == rt.DataReturnLocalJobID }) if !ok { diff --git a/manager/internal/jobmgr/job/state/wait_target_complete.go b/manager/internal/jobmgr/job/state/wait_target_complete.go index c0f4d5c..aeb826e 100644 --- a/manager/internal/jobmgr/job/state/wait_target_complete.go +++ b/manager/internal/jobmgr/job/state/wait_target_complete.go @@ -34,11 +34,11 @@ func (s *WaitTargetComplete) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e // 监听取消事件 go func() { - event.WaitType[event.Cancel](ctx, rtx.EventSet) + event.WaitType[*event.Cancel](ctx, rtx.EventSet) cancel() }() - evt, ok := event.WaitTypeAnd[event.JobCompleted](ctx, rtx.EventSet, func(val event.JobCompleted) bool { + evt, ok := event.WaitTypeAnd[*event.JobCompleted](ctx, rtx.EventSet, func(val *event.JobCompleted) bool { return val.Job.GetInfo().GetLocalJobID() == reJob.Info.TargetLocalJobID }) if !ok { diff --git a/manager/internal/jobmgr/jobmgr.go b/manager/internal/jobmgr/jobmgr.go index 2a9f810..0370119 100644 --- a/manager/internal/jobmgr/jobmgr.go +++ b/manager/internal/jobmgr/jobmgr.go @@ -158,8 +158,10 @@ func (m *Manager) SubmitJobSet(jobs []SubmittingJob) schsdk.JobSetID { Body: subJob.Body, }, eventSet: NewEventSet(), + state: subJob.InitState, } jobSet.jobs[jobID] = job + m.jobs[jobID] = job // 更改作业的初始状态 //m.ChangeState(&job.job, subJob.InitState) @@ -167,7 +169,7 @@ func (m *Manager) SubmitJobSet(jobs []SubmittingJob) schsdk.JobSetID { subJob.InitState.Run(JobStateRunContext{ Mgr: m, EventSet: &job.eventSet, - LastState: job.state, + LastState: nil, }, &job.job) }() } @@ -205,16 +207,34 @@ type PreSchedulerInstJob struct { } // AddJob 添加一个作业到指定的作业集。 -func (m *Manager) AddJob(jobSetID schsdk.JobSetID, jobBody JobBody, State JobState) schsdk.JobID { - jobID := schsdk.JobID(fmt.Sprintf("%d", m.jobIDIndex+1)) +func (m *Manager) AddJob(jobSetID schsdk.JobSetID, jobBody JobBody, jobState JobState) schsdk.JobID { + m.pubLock.Lock() + defer m.pubLock.Unlock() - job := Job{ - JobSetID: jobSetID, - JobID: jobID, - Body: jobBody, + jobID := schsdk.JobID(fmt.Sprintf("%d", m.jobIDIndex+1)) + m.jobIDIndex += 1 + + job := &mgrJob{ + job: Job{ + JobSetID: jobSetID, + JobID: jobID, + Body: jobBody, + }, + state: jobState, + eventSet: NewEventSet(), } - m.ChangeState(&job, State) + m.jobs[jobID] = job + jobSet := m.jobSets[jobSetID] + jobSet.jobs[jobID] = job + + go func() { + jobState.Run(JobStateRunContext{ + Mgr: m, + EventSet: &job.eventSet, + LastState: nil, + }, &job.job) + }() return jobID } diff --git a/manager/internal/mq/job.go b/manager/internal/mq/job.go index bd45610..38835a2 100644 --- a/manager/internal/mq/job.go +++ b/manager/internal/mq/job.go @@ -58,7 +58,7 @@ func (svc *Service) SubmitJobSet(msg *mgrmq.SubmitJobSet) (*mgrmq.SubmitJobSetRe jobs = append(jobs, jobmgr.SubmittingJob{ Body: job, - InitState: state.NewPreSchuduling(preSch), + InitState: state.NewMultiInstanceInit(), }) } @@ -71,7 +71,7 @@ func (svc *Service) CreateInstance(instInfo *mgrmq.CreateInstance) (*mgrmq.Creat logger.Debugf("start create instance") fut := future.NewSetValue[event.CreateInstanceResult]() - svc.jobMgr.PostEvent(schsdk.JobID(instInfo.LocalJobID), event.NewInstanceCreate(instInfo.LocalPath, fut)) + svc.jobMgr.PostEvent(instInfo.JobID, event.NewInstanceCreate(instInfo.LocalPath, fut)) result, err := fut.WaitValue(context.TODO())