diff --git a/advisor/internal/scheduler/scheduler.go b/advisor/internal/scheduler/scheduler.go index 4685ebe..313a71e 100644 --- a/advisor/internal/scheduler/scheduler.go +++ b/advisor/internal/scheduler/scheduler.go @@ -130,9 +130,23 @@ func NewDefaultSchedule() *DefaultScheduler { } func (s *DefaultScheduler) Schedule(dump *jobmod.JobDump) (*jobmod.JobScheduleScheme, error) { - norJob, ok := dump.Body.(*jobmod.NormalJobDump) - if !ok { - return nil, fmt.Errorf("only normal job can be scheduled, but got %T", dump.Body) + var jobResourceInfo schsdk.JobResourcesInfo + var jobFiles *jobmod.JobFiles + var targetCCID schsdk.CCID + + switch jobDump := dump.Body.(type) { + case *jobmod.NormalJobDump: + normalJobInfo := dump.Info.(*schsdk.NormalJobInfo) + jobResourceInfo = normalJobInfo.Resources + jobFiles = &jobDump.Files + targetCCID = jobDump.TargetCCID + case *jobmod.InstanceJobDump: + instanceJobInfo := dump.Info.(*schsdk.InstanceJobInfo) + jobResourceInfo = instanceJobInfo.Resources + jobFiles = &jobDump.Files + targetCCID = jobDump.TargetCCID + default: + return nil, fmt.Errorf("only normal or instance jobs can be scheduled, but got %T", dump.Body) } mgrCli, err := schglb.ManagerMQPool.Acquire() @@ -156,17 +170,17 @@ func (s *DefaultScheduler) Schedule(dump *jobmod.JobDump) (*jobmod.JobScheduleSc for _, cc := range allCC.ComputingCenters { allCCs[cc.CCID] = &candidate{ CC: cc, - IsPreScheduled: cc.CCID == norJob.TargetCCID, + IsPreScheduled: cc.CCID == targetCCID, } } // 计算 - err = s.calcFileScore(norJob.Files, allCCs) + err = s.calcFileScore(*jobFiles, allCCs) if err != nil { return nil, err } - err = s.calcResourceScore(dump.Info.(*schsdk.NormalJobInfo), allCCs) + err = s.calcResourceScore(jobResourceInfo, allCCs) if err != nil { return nil, err } @@ -209,9 +223,9 @@ func (s *DefaultScheduler) makeSchemeForNode(targetCC *candidate) jobmod.JobSche return scheme } -func (s *DefaultScheduler) calcResourceScore(info *schsdk.NormalJobInfo, allCCs map[schsdk.CCID]*candidate) error { +func (s *DefaultScheduler) calcResourceScore(jobResource schsdk.JobResourcesInfo, allCCs map[schsdk.CCID]*candidate) error { for _, cc := range allCCs { - res, err := s.calcOneResourceScore(info.Resources, &cc.CC) + res, err := s.calcOneResourceScore(jobResource, &cc.CC) if err != nil { return err } diff --git a/client/internal/http/job.go b/client/internal/http/job.go new file mode 100644 index 0000000..b4052f7 --- /dev/null +++ b/client/internal/http/job.go @@ -0,0 +1,62 @@ +package http + +import ( + "github.com/gin-gonic/gin" + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + "gitlink.org.cn/cloudream/common/utils/serder" + "io" + "net/http" +) + +type JobService struct { + *Server +} + +type CreateInstanceResp struct { + InstanceID schsdk.JobID `json:"instanceID"` + UploadScheme schsdk.JobFilesUploadScheme `json:"uploadScheme"` +} + +type CreateInstanceReq struct { + JobID schsdk.JobID `json:"jobID" binding:"required"` + DataSet schsdk.JobFileInfo `json:"localPath" binding:"required"` +} + +func (s *Server) JobSvc() *JobService { + return &JobService{ + Server: s, + } +} + +func (s *JobService) CreateInstance(ctx *gin.Context) { + log := logger.WithField("HTTP", "JobSet.HTTP") + + bodyData, err := io.ReadAll(ctx.Request.Body) + if err != nil { + log.Warnf("reading request body: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "read request body failed")) + return + } + + req, err := 
serder.JSONToObjectEx[CreateInstanceReq](bodyData) + if err != nil { + log.Warnf("parsing request body: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed")) + return + } + + jobID, filesUploadScheme, err := s.svc.JobSetSvc().CreateInstance(req.JobID, req.DataSet) + if err != nil { + log.Warnf("create job instance: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "create job instance failed")) + return + } + + ctx.JSON(http.StatusOK, OK(CreateInstanceResp{ + InstanceID: jobID, + UploadScheme: filesUploadScheme, + })) + +} diff --git a/client/internal/http/server.go b/client/internal/http/server.go index 8e2d284..932977c 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -39,6 +39,7 @@ func (s *Server) Serve() error { func (s *Server) initRouters() { s.engine.POST("/jobSet/submit", s.JobSetSvc().Submit) + s.engine.POST("/job/createInstance", s.JobSvc().CreateInstance) s.engine.POST("/jobSet/localFileUploaded", s.JobSetSvc().LocalFileUploaded) s.engine.GET("/jobSet/getServiceList", s.JobSetSvc().GetServiceList) } diff --git a/client/internal/prescheduler/default_prescheduler.go b/client/internal/prescheduler/default_prescheduler.go deleted file mode 100644 index 65f0b76..0000000 --- a/client/internal/prescheduler/default_prescheduler.go +++ /dev/null @@ -1,721 +0,0 @@ -package prescheduler - -import ( - "fmt" - "sort" - - "github.com/inhies/go-bytesize" - "github.com/samber/lo" - - schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" - "gitlink.org.cn/cloudream/common/utils/math2" - - schglb "gitlink.org.cn/cloudream/scheduler/common/globals" - schmod "gitlink.org.cn/cloudream/scheduler/common/models" - jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" - "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector" - mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager" -) - -const ( - //每个节点划分的资源等级: - // ResourceLevel1:表示所有资源类型均满足 大于等于1.5倍 - ResourceLevel1 = 1 - // ResourceLevel2:表示不满足Level1,但所有资源类型均满足 大于等于1倍 - ResourceLevel2 = 2 - // ResourceLevel3: 表示某些资源类型 小于一倍 - ResourceLevel3 = 3 - - CpuResourceWeight float64 = 1 - StgResourceWeight float64 = 1.2 - - CachingWeight float64 = 1 - LoadedWeight float64 = 2 -) - -var ErrNoAvailableScheme = fmt.Errorf("no appropriate scheduling node found, please wait") - -type candidate struct { - CC schmod.ComputingCenter - IsReferencedJobTarget bool // 这个节点是否是所依赖的任务所选择的节点 - Resource resourcesDetail - Files filesDetail -} - -type resourcesDetail struct { - CPU resourceDetail - GPU resourceDetail - NPU resourceDetail - MLU resourceDetail - Storage resourceDetail - Memory resourceDetail - - TotalScore float64 - AvgScore float64 - MaxLevel int -} -type resourceDetail struct { - Level int - Score float64 -} - -type filesDetail struct { - Dataset fileDetail - Code fileDetail - Image fileDetail - - TotalScore float64 -} -type fileDetail struct { - CachingScore float64 - LoadingScore float64 - IsLoaded bool //表示storage是否已经调度到该节点, image表示镜像是否已经加载到该算力中心 -} - -type schedulingJob struct { - Job schsdk.JobInfo - Afters []string -} - -type CandidateArr []*candidate - -func (a CandidateArr) Len() int { return len(a) } -func (a CandidateArr) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a CandidateArr) Less(i, j int) bool { - n1 := a[i] - n2 := a[j] - - // 优先与所依赖的任务放到一起,但要求那个节点的资源足够 - if n1.IsReferencedJobTarget && 
n1.Resource.MaxLevel < ResourceLevel3 { - return true - } - if n2.IsReferencedJobTarget && n2.Resource.MaxLevel < ResourceLevel3 { - return true - } - - // 优先判断资源等级,资源等级越低,代表越满足需求 - if n1.Resource.MaxLevel < n2.Resource.MaxLevel { - return true - } - if n1.Resource.MaxLevel > n2.Resource.MaxLevel { - return false - } - - // 等级相同时,根据单项分值比较 - switch n1.Resource.MaxLevel { - case ResourceLevel1: - // 数据文件总分越高,代表此节点上拥有的数据文件越完整,则越优先考虑 - return n1.Files.TotalScore > n2.Files.TotalScore - - case ResourceLevel2: - // 资源分的平均值越高,代表资源越空余,则越优先考虑 - return n1.Resource.AvgScore > n2.Resource.AvgScore - - case ResourceLevel3: - // 资源分的平均值越高,代表资源越空余,则越优先考虑 - return n1.Resource.AvgScore > n2.Resource.AvgScore - } - - return false -} - -type DefaultPreScheduler struct { -} - -func NewDefaultPreScheduler() *DefaultPreScheduler { - return &DefaultPreScheduler{} -} - -func (s *DefaultPreScheduler) Schedule(info *schsdk.JobSetInfo) (*jobmod.JobSetPreScheduleScheme, *schsdk.JobSetFilesUploadScheme, error) { - jobSetScheme := &jobmod.JobSetPreScheduleScheme{ - JobSchemes: make(map[string]jobmod.JobScheduleScheme), - } - filesUploadSchemes := make(map[string]schsdk.LocalFileUploadScheme) - - mgrCli, err := schglb.ManagerMQPool.Acquire() - if err != nil { - return nil, nil, fmt.Errorf("new collector client: %w", err) - } - defer schglb.ManagerMQPool.Release(mgrCli) - - // 查询有哪些算力中心可用 - - allCC, err := mgrCli.GetAllComputingCenter(mgrmq.NewGetAllComputingCenter()) - if err != nil { - return nil, nil, fmt.Errorf("getting all computing center info: %w", err) - } - - ccs := make(map[schsdk.CCID]schmod.ComputingCenter) - for _, node := range allCC.ComputingCenters { - ccs[node.CCID] = node - } - - if len(ccs) == 0 { - return nil, nil, ErrNoAvailableScheme - } - - // 先根据任务配置,收集它们依赖的任务的LocalID - var schJobs []*schedulingJob - for _, job := range info.Jobs { - j := &schedulingJob{ - Job: job, - } - - if norJob, ok := job.(*schsdk.NormalJobInfo); ok { - if resFile, ok := norJob.Files.Dataset.(*schsdk.ResourceJobFileInfo); ok { - j.Afters = append(j.Afters, resFile.ResourceLocalJobID) - } - - if resFile, ok := norJob.Files.Code.(*schsdk.ResourceJobFileInfo); ok { - j.Afters = append(j.Afters, resFile.ResourceLocalJobID) - } - } else if resJob, ok := job.(*schsdk.DataReturnJobInfo); ok { - j.Afters = append(j.Afters, resJob.TargetLocalJobID) - } - - schJobs = append(schJobs, j) - } - - // 然后根据依赖进行排序 - schJobs, ok := s.orderByAfters(schJobs) - if !ok { - return nil, nil, fmt.Errorf("circular reference detected between jobs in the job set") - } - - // 经过排序后,按顺序生成调度方案 - for _, job := range schJobs { - if norJob, ok := job.Job.(*schsdk.NormalJobInfo); ok { - scheme, err := s.scheduleForNormalJob(info, job, ccs, jobSetScheme.JobSchemes) - if err != nil { - return nil, nil, err - } - - jobSetScheme.JobSchemes[job.Job.GetLocalJobID()] = *scheme - - // 检查数据文件的配置项,生成上传文件方案 - s.fillNormarlJobLocalUploadScheme(norJob, scheme.TargetCCID, filesUploadSchemes, ccs) - } - - // 回源任务目前不需要生成调度方案 - } - - return jobSetScheme, &schsdk.JobSetFilesUploadScheme{ - LocalFileSchemes: lo.Values(filesUploadSchemes), - }, nil -} - -func (s *DefaultPreScheduler) orderByAfters(jobs []*schedulingJob) ([]*schedulingJob, bool) { - type jobOrder struct { - Job *schedulingJob - Afters []string - } - - var jobOrders []*jobOrder - for _, job := range jobs { - od := &jobOrder{ - Job: job, - Afters: make([]string, len(job.Afters)), - } - - copy(od.Afters, job.Afters) - - jobOrders = append(jobOrders, od) - } - - // 然后排序 - var orderedJob []*schedulingJob - for { - rm 
:= 0 - for i, jo := range jobOrders { - // 找到没有依赖的任务,然后将其取出 - if len(jo.Afters) == 0 { - orderedJob = append(orderedJob, jo.Job) - - // 删除其他任务对它的引用 - for _, job2 := range jobOrders { - job2.Afters = lo.Reject(job2.Afters, func(item string, idx int) bool { return item == jo.Job.Job.GetLocalJobID() }) - } - - rm++ - continue - } - - jobOrders[i-rm] = jobOrders[i] - } - - jobOrders = jobOrders[:len(jobOrders)-rm] - if len(jobOrders) == 0 { - break - } - - // 遍历一轮后没有找到无依赖的任务,那么就是存在循环引用,排序失败 - if rm == 0 { - return nil, false - } - } - - return orderedJob, true -} - -func (s *DefaultPreScheduler) scheduleForNormalJob(jobSet *schsdk.JobSetInfo, job *schedulingJob, ccs map[schsdk.CCID]schmod.ComputingCenter, jobSchemes map[string]jobmod.JobScheduleScheme) (*jobmod.JobScheduleScheme, error) { - allCCs := make(map[schsdk.CCID]*candidate) - - // 初始化备选节点信息 - for _, cc := range ccs { - caNode := &candidate{ - CC: cc, - } - - // 检查此节点是否是它所引用的任务所选的节点 - for _, af := range job.Afters { - resJob := findJobInfo[*schsdk.DataReturnJobInfo](jobSet.Jobs, af) - if resJob == nil { - return nil, fmt.Errorf("resource job %s not found in the job set", af) - } - - // 由于jobs已经按照引用排序,所以正常情况下这里肯定能取到值 - scheme, ok := jobSchemes[resJob.TargetLocalJobID] - if !ok { - continue - } - - if scheme.TargetCCID == cc.CCID { - caNode.IsReferencedJobTarget = true - break - } - } - - allCCs[cc.CCID] = caNode - } - - norJob := job.Job.(*schsdk.NormalJobInfo) - - // 计算文件占有量得分 - err := s.calcFileScore(norJob.Files, allCCs) - if err != nil { - return nil, err - } - - // 计算资源余量得分 - err = s.calcResourceScore(norJob, allCCs) - if err != nil { - return nil, err - } - - allCCsArr := lo.Values(allCCs) - sort.Sort(CandidateArr(allCCsArr)) - - targetNode := allCCsArr[0] - if targetNode.Resource.MaxLevel == ResourceLevel3 { - return nil, ErrNoAvailableScheme - } - - scheme := s.makeSchemeForNode(norJob, targetNode) - return &scheme, nil -} - -func (s *DefaultPreScheduler) fillNormarlJobLocalUploadScheme(norJob *schsdk.NormalJobInfo, targetCCID schsdk.CCID, schemes map[string]schsdk.LocalFileUploadScheme, ccs map[schsdk.CCID]schmod.ComputingCenter) { - if localFile, ok := norJob.Files.Dataset.(*schsdk.LocalJobFileInfo); ok { - if _, ok := schemes[localFile.LocalPath]; !ok { - cdsNodeID := ccs[targetCCID].CDSNodeID - schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ - LocalPath: localFile.LocalPath, - UploadToCDSNodeID: &cdsNodeID, - } - } - } - - if localFile, ok := norJob.Files.Code.(*schsdk.LocalJobFileInfo); ok { - if _, ok := schemes[localFile.LocalPath]; !ok { - cdsNodeID := ccs[targetCCID].CDSNodeID - schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ - LocalPath: localFile.LocalPath, - UploadToCDSNodeID: &cdsNodeID, - } - } - } - - if localFile, ok := norJob.Files.Image.(*schsdk.LocalJobFileInfo); ok { - if _, ok := schemes[localFile.LocalPath]; !ok { - cdsNodeID := ccs[targetCCID].CDSNodeID - schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ - LocalPath: localFile.LocalPath, - UploadToCDSNodeID: &cdsNodeID, - } - } - } -} - -func (s *DefaultPreScheduler) makeSchemeForNode(job *schsdk.NormalJobInfo, targetCC *candidate) jobmod.JobScheduleScheme { - scheme := jobmod.JobScheduleScheme{ - TargetCCID: targetCC.CC.CCID, - } - - // TODO 根据实际情况选择Move或者Load - - if _, ok := job.Files.Dataset.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Dataset.IsLoaded { - scheme.Dataset.Action = jobmod.ActionLoad - } else { - scheme.Dataset.Action = jobmod.ActionNo - } - - if _, ok := 
job.Files.Code.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Code.IsLoaded { - scheme.Code.Action = jobmod.ActionLoad - } else { - scheme.Code.Action = jobmod.ActionNo - } - - if _, ok := job.Files.Image.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Image.IsLoaded { - scheme.Image.Action = jobmod.ActionImportImage - } else { - scheme.Image.Action = jobmod.ActionNo - } - - return scheme -} - -func (s *DefaultPreScheduler) calcResourceScore(job *schsdk.NormalJobInfo, allCCs map[schsdk.CCID]*candidate) error { - for _, cc := range allCCs { - res, err := s.calcOneResourceScore(job.Resources, &cc.CC) - if err != nil { - return err - } - - cc.Resource = *res - } - - return nil -} - -// 划分节点资源等级,并计算资源得分 -func (s *DefaultPreScheduler) calcOneResourceScore(requires schsdk.JobResourcesInfo, cc *schmod.ComputingCenter) (*resourcesDetail, error) { - colCli, err := schglb.CollectorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new collector client: %w", err) - } - defer schglb.CollectorMQPool.Release(colCli) - - getResDataResp, err := colCli.GetAllResourceData(collector.NewGetAllResourceData(cc.UOPSlwNodeID)) - if err != nil { - return nil, err - } - - var resDetail resourcesDetail - - //计算资源得分 - totalScore := 0.0 - maxLevel := 0 - resKinds := 0 - - if requires.CPU > 0 { - res := findResuorce[*uopsdk.CPUResourceData](getResDataResp.Datas) - if res == nil { - resDetail.CPU.Level = ResourceLevel3 - resDetail.CPU.Score = 0 - } else { - resDetail.CPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.CPU) - resDetail.CPU.Score = (float64(res.Available.Value) / requires.CPU) * CpuResourceWeight - } - - maxLevel = math2.Max(maxLevel, resDetail.CPU.Level) - totalScore += resDetail.CPU.Score - resKinds++ - } - - if requires.GPU > 0 { - res := findResuorce[*uopsdk.GPUResourceData](getResDataResp.Datas) - if res == nil { - resDetail.GPU.Level = ResourceLevel3 - resDetail.GPU.Score = 0 - } else { - resDetail.GPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.GPU) - resDetail.GPU.Score = (float64(res.Available.Value) / requires.GPU) * CpuResourceWeight - } - - maxLevel = math2.Max(maxLevel, resDetail.GPU.Level) - totalScore += resDetail.GPU.Score - resKinds++ - } - - if requires.NPU > 0 { - res := findResuorce[*uopsdk.NPUResourceData](getResDataResp.Datas) - if res == nil { - resDetail.NPU.Level = ResourceLevel3 - resDetail.NPU.Score = 0 - } else { - resDetail.NPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.NPU) - resDetail.NPU.Score = (float64(res.Available.Value) / requires.NPU) * CpuResourceWeight - } - - maxLevel = math2.Max(maxLevel, resDetail.NPU.Level) - totalScore += resDetail.NPU.Score - resKinds++ - } - - if requires.MLU > 0 { - res := findResuorce[*uopsdk.MLUResourceData](getResDataResp.Datas) - if res == nil { - resDetail.MLU.Level = ResourceLevel3 - resDetail.MLU.Score = 0 - } else { - resDetail.MLU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.MLU) - resDetail.MLU.Score = (float64(res.Available.Value) / requires.MLU) * CpuResourceWeight - } - - maxLevel = math2.Max(maxLevel, resDetail.MLU.Level) - totalScore += resDetail.MLU.Score - resKinds++ - } - - if requires.Storage > 0 { - res := findResuorce[*uopsdk.StorageResourceData](getResDataResp.Datas) - if res == nil { - resDetail.Storage.Level = ResourceLevel3 - resDetail.Storage.Score = 0 - } else { - bytes, err := bytesize.Parse(fmt.Sprintf("%f%s", res.Available.Value, res.Available.Unit)) - if err != nil { - return nil, err - } - - 
resDetail.Storage.Level = s.calcResourceLevel(float64(bytes), float64(requires.Storage)) - resDetail.Storage.Score = (float64(bytes) / float64(requires.Storage)) * StgResourceWeight - } - - maxLevel = math2.Max(maxLevel, resDetail.Storage.Level) - totalScore += resDetail.Storage.Score - resKinds++ - } - - if requires.Memory > 0 { - res := findResuorce[*uopsdk.MemoryResourceData](getResDataResp.Datas) - if res == nil { - resDetail.Memory.Level = ResourceLevel3 - resDetail.Memory.Score = 0 - } else { - bytes, err := bytesize.Parse(fmt.Sprintf("%f%s", res.Available.Value, res.Available.Unit)) - if err != nil { - return nil, err - } - - resDetail.Memory.Level = s.calcResourceLevel(float64(bytes), float64(requires.Memory)) - resDetail.Memory.Score = (float64(bytes) / float64(requires.Memory)) * StgResourceWeight - } - - maxLevel = math2.Max(maxLevel, resDetail.Memory.Level) - totalScore += resDetail.Memory.Score - resKinds++ - } - - if resKinds == 0 { - return &resDetail, nil - } - - resDetail.TotalScore = totalScore - resDetail.AvgScore = resDetail.AvgScore / float64(resKinds) - resDetail.MaxLevel = maxLevel - - return &resDetail, nil -} - -func (s *DefaultPreScheduler) calcResourceLevel(avai float64, need float64) int { - if avai >= 1.5*need { - return ResourceLevel1 - } - - if avai >= need { - return ResourceLevel2 - } - - return ResourceLevel3 -} - -// 计算节点得分情况 -func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allCCs map[schsdk.CCID]*candidate) error { - // 只计算运控返回的可用计算中心上的存储服务的数据权重 - cdsNodeToCC := make(map[cdssdk.NodeID]*candidate) - for _, cc := range allCCs { - cdsNodeToCC[cc.CC.CDSNodeID] = cc - } - - //计算code相关得分 - if pkgFile, ok := files.Code.(*schsdk.PackageJobFileInfo); ok { - codeFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsNodeToCC) - if err != nil { - return fmt.Errorf("calc code file score: %w", err) - } - for id, score := range codeFileScores { - allCCs[id].Files.Code = *score - } - } - - //计算dataset相关得分 - if pkgFile, ok := files.Dataset.(*schsdk.PackageJobFileInfo); ok { - datasetFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsNodeToCC) - if err != nil { - return fmt.Errorf("calc dataset file score: %w", err) - } - for id, score := range datasetFileScores { - allCCs[id].Files.Dataset = *score - } - } - - //计算image相关得分 - if imgFile, ok := files.Image.(*schsdk.ImageJobFileInfo); ok { - //计算image相关得分 - imageFileScores, err := s.calcImageFileScore(imgFile.ImageID, allCCs, cdsNodeToCC) - if err != nil { - return fmt.Errorf("calc image file score: %w", err) - } - for id, score := range imageFileScores { - allCCs[id].Files.Image = *score - } - } - - for _, cc := range allCCs { - cc.Files.TotalScore = cc.Files.Code.CachingScore + - cc.Files.Code.LoadingScore + - cc.Files.Dataset.CachingScore + - cc.Files.Dataset.LoadingScore + - cc.Files.Image.CachingScore + - cc.Files.Image.LoadingScore - } - - return nil -} - -// 计算package在各节点的得分情况 -func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) { - colCli, err := schglb.CollectorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new collector client: %w", err) - } - defer schglb.CollectorMQPool.Release(colCli) - - ccFileScores := make(map[schsdk.CCID]*fileDetail) - - // TODO UserID - cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(1, packageID)) - if err != nil { - return nil, err - } - - for _, cdsNodeCacheInfo := range 
cachedResp.NodeInfos { - cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID] - if !ok { - continue - } - - ccFileScores[cc.CC.CCID] = &fileDetail{ - //TODO 根据缓存方式不同,可能会有不同的计算方式 - CachingScore: float64(cdsNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight, - } - } - - // TODO UserID - loadedResp, err := colCli.PackageGetLoadedStgNodes(collector.NewPackageGetLoadedStgNodes(1, packageID)) - if err != nil { - return nil, err - } - - for _, cdsNodeID := range loadedResp.StgNodeIDs { - cc, ok := cdsNodeToCC[cdsNodeID] - if !ok { - continue - } - - sfc, ok := ccFileScores[cc.CC.CCID] - if !ok { - sfc = &fileDetail{} - ccFileScores[cc.CC.CCID] = sfc - } - - sfc.LoadingScore = 1 * LoadedWeight - sfc.IsLoaded = true - } - - return ccFileScores, nil -} - -// 计算package在各节点的得分情况 -func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) { - colCli, err := schglb.CollectorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new collector client: %w", err) - } - defer schglb.CollectorMQPool.Release(colCli) - - magCli, err := schglb.ManagerMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new manager client: %w", err) - } - defer schglb.ManagerMQPool.Release(magCli) - - imageInfoResp, err := magCli.GetImageInfo(mgrmq.NewGetImageInfo(imageID)) - if err != nil { - return nil, fmt.Errorf("getting image info: %w", err) - } - - ccFileScores := make(map[schsdk.CCID]*fileDetail) - - if imageInfoResp.Image.CDSPackageID != nil { - cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(1, *imageInfoResp.Image.CDSPackageID)) - if err != nil { - return nil, err - } - - for _, cdsNodeCacheInfo := range cachedResp.NodeInfos { - cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID] - if !ok { - continue - } - - ccFileScores[cc.CC.CCID] = &fileDetail{ - //TODO 根据缓存方式不同,可能会有不同的计算方式 - CachingScore: float64(cdsNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight, - } - } - } - - // 镜像的LoadingScore是判断是否导入到算力中心 - for _, pcmImg := range imageInfoResp.PCMImages { - _, ok := allCCs[pcmImg.CCID] - if !ok { - continue - } - - fsc, ok := ccFileScores[pcmImg.CCID] - if !ok { - fsc = &fileDetail{} - ccFileScores[pcmImg.CCID] = fsc - } - - fsc.LoadingScore = 1 * LoadedWeight - fsc.IsLoaded = true - } - - return ccFileScores, nil -} -func findResuorce[T uopsdk.ResourceData](all []uopsdk.ResourceData) T { - for _, data := range all { - if ret, ok := data.(T); ok { - return ret - } - } - - var def T - return def -} - -func findJobInfo[T schsdk.JobInfo](jobs []schsdk.JobInfo, localJobID string) T { - for _, job := range jobs { - if ret, ok := job.(T); ok && job.GetLocalJobID() == localJobID { - return ret - } - } - - var def T - return def -} diff --git a/client/internal/prescheduler/prescheduler.go b/client/internal/prescheduler/prescheduler.go deleted file mode 100644 index a9dc5c6..0000000 --- a/client/internal/prescheduler/prescheduler.go +++ /dev/null @@ -1,10 +0,0 @@ -package prescheduler - -import ( - schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" - jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" -) - -type PreScheduler interface { - Schedule(info *schsdk.JobSetInfo) (*jobmod.JobSetPreScheduleScheme, *schsdk.JobSetFilesUploadScheme, error) -} diff --git a/client/internal/services/job.go b/client/internal/services/job.go new file mode 100644 index 0000000..f670bc6 --- /dev/null +++ 
b/client/internal/services/job.go @@ -0,0 +1,27 @@ +package services + +import ( + "fmt" + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + schglb "gitlink.org.cn/cloudream/scheduler/common/globals" + mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager" +) + +// CreateInstance 创建多实例任务中的实例任务 +func (svc *JobSetService) CreateInstance(jobID schsdk.JobID, dataSet schsdk.JobFileInfo) (schsdk.JobID, schsdk.JobFilesUploadScheme, error) { + + scheme := new(schsdk.JobFilesUploadScheme) + + mgrCli, err := schglb.ManagerMQPool.Acquire() + if err != nil { + return "", *scheme, fmt.Errorf("new manager client: %w", err) + } + defer schglb.ManagerMQPool.Release(mgrCli) + + resp, err := mgrCli.CreateInstance(mgrmq.NewCreateInstance(jobID, dataSet)) + if err != nil { + return "", *scheme, fmt.Errorf("creating job instance on manager: %w", err) + } + + return resp.InstanceID, resp.UploadScheme, nil +} diff --git a/client/internal/services/jobset.go b/client/internal/services/jobset.go index 50e3d31..f3f361e 100644 --- a/client/internal/services/jobset.go +++ b/client/internal/services/jobset.go @@ -25,7 +25,7 @@ func (svc *JobSetService) Submit(info schsdk.JobSetInfo) (schsdk.JobSetID, *schs } defer schglb.ManagerMQPool.Release(mgrCli) - schScheme, uploadScheme, err := svc.preScheduler.Schedule(&info) + schScheme, uploadScheme, err := svc.preScheduler.ScheduleJobSet(&info) if err != nil { return "", nil, fmt.Errorf("pre scheduling: %w", err) } diff --git a/client/internal/services/service.go b/client/internal/services/service.go index 1d2029c..7eb124a 100644 --- a/client/internal/services/service.go +++ b/client/internal/services/service.go @@ -1,6 +1,8 @@ package services -import "gitlink.org.cn/cloudream/scheduler/client/internal/prescheduler" +import ( + "gitlink.org.cn/cloudream/scheduler/common/pkgs/prescheduler" +) type Service struct { preScheduler prescheduler.PreScheduler diff --git a/client/main.go b/client/main.go index 8a24c72..34089fc 100644 --- a/client/main.go +++ b/client/main.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "gitlink.org.cn/cloudream/scheduler/common/pkgs/prescheduler" "os" _ "google.golang.org/grpc/balancer/grpclb" @@ -9,7 +10,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/scheduler/client/internal/cmdline" "gitlink.org.cn/cloudream/scheduler/client/internal/config" - "gitlink.org.cn/cloudream/scheduler/client/internal/prescheduler" "gitlink.org.cn/cloudream/scheduler/client/internal/services" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" ) diff --git a/common/assets/confs/advisor.config.json b/common/assets/confs/advisor.config.json index e0eb871..d545a8f 100644 --- a/common/assets/confs/advisor.config.json +++ b/common/assets/confs/advisor.config.json @@ -1,6 +1,6 @@ { "logger": { - "output": "file", + "output": "stdout", "outputFileName": "advisor", "outputDirectory": "log", "level": "debug" diff --git a/common/assets/confs/collector.config.json b/common/assets/confs/collector.config.json index e97e9ee..b1b1b63 100644 --- a/common/assets/confs/collector.config.json +++ b/common/assets/confs/collector.config.json @@ -1,6 +1,6 @@ { "logger": { - "output": "file", + "output": "stdout", "outputFileName": "collector", "outputDirectory": "log", "level": "debug" diff --git a/common/assets/confs/manager.config.json b/common/assets/confs/manager.config.json index 3d6603f..61746f6 100644 --- a/common/assets/confs/manager.config.json +++ b/common/assets/confs/manager.config.json @@ -1,6 +1,6 @@ { "logger": { - "output": 
"file", + "output": "stdout", "outputFileName": "manager", "outputDirectory": "log", "level": "debug" diff --git a/common/models/job/body.go b/common/models/job/body.go index 8b99381..9ac5a6a 100644 --- a/common/models/job/body.go +++ b/common/models/job/body.go @@ -18,6 +18,8 @@ type JobBodyDump interface { var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[JobBodyDump]( (*NormalJobDump)(nil), (*DataReturnJobDump)(nil), + (*InstanceJobDump)(nil), + (*MultiInstanceJobDump)(nil), ))) type NormalJobDump struct { @@ -40,3 +42,25 @@ type DataReturnJobDump struct { func (d *DataReturnJobDump) getType() JobBodyDumpType { return d.Type } + +type InstanceJobDump struct { + serder.Metadata `union:"InstanceJob"` + Type JobBodyDumpType `json:"type"` + TargetCCID schsdk.CCID `json:"targetCCID"` + Files JobFiles `json:"files"` +} + +func (d *InstanceJobDump) getType() JobBodyDumpType { + return d.Type +} + +type MultiInstanceJobDump struct { + serder.Metadata `union:"MultiInstanceJob"` + Type JobBodyDumpType `json:"type"` + TargetCCID schsdk.CCID `json:"targetCCID"` + Files JobFiles `json:"files"` +} + +func (d *MultiInstanceJobDump) getType() JobBodyDumpType { + return d.Type +} diff --git a/common/models/job/state.go b/common/models/job/state.go index 68c827c..a3b884a 100644 --- a/common/models/job/state.go +++ b/common/models/job/state.go @@ -27,7 +27,6 @@ var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[JobStat (*WaitTargetCompleteDump)(nil), ))) -// 调整中 type AdjustingDump struct { serder.Metadata `union:"Adjusting"` Type JobStateDumpType `json:"type"` @@ -38,7 +37,6 @@ func (dump *AdjustingDump) getType() JobStateDumpType { return dump.Type } -// 任务结束 type CompletedDump struct { serder.Metadata `union:"Completed"` Type JobStateDumpType `json:"type"` @@ -49,7 +47,24 @@ func (dump *CompletedDump) getType() JobStateDumpType { return dump.Type } -// 普通任务执行中 +type MultiInstCreateInitDump struct { + serder.Metadata `union:"MultiInstCreateInit"` + Type JobStateDumpType `json:"type"` +} + +func (dump *MultiInstCreateInitDump) getType() JobStateDumpType { + return dump.Type +} + +type MultiInstCreateRunningDump struct { + serder.Metadata `union:"MultiInstCreateRunning"` + Type JobStateDumpType `json:"type"` +} + +func (dump *MultiInstCreateRunningDump) getType() JobStateDumpType { + return dump.Type +} + type NormalJobExecutingDump struct { serder.Metadata `union:"NormalJobExecuting"` Type JobStateDumpType `json:"type"` @@ -60,7 +75,6 @@ func (dump *NormalJobExecutingDump) getType() JobStateDumpType { return dump.Type } -// 回源任务执行中 type DataReturnExecutingDump struct { serder.Metadata `union:"DataReturnExecuting"` Type JobStateDumpType `json:"type"` @@ -70,7 +84,6 @@ func (dump *DataReturnExecutingDump) getType() JobStateDumpType { return dump.Type } -// 制作调整方案中 type MakeingAdjustSchemeDump struct { serder.Metadata `union:"MakeingAdjustScheme"` Type JobStateDumpType `json:"type"` @@ -80,7 +93,6 @@ func (dump *MakeingAdjustSchemeDump) getType() JobStateDumpType { return dump.Type } -// 预调度中 type PreSchedulingDump struct { serder.Metadata `union:"PreScheduling"` Type JobStateDumpType `json:"type"` @@ -91,7 +103,6 @@ func (dump *PreSchedulingDump) getType() JobStateDumpType { return dump.Type } -// 准备调整中 type ReadyToAdjustDump struct { serder.Metadata `union:"ReadyToAdjust"` Type JobStateDumpType `json:"type"` @@ -101,7 +112,6 @@ func (dump *ReadyToAdjustDump) getType() JobStateDumpType { return dump.Type } -// 普通任务准备执行中 type NormalJobReadyToExecuteDump struct { 
serder.Metadata `union:"NormalJobReadyToExecute"` Type JobStateDumpType `json:"type"` @@ -111,7 +121,6 @@ func (dump *NormalJobReadyToExecuteDump) getType() JobStateDumpType { return dump.Type } -// 回源任务准备执行中 type DataReturnReadyToExecuteDump struct { serder.Metadata `union:"DataReturnReadyToExecute"` Type JobStateDumpType `json:"type"` @@ -121,7 +130,6 @@ func (dump *DataReturnReadyToExecuteDump) getType() JobStateDumpType { return dump.Type } -// 等待回源目标完成中 type WaitTargetCompleteDump struct { serder.Metadata `union:"WaitTargetComplete"` Type JobStateDumpType `json:"type"` diff --git a/common/pkgs/mq/manager/job.go b/common/pkgs/mq/manager/job.go index 9e89540..9a7ab29 100644 --- a/common/pkgs/mq/manager/job.go +++ b/common/pkgs/mq/manager/job.go @@ -15,11 +15,15 @@ type JobService interface { GetServiceList(msg *GetServiceList) (*GetServiceListResp, *mq.CodeMessage) GetJobSetDump(msg *GetJobSetDump) (*GetJobSetDumpResp, *mq.CodeMessage) + + CreateInstance(msg *CreateInstance) (*CreateInstanceResp, *mq.CodeMessage) } // 提交任务集 var _ = Register(Service.SubmitJobSet) +var _ = Register(Service.CreateInstance) + type SubmitJobSet struct { mq.MessageBodyBase JobSet schsdk.JobSetInfo `json:"jobSet"` @@ -45,6 +49,36 @@ func (c *Client) SubmitJobSet(msg *SubmitJobSet, opts ...mq.RequestOption) (*Sub return mq.Request(Service.SubmitJobSet, c.roundTripper, msg, opts...) } +type CreateInstance struct { + mq.MessageBodyBase + JobID schsdk.JobID + DataSet schsdk.JobFileInfo +} + +type CreateInstanceResp struct { + mq.MessageBodyBase + InstanceID schsdk.JobID `json:"instanceID"` + UploadScheme schsdk.JobFilesUploadScheme `json:"uploadScheme"` +} + +func NewCreateInstance(jobID schsdk.JobID, dataSet schsdk.JobFileInfo) *CreateInstance { + return &CreateInstance{ + JobID: jobID, + DataSet: dataSet, + } +} + +func NewCreateInstanceResp(InstanceID schsdk.JobID, UploadScheme schsdk.JobFilesUploadScheme) *CreateInstanceResp { + return &CreateInstanceResp{ + InstanceID: InstanceID, + UploadScheme: UploadScheme, + } +} + +func (c *Client) CreateInstance(instance *CreateInstance, opts ...mq.RequestOption) (*CreateInstanceResp, error) { + return mq.Request(Service.CreateInstance, c.roundTripper, instance, opts...) 
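For reference, the client-side service wrapper added elsewhere in this diff (client/internal/services/job.go) drives this round trip end to end. A condensed sketch of that caller, using only the pool, package aliases, and message constructors defined in this changeset:

```go
package services

import (
	"fmt"

	schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
	schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
	mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
)

// CreateInstance asks the manager to create one instance of a
// multi-instance job and returns the upload scheme for its dataset.
func CreateInstance(jobID schsdk.JobID, dataSet schsdk.JobFileInfo) (schsdk.JobID, schsdk.JobFilesUploadScheme, error) {
	var scheme schsdk.JobFilesUploadScheme

	// Acquire a pooled manager MQ client and release it when done.
	mgrCli, err := schglb.ManagerMQPool.Acquire()
	if err != nil {
		return "", scheme, fmt.Errorf("new manager client: %w", err)
	}
	defer schglb.ManagerMQPool.Release(mgrCli)

	// One request/response round trip over the MQ RPC layer.
	resp, err := mgrCli.CreateInstance(mgrmq.NewCreateInstance(jobID, dataSet))
	if err != nil {
		return "", scheme, fmt.Errorf("creating job instance: %w", err)
	}

	return resp.InstanceID, resp.UploadScheme, nil
}
```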
+} + // JobSet中需要使用的一个文件上传完成 var _ = Register(Service.JobSetLocalFileUploaded) diff --git a/common/pkgs/prescheduler/calc_score.go b/common/pkgs/prescheduler/calc_score.go new file mode 100644 index 0000000..68cc801 --- /dev/null +++ b/common/pkgs/prescheduler/calc_score.go @@ -0,0 +1,337 @@ +package prescheduler + +import ( + "fmt" + "github.com/inhies/go-bytesize" + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" + "gitlink.org.cn/cloudream/common/utils/math2" + schglb "gitlink.org.cn/cloudream/scheduler/common/globals" + schmod "gitlink.org.cn/cloudream/scheduler/common/models" + "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector" + mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager" +) + +func (s *DefaultPreScheduler) calcResourceScore(jobResource schsdk.JobResourcesInfo, allCCs map[schsdk.CCID]*candidate) error { + for _, cc := range allCCs { + res, err := s.calcOneResourceScore(jobResource, &cc.CC) + if err != nil { + return err + } + + cc.Resource = *res + } + + return nil +} + +// 划分节点资源等级,并计算资源得分 +func (s *DefaultPreScheduler) calcOneResourceScore(requires schsdk.JobResourcesInfo, cc *schmod.ComputingCenter) (*resourcesDetail, error) { + colCli, err := schglb.CollectorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new collector client: %w", err) + } + defer schglb.CollectorMQPool.Release(colCli) + + getResDataResp, err := colCli.GetAllResourceData(collector.NewGetAllResourceData(cc.UOPSlwNodeID)) + if err != nil { + return nil, err + } + + var resDetail resourcesDetail + + //计算资源得分 + totalScore := 0.0 + maxLevel := 0 + resKinds := 0 + + if requires.CPU > 0 { + res := findResuorce[*uopsdk.CPUResourceData](getResDataResp.Datas) + if res == nil { + resDetail.CPU.Level = ResourceLevel3 + resDetail.CPU.Score = 0 + } else { + resDetail.CPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.CPU) + resDetail.CPU.Score = (float64(res.Available.Value) / requires.CPU) * CpuResourceWeight + } + + maxLevel = math2.Max(maxLevel, resDetail.CPU.Level) + totalScore += resDetail.CPU.Score + resKinds++ + } + + if requires.GPU > 0 { + res := findResuorce[*uopsdk.GPUResourceData](getResDataResp.Datas) + if res == nil { + resDetail.GPU.Level = ResourceLevel3 + resDetail.GPU.Score = 0 + } else { + resDetail.GPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.GPU) + resDetail.GPU.Score = (float64(res.Available.Value) / requires.GPU) * CpuResourceWeight + } + + maxLevel = math2.Max(maxLevel, resDetail.GPU.Level) + totalScore += resDetail.GPU.Score + resKinds++ + } + + if requires.NPU > 0 { + res := findResuorce[*uopsdk.NPUResourceData](getResDataResp.Datas) + if res == nil { + resDetail.NPU.Level = ResourceLevel3 + resDetail.NPU.Score = 0 + } else { + resDetail.NPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.NPU) + resDetail.NPU.Score = (float64(res.Available.Value) / requires.NPU) * CpuResourceWeight + } + + maxLevel = math2.Max(maxLevel, resDetail.NPU.Level) + totalScore += resDetail.NPU.Score + resKinds++ + } + + if requires.MLU > 0 { + res := findResuorce[*uopsdk.MLUResourceData](getResDataResp.Datas) + if res == nil { + resDetail.MLU.Level = ResourceLevel3 + resDetail.MLU.Score = 0 + } else { + resDetail.MLU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.MLU) + resDetail.MLU.Score = (float64(res.Available.Value) / requires.MLU) * CpuResourceWeight + } + + maxLevel = 
math2.Max(maxLevel, resDetail.MLU.Level) + totalScore += resDetail.MLU.Score + resKinds++ + } + + if requires.Storage > 0 { + res := findResuorce[*uopsdk.StorageResourceData](getResDataResp.Datas) + if res == nil { + resDetail.Storage.Level = ResourceLevel3 + resDetail.Storage.Score = 0 + } else { + bytes, err := bytesize.Parse(fmt.Sprintf("%f%s", res.Available.Value, res.Available.Unit)) + if err != nil { + return nil, err + } + + resDetail.Storage.Level = s.calcResourceLevel(float64(bytes), float64(requires.Storage)) + resDetail.Storage.Score = (float64(bytes) / float64(requires.Storage)) * StgResourceWeight + } + + maxLevel = math2.Max(maxLevel, resDetail.Storage.Level) + totalScore += resDetail.Storage.Score + resKinds++ + } + + if requires.Memory > 0 { + res := findResuorce[*uopsdk.MemoryResourceData](getResDataResp.Datas) + if res == nil { + resDetail.Memory.Level = ResourceLevel3 + resDetail.Memory.Score = 0 + } else { + bytes, err := bytesize.Parse(fmt.Sprintf("%f%s", res.Available.Value, res.Available.Unit)) + if err != nil { + return nil, err + } + + resDetail.Memory.Level = s.calcResourceLevel(float64(bytes), float64(requires.Memory)) + resDetail.Memory.Score = (float64(bytes) / float64(requires.Memory)) * StgResourceWeight + } + + maxLevel = math2.Max(maxLevel, resDetail.Memory.Level) + totalScore += resDetail.Memory.Score + resKinds++ + } + + if resKinds == 0 { + return &resDetail, nil + } + + resDetail.TotalScore = totalScore + resDetail.AvgScore = totalScore / float64(resKinds) + resDetail.MaxLevel = maxLevel + + return &resDetail, nil +} + +func (s *DefaultPreScheduler) calcResourceLevel(avai float64, need float64) int { + if avai >= 1.5*need { + return ResourceLevel1 + } + + if avai >= need { + return ResourceLevel2 + } + + return ResourceLevel3 +} + +// 计算节点得分情况 +func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allCCs map[schsdk.CCID]*candidate) error { + // 只计算运控返回的可用计算中心上的存储服务的数据权重 + cdsNodeToCC := make(map[cdssdk.NodeID]*candidate) + for _, cc := range allCCs { + cdsNodeToCC[cc.CC.CDSNodeID] = cc + } + + //计算code相关得分 + if pkgFile, ok := files.Code.(*schsdk.PackageJobFileInfo); ok { + codeFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsNodeToCC) + if err != nil { + return fmt.Errorf("calc code file score: %w", err) + } + for id, score := range codeFileScores { + allCCs[id].Files.Code = *score + } + } + + //计算dataset相关得分 + if pkgFile, ok := files.Dataset.(*schsdk.PackageJobFileInfo); ok { + datasetFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsNodeToCC) + if err != nil { + return fmt.Errorf("calc dataset file score: %w", err) + } + for id, score := range datasetFileScores { + allCCs[id].Files.Dataset = *score + } + } + + //计算image相关得分 + if imgFile, ok := files.Image.(*schsdk.ImageJobFileInfo); ok { + imageFileScores, err := s.calcImageFileScore(imgFile.ImageID, allCCs, cdsNodeToCC) + if err != nil { + return fmt.Errorf("calc image file score: %w", err) + } + for id, score := range imageFileScores { + allCCs[id].Files.Image = *score + } + } + + for _, cc := range allCCs { + cc.Files.TotalScore = cc.Files.Code.CachingScore + + cc.Files.Code.LoadingScore + + cc.Files.Dataset.CachingScore + + cc.Files.Dataset.LoadingScore + + cc.Files.Image.CachingScore + + cc.Files.Image.LoadingScore + } + + return nil +} + +// 计算package在各节点的得分情况 +func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) { 
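The file-score weighting used by calcFileScore above and the per-package/per-image helpers that follow is simple enough to check by hand. An illustrative sketch with assumed sizes (the numbers are examples, not values from the code):

```go
// Illustrative arithmetic only: with CachingWeight = 1 and LoadedWeight = 2
// as defined above, a node caching 3 GB of a 4 GB package scores
// 3/4*1 = 0.75, while a loaded copy always scores 1*2 = 2, so loaded data
// outweighs any partially cached copy.
func exampleFileScore() float64 {
	const fileSize, packageSize = 3.0, 4.0                 // assumed sizes in GB
	cachingScore := fileSize / packageSize * CachingWeight // 0.75
	loadingScore := 1 * LoadedWeight                       // 2.0
	return cachingScore + loadingScore                     // 2.75
}
```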
colCli, err := schglb.CollectorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new collector client: %w", err) + } + defer schglb.CollectorMQPool.Release(colCli) + + ccFileScores := make(map[schsdk.CCID]*fileDetail) + + // TODO UserID + cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(1, packageID)) + if err != nil { + return nil, err + } + + for _, cdsNodeCacheInfo := range cachedResp.NodeInfos { + cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID] + if !ok { + continue + } + + ccFileScores[cc.CC.CCID] = &fileDetail{ + //TODO 根据缓存方式不同,可能会有不同的计算方式 + CachingScore: float64(cdsNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight, + } + } + + // TODO UserID + loadedResp, err := colCli.PackageGetLoadedStgNodes(collector.NewPackageGetLoadedStgNodes(1, packageID)) + if err != nil { + return nil, err + } + + for _, cdsNodeID := range loadedResp.StgNodeIDs { + cc, ok := cdsNodeToCC[cdsNodeID] + if !ok { + continue + } + + sfc, ok := ccFileScores[cc.CC.CCID] + if !ok { + sfc = &fileDetail{} + ccFileScores[cc.CC.CCID] = sfc + } + + sfc.LoadingScore = 1 * LoadedWeight + sfc.IsLoaded = true + } + + return ccFileScores, nil +} + +// 计算package在各节点的得分情况 +func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) { + colCli, err := schglb.CollectorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new collector client: %w", err) + } + defer schglb.CollectorMQPool.Release(colCli) + + magCli, err := schglb.ManagerMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new manager client: %w", err) + } + defer schglb.ManagerMQPool.Release(magCli) + + imageInfoResp, err := magCli.GetImageInfo(mgrmq.NewGetImageInfo(imageID)) + if err != nil { + return nil, fmt.Errorf("getting image info: %w", err) + } + + ccFileScores := make(map[schsdk.CCID]*fileDetail) + + if imageInfoResp.Image.CDSPackageID != nil { + cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(1, *imageInfoResp.Image.CDSPackageID)) + if err != nil { + return nil, err + } + + for _, cdsNodeCacheInfo := range cachedResp.NodeInfos { + cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID] + if !ok { + continue + } + + ccFileScores[cc.CC.CCID] = &fileDetail{ + //TODO 根据缓存方式不同,可能会有不同的计算方式 + CachingScore: float64(cdsNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight, + } + } + } + + // 镜像的LoadingScore是判断是否导入到算力中心 + for _, pcmImg := range imageInfoResp.PCMImages { + _, ok := allCCs[pcmImg.CCID] + if !ok { + continue + } + + fsc, ok := ccFileScores[pcmImg.CCID] + if !ok { + fsc = &fileDetail{} + ccFileScores[pcmImg.CCID] = fsc + } + + fsc.LoadingScore = 1 * LoadedWeight + fsc.IsLoaded = true + } + + return ccFileScores, nil +} diff --git a/common/pkgs/prescheduler/default_prescheduler.go b/common/pkgs/prescheduler/default_prescheduler.go new file mode 100644 index 0000000..4a9471f --- /dev/null +++ b/common/pkgs/prescheduler/default_prescheduler.go @@ -0,0 +1,513 @@ +package prescheduler + +import ( + "fmt" + "sort" + + "github.com/samber/lo" + + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" + schglb "gitlink.org.cn/cloudream/scheduler/common/globals" + schmod "gitlink.org.cn/cloudream/scheduler/common/models" + jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" + mgrmq 
"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager" +) + +const ( + //每个节点划分的资源等级: + // ResourceLevel1:表示所有资源类型均满足 大于等于1.5倍 + ResourceLevel1 = 1 + // ResourceLevel2:表示不满足Level1,但所有资源类型均满足 大于等于1倍 + ResourceLevel2 = 2 + // ResourceLevel3: 表示某些资源类型 小于一倍 + ResourceLevel3 = 3 + + CpuResourceWeight float64 = 1 + StgResourceWeight float64 = 1.2 + + CachingWeight float64 = 1 + LoadedWeight float64 = 2 +) + +var ErrNoAvailableScheme = fmt.Errorf("no appropriate scheduling node found, please wait") + +type candidate struct { + CC schmod.ComputingCenter + IsReferencedJobTarget bool // 这个节点是否是所依赖的任务所选择的节点 + Resource resourcesDetail + Files filesDetail +} + +type resourcesDetail struct { + CPU resourceDetail + GPU resourceDetail + NPU resourceDetail + MLU resourceDetail + Storage resourceDetail + Memory resourceDetail + + TotalScore float64 + AvgScore float64 + MaxLevel int +} +type resourceDetail struct { + Level int + Score float64 +} + +type filesDetail struct { + Dataset fileDetail + Code fileDetail + Image fileDetail + + TotalScore float64 +} +type fileDetail struct { + CachingScore float64 + LoadingScore float64 + IsLoaded bool //表示storage是否已经调度到该节点, image表示镜像是否已经加载到该算力中心 +} + +type schedulingJob struct { + Job schsdk.JobInfo + Afters []string +} + +type CandidateArr []*candidate + +func (a CandidateArr) Len() int { return len(a) } +func (a CandidateArr) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a CandidateArr) Less(i, j int) bool { + n1 := a[i] + n2 := a[j] + + // 优先与所依赖的任务放到一起,但要求那个节点的资源足够 + if n1.IsReferencedJobTarget && n1.Resource.MaxLevel < ResourceLevel3 { + return true + } + if n2.IsReferencedJobTarget && n2.Resource.MaxLevel < ResourceLevel3 { + return true + } + + // 优先判断资源等级,资源等级越低,代表越满足需求 + if n1.Resource.MaxLevel < n2.Resource.MaxLevel { + return true + } + if n1.Resource.MaxLevel > n2.Resource.MaxLevel { + return false + } + + // 等级相同时,根据单项分值比较 + switch n1.Resource.MaxLevel { + case ResourceLevel1: + // 数据文件总分越高,代表此节点上拥有的数据文件越完整,则越优先考虑 + return n1.Files.TotalScore > n2.Files.TotalScore + + case ResourceLevel2: + // 资源分的平均值越高,代表资源越空余,则越优先考虑 + return n1.Resource.AvgScore > n2.Resource.AvgScore + + case ResourceLevel3: + // 资源分的平均值越高,代表资源越空余,则越优先考虑 + return n1.Resource.AvgScore > n2.Resource.AvgScore + } + + return false +} + +type DefaultPreScheduler struct { +} + +func NewDefaultPreScheduler() *DefaultPreScheduler { + return &DefaultPreScheduler{} +} + +// ScheduleJobSet 任务集预调度 +func (s *DefaultPreScheduler) ScheduleJobSet(info *schsdk.JobSetInfo) (*jobmod.JobSetPreScheduleScheme, *schsdk.JobSetFilesUploadScheme, error) { + jobSetScheme := &jobmod.JobSetPreScheduleScheme{ + JobSchemes: make(map[string]jobmod.JobScheduleScheme), + } + filesUploadSchemes := make(map[string]schsdk.LocalFileUploadScheme) + + mgrCli, err := schglb.ManagerMQPool.Acquire() + if err != nil { + return nil, nil, fmt.Errorf("new collector client: %w", err) + } + defer schglb.ManagerMQPool.Release(mgrCli) + + // 查询有哪些算力中心可用 + + allCC, err := mgrCli.GetAllComputingCenter(mgrmq.NewGetAllComputingCenter()) + if err != nil { + return nil, nil, fmt.Errorf("getting all computing center info: %w", err) + } + + ccs := make(map[schsdk.CCID]schmod.ComputingCenter) + for _, node := range allCC.ComputingCenters { + ccs[node.CCID] = node + } + + if len(ccs) == 0 { + return nil, nil, ErrNoAvailableScheme + } + + // 先根据任务配置,收集它们依赖的任务的LocalID + var schJobs []*schedulingJob + for _, job := range info.Jobs { + j := &schedulingJob{ + Job: job, + } + + if norJob, ok := job.(*schsdk.NormalJobInfo); ok { + if resFile, 
ok := norJob.Files.Dataset.(*schsdk.DataReturnJobFileInfo); ok { + j.Afters = append(j.Afters, resFile.DataReturnLocalJobID) + } + + if resFile, ok := norJob.Files.Code.(*schsdk.DataReturnJobFileInfo); ok { + j.Afters = append(j.Afters, resFile.DataReturnLocalJobID) + } + } else if resJob, ok := job.(*schsdk.DataReturnJobInfo); ok { + j.Afters = append(j.Afters, resJob.TargetLocalJobID) + } + + schJobs = append(schJobs, j) + } + + // 然后根据依赖进行排序 + schJobs, ok := s.orderByAfters(schJobs) + if !ok { + return nil, nil, fmt.Errorf("circular reference detected between jobs in the job set") + } + + // 经过排序后,按顺序生成调度方案 + for _, job := range schJobs { + if norJob, ok := job.Job.(*schsdk.NormalJobInfo); ok { + scheme, err := s.scheduleForNormalOrMultiJob(info, job, ccs, jobSetScheme.JobSchemes) + if err != nil { + return nil, nil, err + } + + jobSetScheme.JobSchemes[job.Job.GetLocalJobID()] = *scheme + + // 检查数据文件的配置项,生成上传文件方案 + s.fillNormarlJobLocalUploadScheme(norJob.Files, scheme.TargetCCID, filesUploadSchemes, ccs) + } + + if mulJob, ok := job.Job.(*schsdk.MultiInstanceJobInfo); ok { + scheme, err := s.scheduleForNormalOrMultiJob(info, job, ccs, jobSetScheme.JobSchemes) + if err != nil { + return nil, nil, err + } + + jobSetScheme.JobSchemes[job.Job.GetLocalJobID()] = *scheme + + // 检查数据文件的配置项,生成上传文件方案 + s.fillNormarlJobLocalUploadScheme(mulJob.Files, scheme.TargetCCID, filesUploadSchemes, ccs) + } + + // 回源任务目前不需要生成调度方案 + } + + return jobSetScheme, &schsdk.JobSetFilesUploadScheme{ + LocalFileSchemes: lo.Values(filesUploadSchemes), + }, nil +} + +// ScheduleJob 单个任务预调度 +func (s *DefaultPreScheduler) ScheduleJob(instJobInfo *schsdk.InstanceJobInfo) (*jobmod.JobScheduleScheme, *schsdk.JobFilesUploadScheme, error) { + filesUploadSchemes := make(map[string]schsdk.LocalFileUploadScheme) + + mgrCli, err := schglb.ManagerMQPool.Acquire() + if err != nil { + return nil, nil, fmt.Errorf("new collector client: %w", err) + } + defer schglb.ManagerMQPool.Release(mgrCli) + + // 查询有哪些算力中心可用 + + allCC, err := mgrCli.GetAllComputingCenter(mgrmq.NewGetAllComputingCenter()) + if err != nil { + return nil, nil, fmt.Errorf("getting all computing center info: %w", err) + } + + ccs := make(map[schsdk.CCID]schmod.ComputingCenter) + for _, node := range allCC.ComputingCenters { + ccs[node.CCID] = node + } + + if len(ccs) == 0 { + return nil, nil, ErrNoAvailableScheme + } + + info := &schsdk.NormalJobInfo{ + Files: instJobInfo.Files, + Runtime: instJobInfo.Runtime, + Resources: instJobInfo.Resources, + } + + job := &schedulingJob{ + Job: info, + } + scheme, err := s.scheduleForSingleJob(job, ccs) + if err != nil { + return nil, nil, err + } + + // 检查数据文件的配置项,生成上传文件方案 + s.fillNormarlJobLocalUploadScheme(info.Files, scheme.TargetCCID, filesUploadSchemes, ccs) + + return scheme, &schsdk.JobFilesUploadScheme{ + LocalFileSchemes: lo.Values(filesUploadSchemes), + }, nil +} + +func (s *DefaultPreScheduler) orderByAfters(jobs []*schedulingJob) ([]*schedulingJob, bool) { + type jobOrder struct { + Job *schedulingJob + Afters []string + } + + var jobOrders []*jobOrder + for _, job := range jobs { + od := &jobOrder{ + Job: job, + Afters: make([]string, len(job.Afters)), + } + + copy(od.Afters, job.Afters) + + jobOrders = append(jobOrders, od) + } + + // 然后排序 + var orderedJob []*schedulingJob + for { + rm := 0 + for i, jo := range jobOrders { + // 找到没有依赖的任务,然后将其取出 + if len(jo.Afters) == 0 { + orderedJob = append(orderedJob, jo.Job) + + // 删除其他任务对它的引用 + for _, job2 := range jobOrders { + job2.Afters = lo.Reject(job2.Afters, func(item 
string, idx int) bool { return item == jo.Job.Job.GetLocalJobID() }) + } + + rm++ + continue + } + + jobOrders[i-rm] = jobOrders[i] + } + + jobOrders = jobOrders[:len(jobOrders)-rm] + if len(jobOrders) == 0 { + break + } + + // 遍历一轮后没有找到无依赖的任务,那么就是存在循环引用,排序失败 + if rm == 0 { + return nil, false + } + } + + return orderedJob, true +} + +func (s *DefaultPreScheduler) scheduleForNormalOrMultiJob(jobSet *schsdk.JobSetInfo, job *schedulingJob, ccs map[schsdk.CCID]schmod.ComputingCenter, jobSchemes map[string]jobmod.JobScheduleScheme) (*jobmod.JobScheduleScheme, error) { + allCCs := make(map[schsdk.CCID]*candidate) + + // 初始化备选节点信息 + for _, cc := range ccs { + caNode := &candidate{ + CC: cc, + } + + // 检查此节点是否是它所引用的任务所选的节点 + for _, af := range job.Afters { + resJob := findJobInfo[*schsdk.DataReturnJobInfo](jobSet.Jobs, af) + if resJob == nil { + return nil, fmt.Errorf("resource job %s not found in the job set", af) + } + + // 由于jobs已经按照引用排序,所以正常情况下这里肯定能取到值 + scheme, ok := jobSchemes[resJob.TargetLocalJobID] + if !ok { + continue + } + + if scheme.TargetCCID == cc.CCID { + caNode.IsReferencedJobTarget = true + break + } + } + + allCCs[cc.CCID] = caNode + } + + var jobFiles *schsdk.JobFilesInfo + var jobResource *schsdk.JobResourcesInfo + + switch runningJob := job.Job.(type) { + case *schsdk.NormalJobInfo: + jobFiles = &runningJob.Files + jobResource = &runningJob.Resources + case *schsdk.MultiInstanceJobInfo: + jobFiles = &runningJob.Files + jobResource = &runningJob.Resources + } + + // 计算文件占有量得分 + err := s.calcFileScore(*jobFiles, allCCs) + if err != nil { + return nil, err + } + + // 计算资源余量得分 + err = s.calcResourceScore(*jobResource, allCCs) + if err != nil { + return nil, err + } + + allCCsArr := lo.Values(allCCs) + sort.Sort(CandidateArr(allCCsArr)) + + targetNode := allCCsArr[0] + if targetNode.Resource.MaxLevel == ResourceLevel3 { + return nil, ErrNoAvailableScheme + } + + scheme := s.makeSchemeForNode(jobFiles, targetNode) + return &scheme, nil +} + +func (s *DefaultPreScheduler) scheduleForSingleJob(job *schedulingJob, ccs map[schsdk.CCID]schmod.ComputingCenter) (*jobmod.JobScheduleScheme, error) { + allCCs := make(map[schsdk.CCID]*candidate) + + // 初始化备选节点信息 + for _, cc := range ccs { + caNode := &candidate{ + CC: cc, + } + + allCCs[cc.CCID] = caNode + } + + var jobFiles *schsdk.JobFilesInfo + var jobResource *schsdk.JobResourcesInfo + + switch runningJob := job.Job.(type) { + case *schsdk.NormalJobInfo: + jobFiles = &runningJob.Files + jobResource = &runningJob.Resources + case *schsdk.MultiInstanceJobInfo: + jobFiles = &runningJob.Files + jobResource = &runningJob.Resources + } + + // 计算文件占有量得分 + err := s.calcFileScore(*jobFiles, allCCs) + if err != nil { + return nil, err + } + + // 计算资源余量得分 + err = s.calcResourceScore(*jobResource, allCCs) + if err != nil { + return nil, err + } + + allCCsArr := lo.Values(allCCs) + sort.Sort(CandidateArr(allCCsArr)) + + targetNode := allCCsArr[0] + if targetNode.Resource.MaxLevel == ResourceLevel3 { + return nil, ErrNoAvailableScheme + } + + scheme := s.makeSchemeForNode(jobFiles, targetNode) + return &scheme, nil +} + +func (s *DefaultPreScheduler) fillNormarlJobLocalUploadScheme(files schsdk.JobFilesInfo, targetCCID schsdk.CCID, schemes map[string]schsdk.LocalFileUploadScheme, ccs map[schsdk.CCID]schmod.ComputingCenter) { + if localFile, ok := files.Dataset.(*schsdk.LocalJobFileInfo); ok { + if _, ok := schemes[localFile.LocalPath]; !ok { + cdsNodeID := ccs[targetCCID].CDSNodeID + schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ + 
LocalPath: localFile.LocalPath, + UploadToCDSNodeID: &cdsNodeID, + } + } + } + + if localFile, ok := files.Code.(*schsdk.LocalJobFileInfo); ok { + if _, ok := schemes[localFile.LocalPath]; !ok { + cdsNodeID := ccs[targetCCID].CDSNodeID + schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ + LocalPath: localFile.LocalPath, + UploadToCDSNodeID: &cdsNodeID, + } + } + } + + if localFile, ok := files.Image.(*schsdk.LocalJobFileInfo); ok { + if _, ok := schemes[localFile.LocalPath]; !ok { + cdsNodeID := ccs[targetCCID].CDSNodeID + schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ + LocalPath: localFile.LocalPath, + UploadToCDSNodeID: &cdsNodeID, + } + } + } +} + +func (s *DefaultPreScheduler) makeSchemeForNode(jobFiles *schsdk.JobFilesInfo, targetCC *candidate) jobmod.JobScheduleScheme { + scheme := jobmod.JobScheduleScheme{ + TargetCCID: targetCC.CC.CCID, + } + + // TODO 根据实际情况选择Move或者Load + + if _, ok := jobFiles.Dataset.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Dataset.IsLoaded { + scheme.Dataset.Action = jobmod.ActionLoad + } else { + scheme.Dataset.Action = jobmod.ActionNo + } + + if _, ok := jobFiles.Code.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Code.IsLoaded { + scheme.Code.Action = jobmod.ActionLoad + } else { + scheme.Code.Action = jobmod.ActionNo + } + + if _, ok := jobFiles.Image.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Image.IsLoaded { + scheme.Image.Action = jobmod.ActionImportImage + } else { + scheme.Image.Action = jobmod.ActionNo + } + + return scheme +} + +func findResuorce[T uopsdk.ResourceData](all []uopsdk.ResourceData) T { + for _, data := range all { + if ret, ok := data.(T); ok { + return ret + } + } + + var def T + return def +} + +func findJobInfo[T schsdk.JobInfo](jobs []schsdk.JobInfo, localJobID string) T { + for _, job := range jobs { + if ret, ok := job.(T); ok && job.GetLocalJobID() == localJobID { + return ret + } + } + + var def T + return def +} diff --git a/client/internal/prescheduler/default_prescheduler_test.go b/common/pkgs/prescheduler/default_prescheduler_test.go similarity index 100% rename from client/internal/prescheduler/default_prescheduler_test.go rename to common/pkgs/prescheduler/default_prescheduler_test.go diff --git a/common/pkgs/prescheduler/prescheduler.go b/common/pkgs/prescheduler/prescheduler.go new file mode 100644 index 0000000..7aa3f18 --- /dev/null +++ b/common/pkgs/prescheduler/prescheduler.go @@ -0,0 +1,11 @@ +package prescheduler + +import ( + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" +) + +type PreScheduler interface { + ScheduleJobSet(info *schsdk.JobSetInfo) (*jobmod.JobSetPreScheduleScheme, *schsdk.JobSetFilesUploadScheme, error) + ScheduleJob(info *schsdk.InstanceJobInfo) (*jobmod.JobScheduleScheme, *schsdk.JobFilesUploadScheme, error) +} diff --git a/common/utils/utils.go b/common/utils/utils.go index b302c83..bc4264a 100644 --- a/common/utils/utils.go +++ b/common/utils/utils.go @@ -1,7 +1,10 @@ package utils import ( + "crypto/sha256" + "encoding/hex" "fmt" + "math/rand" "path/filepath" "strconv" "time" @@ -17,3 +20,15 @@ func MakeJobOutputFullPath(stgDir string, userID cdssdk.UserID, jobID schsdk.Job func MakeResourcePackageName(jobID schsdk.JobID) string { return fmt.Sprintf("%s@%s", string(jobID), time.Now().Format("2006-01-02 15:04:05")) } + +func GenerateRandomID() string { + currentTime := time.Now().UnixNano() / int64(time.Millisecond) + rand.Seed(currentTime) + randomNum := rand.Intn(1000) 
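(rand.Intn(1000) yields a random integer in [0, 999], the three digits appended below.) Note that math/rand's Seed has been deprecated since Go 1.20, and reseeding from the current millisecond on every call means two calls within the same millisecond produce identical IDs. A possible alternative sketch using crypto/rand instead, offered as a suggestion and not part of this diff:

```go
package utils

import (
	"crypto/rand"
	"crypto/sha256"
	"encoding/binary"
	"encoding/hex"
	"time"
)

// GenerateRandomID derives the ID from 16 bytes of crypto/rand entropy plus
// a nanosecond timestamp, avoiding the deprecated rand.Seed and the shared
// math/rand state; unlike the surrounding version, it can return an error.
func GenerateRandomID() (string, error) {
	buf := make([]byte, 24)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	// Overwrite the last 8 bytes with the timestamp; the first 16 stay random.
	binary.BigEndian.PutUint64(buf[16:], uint64(time.Now().UnixNano()))
	sum := sha256.Sum256(buf)
	return hex.EncodeToString(sum[:]), nil
}
```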
diff --git a/common/utils/utils.go b/common/utils/utils.go
index b302c83..bc4264a 100644
--- a/common/utils/utils.go
+++ b/common/utils/utils.go
@@ -1,7 +1,10 @@
 package utils
 
 import (
+	"crypto/sha256"
+	"encoding/hex"
 	"fmt"
+	"math/rand"
 	"path/filepath"
 	"strconv"
 	"time"
@@ -17,3 +20,15 @@ func MakeJobOutputFullPath(stgDir string, userID cdssdk.UserID, jobID schsdk.Job
 func MakeResourcePackageName(jobID schsdk.JobID) string {
 	return fmt.Sprintf("%s@%s", string(jobID), time.Now().Format("2006-01-02 15:04:05"))
 }
+
+func GenerateRandomID() string {
+	currentTime := time.Now().UnixNano() / int64(time.Millisecond)
+	rng := rand.New(rand.NewSource(currentTime)) // a locally seeded RNG avoids reseeding the deprecated global one
+	randomNum := rng.Intn(1000)                  // random integer in [0, 1000)
+	idBase := fmt.Sprintf("%d%03d", currentTime, randomNum)
+	hasher := sha256.New()
+	hasher.Write([]byte(idBase))
+	hashBytes := hasher.Sum(nil)
+	hashedID := hex.EncodeToString(hashBytes)
+	return hashedID
+}
diff --git a/manager/internal/jobmgr/event.go b/manager/internal/jobmgr/event.go
deleted file mode 100644
index 1b9f6d2..0000000
--- a/manager/internal/jobmgr/event.go
+++ /dev/null
@@ -1,62 +0,0 @@
-package jobmgr
-
-import (
-	"errors"
-
-	schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
-)
-
-// 不是所关注的Task上报的进度
-var ErrUnconcernedTask = errors.New("unconcerned task")
-
-var ErrTaskTimeout = errors.New("task timeout")
-
-var ErrJobCancelled = errors.New("job cancelled")
-
-type Event interface{}
-
-type BroadcastType string
-
-const (
-	BroadcastAll    BroadcastType = "All"
-	BroadcastJobSet BroadcastType = "JobSet"
-	BroadcastJob    BroadcastType = "Job"
-)
-
-type Broadcast struct {
-	Type     BroadcastType
-	JobSetID schsdk.JobSetID
-	JobID    schsdk.JobID
-}
-
-func (b *Broadcast) ToAll() bool {
-	return b.Type == BroadcastAll
-}
-
-func (b *Broadcast) ToJobSet() bool {
-	return b.Type == BroadcastJobSet
-}
-
-func (b *Broadcast) ToJob() bool {
-	return b.Type == BroadcastJob
-}
-
-func ToAll() Broadcast {
-	return Broadcast{
-		Type: BroadcastAll,
-	}
-}
-
-func ToJobSet(jobSetID schsdk.JobSetID) Broadcast {
-	return Broadcast{
-		Type:     BroadcastJobSet,
-		JobSetID: jobSetID,
-	}
-}
-
-func ToJob(jobID schsdk.JobID) Broadcast {
-	return Broadcast{
-		Type:  BroadcastJob,
-		JobID: jobID,
-	}
-}
diff --git a/manager/internal/jobmgr/event/cancel.go b/manager/internal/jobmgr/event/cancel.go
index 1eede5b..7341a7b 100644
--- a/manager/internal/jobmgr/event/cancel.go
+++ b/manager/internal/jobmgr/event/cancel.go
@@ -2,3 +2,6 @@ package event
 
 type Cancel struct {
 }
+
+func (s *Cancel) Noop() {
+}
diff --git a/manager/internal/jobmgr/event/instance_create.go b/manager/internal/jobmgr/event/instance_create.go
new file mode 100644
index 0000000..c7e05e3
--- /dev/null
+++ b/manager/internal/jobmgr/event/instance_create.go
@@ -0,0 +1,28 @@
+package event
+
+import (
+	"gitlink.org.cn/cloudream/common/pkgs/future"
+	schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
+)
+
+type CreateInstanceFuture = *future.SetValueFuture[CreateInstanceResult]
+
+type InstanceCreate struct {
+	DataSet schsdk.JobFileInfo
+	Result  CreateInstanceFuture
+}
+
+type CreateInstanceResult struct {
+	JobID             schsdk.JobID
+	FilesUploadScheme schsdk.JobFilesUploadScheme
+}
+
+func NewInstanceCreate(dataSet schsdk.JobFileInfo, future CreateInstanceFuture) *InstanceCreate {
+	return &InstanceCreate{
+		DataSet: dataSet,
+		Result:  future,
+	}
+}
+
+func (s *InstanceCreate) Noop() {
+}
diff --git a/manager/internal/jobmgr/event/job_completed.go b/manager/internal/jobmgr/event/job_completed.go
index 2452134..e9231ad 100644
--- a/manager/internal/jobmgr/event/job_completed.go
+++ b/manager/internal/jobmgr/event/job_completed.go
@@ -16,3 +16,6 @@ func NewJobCompleted(job *jobmgr.Job, err error) *JobCompleted {
 		Err: err,
 	}
 }
+
+func (s *JobCompleted) Noop() {
+}
diff --git a/manager/internal/jobmgr/event/local_file_uploaded.go b/manager/internal/jobmgr/event/local_file_uploaded.go
index 9b81ad3..6ecebb8 100644
--- a/manager/internal/jobmgr/event/local_file_uploaded.go
+++ b/manager/internal/jobmgr/event/local_file_uploaded.go
@@ -18,3 +18,6 @@ func NewLocalFileUploaded(localPath string, err error, packageID cdssdk.PackageI
 		PackageID: packageID,
 	}
 }
+
+func (s *LocalFileUploaded) Noop() {
+}
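
Each event type above now carries a Noop() marker method, so only types that explicitly opt in satisfy the jobmgr.Event interface introduced in the event_set.go change below. A sketch of what a further event type would look like under this convention (TaskProgress and its fields are hypothetical, not part of this patch):

    package event

    // TaskProgress would report execution progress of a task.
    type TaskProgress struct {
        TaskID  string
        Percent int
    }

    // Noop marks TaskProgress as a jobmgr.Event.
    func (s *TaskProgress) Noop() {}
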
diff --git a/manager/internal/jobmgr/event/utils.go b/manager/internal/jobmgr/event/utils.go
index c2c803d..496ef1f 100644
--- a/manager/internal/jobmgr/event/utils.go
+++ b/manager/internal/jobmgr/event/utils.go
@@ -6,22 +6,48 @@ import (
 	"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr"
 )
 
+// WaitType waits for an event of a specific type.
+// Given a context and an event set, it blocks until an event matching type T occurs.
+// ctx: controls the wait; if the context is cancelled or expires, the wait is aborted.
+// set: the event set on which to wait for an event of the given type.
+// Return value T: the received event, asserted to the type parameter T.
+// Return value bool: whether the wait succeeded. It is true if an event was received,
+// and false if the wait ended because the context was cancelled or expired.
 func WaitType[T jobmgr.Event](ctx context.Context, set *jobmgr.EventSet) (T, bool) {
+	// Use set.Wait to wait for an event satisfying the given condition.
+	// The condition checks whether the event can be asserted to type T.
 	ret, ok := set.Wait(ctx, func(evt jobmgr.Event) bool {
 		_, ok := evt.(T)
 		return ok
 	})
+	if ret == nil {
+		var r T
+		return r, false // A nil event means the wait was aborted.
+	}
+	// set.Wait returns a jobmgr.Event, so assert it to T and return it together with the success flag.
 	return ret.(T), ok
 }
 
+// WaitTypeAnd waits for an event of a specific type that also satisfies the given condition.
+// ctx: controls cancellation and timeout of the wait.
+// set: the event set on which to wait.
+// cond: checks whether a candidate event satisfies the required condition.
+// It returns the matching event and a bool indicating whether the wait succeeded.
 func WaitTypeAnd[T jobmgr.Event](ctx context.Context, set *jobmgr.EventSet, cond func(val T) bool) (T, bool) {
+	// Wait for an event that has the required type and satisfies the condition.
 	ret, ok := set.Wait(ctx, func(evt jobmgr.Event) bool {
+		// Try to assert the event to type T and check whether that succeeds.
 		e, ok := evt.(T)
 		if !ok {
-			return false
+			return false // Not the expected type T.
 		}
+		// The event has type T; accept it only if it also satisfies the condition.
 		return cond(e)
 	})
+	if ret == nil {
+		var r T
+		return r, false // A nil event means the wait was aborted.
+	}
+	// Assert the returned event to T and return it together with the success flag.
 	return ret.(T), ok
 }
diff --git a/manager/internal/jobmgr/event_set.go b/manager/internal/jobmgr/event_set.go
index efcce6f..346dcbe 100644
--- a/manager/internal/jobmgr/event_set.go
+++ b/manager/internal/jobmgr/event_set.go
@@ -2,6 +2,7 @@ package jobmgr
 
 import (
 	"context"
+	"errors"
 	"sync"
 
 	"gitlink.org.cn/cloudream/common/pkgs/future"
@@ -10,6 +11,12 @@ import (
 
 type EventWaitCondition func(evt Event) bool
 
+var ErrJobCancelled = errors.New("job cancelled")
+
+type Event interface {
+	Noop()
+}
+
 type EventWaiter struct {
 	condition EventWaitCondition
 	future    *future.SetValueFuture[Event]
@@ -26,19 +33,20 @@ func NewEventSet() EventSet {
 }
 
 func (s *EventSet) Post(evt Event) {
-	s.lock.Lock()
-	defer s.lock.Unlock()
+	s.lock.Lock()         // Lock to protect the event set.
+	defer s.lock.Unlock() // Release the lock when the function returns.
 
-	// 一个事件能唤醒多个等待者
-	used := false
+	// Walk the waiter list for matching waiters; each match is removed from the list and its future is set.
+	used := false // Whether this event has woken at least one waiter.
 	for i, waiter := range s.waiters {
-		if waiter.condition(evt) {
-			s.waiters = lo2.RemoveAt(s.waiters, i)
-			waiter.future.SetValue(evt)
-			used = true
+		if waiter.condition(evt) { // Does this event satisfy the waiter's condition?
+			s.waiters = lo2.RemoveAt(s.waiters, i) // Remove the waiter from the list.
+			waiter.future.SetValue(evt)            // Deliver the event through the waiter's future.
+			used = true                            // Mark the event as consumed.
 		}
 	}
 
+	// If no waiter matched, queue the event for a later Wait.
 	if !used {
 		s.events = append(s.events, evt)
 	}
@@ -46,12 +54,11 @@ func (s *EventSet) Post(evt Event) {
 
 func (s *EventSet) Wait(ctx context.Context, cond EventWaitCondition) (Event, bool) {
 	s.lock.Lock()
-	defer s.lock.Unlock()
 
-	// 一个等待者只能等待一个事件
 	for i, evt := range s.events {
 		if cond(evt) {
 			s.events = lo2.RemoveAt(s.events, i)
+			s.lock.Unlock()
 			return evt, true
 		}
 	}
@@ -61,9 +68,12 @@ func (s *EventSet) Wait(ctx context.Context, cond EventWaitCondition) (Event, bo
 		condition: cond,
 		future:    fut,
 	}
-	s.events = append(s.events, waiter)
+	s.waiters = append(s.waiters, waiter)
+
+	s.lock.Unlock()
 
 	val, err := fut.WaitValue(ctx)
+
 	if err != nil {
 		return nil, false
 	}
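
Post and Wait now cooperate through the waiters list: Wait first scans already-queued events, then parks on a future that Post resolves when a matching event arrives. The typical pairing, condensed from the states in this patch (wantPath and packageID are assumed values; error handling elided):

    // Consumer, inside a job state: block until one specific upload finishes.
    evt, ok := event.WaitTypeAnd[*event.LocalFileUploaded](ctx, rtx.EventSet,
        func(e *event.LocalFileUploaded) bool { return e.LocalPath == wantPath })
    if !ok {
        return jobmgr.ErrJobCancelled // ctx was cancelled before the event arrived
    }
    _ = evt.PackageID

    // Producer, e.g. the MQ service: deliver the event to every job of the set.
    mgr.BroadcastEvent(jobSetID, event.NewLocalFileUploaded(wantPath, nil, packageID))
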
diff --git a/manager/internal/jobmgr/job/instance_job.go b/manager/internal/jobmgr/job/instance_job.go
new file mode 100644
index 0000000..350bf5a
--- /dev/null
+++ b/manager/internal/jobmgr/job/instance_job.go
@@ -0,0 +1,31 @@
+package job
+
+import (
+	schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
+	jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
+)
+
+type InstanceJob struct {
+	Info           schsdk.InstanceJobInfo // Job description supplied when the job was submitted
+	Files          jobmod.JobFiles        // Files the job needs
+	TargetCCID     schsdk.CCID            // ID of the computing center that will run this job
+	OutputFullPath string                 // Full output path of the job's results
+}
+
+func NewInstanceJob(info schsdk.InstanceJobInfo, files jobmod.JobFiles) *InstanceJob {
+	return &InstanceJob{
+		Info:  info,
+		Files: files,
+	}
+}
+
+func (j *InstanceJob) GetInfo() schsdk.JobInfo {
+	return &j.Info
+}
+
+func (j *InstanceJob) Dump() jobmod.JobBodyDump {
+	return &jobmod.InstanceJobDump{
+		Files:      j.Files,
+		TargetCCID: j.TargetCCID,
+	}
+}
diff --git a/manager/internal/jobmgr/job/multiInstance_job.go b/manager/internal/jobmgr/job/multiInstance_job.go
new file mode 100644
index 0000000..bf0827e
--- /dev/null
+++ b/manager/internal/jobmgr/job/multiInstance_job.go
@@ -0,0 +1,32 @@
+package job
+
+import (
+	schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
+	jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
+)
+
+type MultiInstanceJob struct {
+	Info         schsdk.MultiInstanceJobInfo // Job description supplied when the job was submitted
+	Files        jobmod.JobFiles             // Files the job needs
+	TargetCCID   schsdk.CCID                 // ID of the computing center that will run this job
+	SubJobs      []schsdk.JobID              // IDs of the instance jobs spawned from this job
+	PreScheduler jobmod.JobScheduleScheme    // Pre-schedule scheme reused when creating instances
+}
+
+func NewMultiInstanceJob(info schsdk.MultiInstanceJobInfo, preScheduler jobmod.JobScheduleScheme) *MultiInstanceJob {
+	return &MultiInstanceJob{
+		Info:         info,
+		PreScheduler: preScheduler,
+	}
+}
+
+func (j *MultiInstanceJob) GetInfo() schsdk.JobInfo {
+	return &j.Info
+}
+
+func (j *MultiInstanceJob) Dump() jobmod.JobBodyDump {
+	return &jobmod.MultiInstanceJobDump{
+		Files:      j.Files,
+		TargetCCID: j.TargetCCID,
+	}
+}
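
The state changes below all pull Info.Files, Files and TargetCCID out of the job body with a type switch, and the same switch recurs in adjusting.go, executing.go, prescheduling.go and ready_to_adjust.go. If more body types appear, one accessor interface on the job bodies could replace those switches; a sketch of that alternative (interface and method names are hypothetical, not part of this patch):

    // Hypothetical: each job body exposes the fields the states need.
    type scheduledBody interface {
        FilesInfo() schsdk.JobFilesInfo
        MutableFiles() *jobmod.JobFiles
        SetTargetCCID(id schsdk.CCID)
    }

    // A state could then handle every body type uniformly:
    //   body, ok := jo.Body.(scheduledBody)
    //   files := body.MutableFiles()
    // instead of enumerating each concrete job type.
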
diff --git a/manager/internal/jobmgr/job/state/adjusting.go b/manager/internal/jobmgr/job/state/adjusting.go
index 7095cb5..fd0fd0f 100644
--- a/manager/internal/jobmgr/job/state/adjusting.go
+++ b/manager/internal/jobmgr/job/state/adjusting.go
@@ -46,14 +46,14 @@ func (s *Adjusting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.J
 }
 
 func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
-	norJob := jo.Body.(*job.NormalJob)
+	userID := cdssdk.UserID(1)
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
 	// 监听取消事件
 	go func() {
-		event.WaitType[event.Cancel](ctx, rtx.EventSet)
+		event.WaitType[*event.Cancel](ctx, rtx.EventSet)
 		cancel()
 	}()
 
@@ -71,13 +71,34 @@ func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
 
 	// 已经确定最终执行的目标计算中心,则可以生成结果输出路径了
 	stgInfo, err := stgCli.StorageGetInfo(cdssdk.StorageGetInfoReq{
+		UserID:    userID,
 		StorageID: ccInfo.CDSStorageID,
 	})
 	if err != nil {
 		return fmt.Errorf("getting cds storage info: %w", err)
 	}
 
 	// TODO UserID
-	norJob.OutputFullPath = utils.MakeJobOutputFullPath(stgInfo.Directory, 1, jo.JobID)
+	outputFullPath := utils.MakeJobOutputFullPath(stgInfo.Directory, 1, jo.JobID)
+
+	var jobFilesInfo schsdk.JobFilesInfo
+	var jobFiles *jobmod.JobFiles
+
+	switch runningJob := jo.Body.(type) {
+	case *job.NormalJob:
+		jobFilesInfo = runningJob.Info.Files
+		jobFiles = &runningJob.Files
+		runningJob.TargetCCID = s.scheme.TargetCCID
+		runningJob.OutputFullPath = outputFullPath
+	case *job.MultiInstanceJob:
+		jobFilesInfo = runningJob.Info.Files
+		jobFiles = &runningJob.Files
+		runningJob.TargetCCID = s.scheme.TargetCCID
+	case *job.InstanceJob:
+		jobFilesInfo = runningJob.Info.Files
+		jobFiles = &runningJob.Files
+		runningJob.TargetCCID = s.scheme.TargetCCID
+		runningJob.OutputFullPath = outputFullPath
+	}
 
 	wg := sync.WaitGroup{}
 	wg.Add(3)
@@ -86,7 +107,7 @@
 	go func() {
 		defer wg.Done()
 
-		e1 = s.doPackageScheduling(ctx, rtx, norJob, norJob.Info.Files.Dataset, &norJob.Files.Dataset, &s.scheme.Dataset)
+		e1 = s.doPackageScheduling(ctx, rtx, jobFilesInfo.Dataset, &jobFiles.Dataset, &s.scheme.Dataset)
 		if e1 != nil {
 			cancel()
 		}
@@ -94,7 +115,7 @@
 	go func() {
 		defer wg.Done()
 
-		e2 = s.doPackageScheduling(ctx, rtx, norJob, norJob.Info.Files.Code, &norJob.Files.Code, &s.scheme.Code)
+		e2 = s.doPackageScheduling(ctx, rtx, jobFilesInfo.Code, &jobFiles.Code, &s.scheme.Code)
 		if e2 != nil {
 			cancel()
 		}
@@ -102,19 +123,21 @@
 	go func() {
 		defer wg.Done()
 
-		e3 = s.doImageScheduling(ctx, rtx, norJob, norJob.Info.Files.Image, &norJob.Files.Image, &s.scheme.Image)
+		e3 = s.doImageScheduling(ctx, rtx, s.scheme.TargetCCID, jobFilesInfo.Image, &jobFiles.Image, &s.scheme.Image)
 		if e3 != nil {
 			cancel()
 		}
 	}()
+
 	wg.Wait()
+
 	return errors.Join(e1, e2, e3)
 }
 
-func (s *Adjusting) doPackageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, job *job.NormalJob, fileInfo schsdk.JobFileInfo, file *jobmod.PackageJobFile, scheme *jobmod.FileScheduleScheme) error {
+func (s *Adjusting) doPackageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, fileInfo schsdk.JobFileInfo, file *jobmod.PackageJobFile, scheme *jobmod.FileScheduleScheme) error {
 	switch info := fileInfo.(type) {
 	case *schsdk.LocalJobFileInfo:
-		evt, ok := event.WaitTypeAnd[event.LocalFileUploaded](ctx, rtx.EventSet, func(e event.LocalFileUploaded) bool {
+		evt, ok := event.WaitTypeAnd[*event.LocalFileUploaded](ctx, rtx.EventSet, func(e *event.LocalFileUploaded) bool {
 			return e.LocalPath == info.LocalPath
 		})
 		if !ok {
@@ -129,7 +152,7 @@
 	case *schsdk.PackageJobFileInfo:
 		file.PackageID = info.PackageID
 
-	case *schsdk.ResourceJobFileInfo:
+	case *schsdk.DataReturnJobFileInfo:
 		return nil
 
 	default:
@@ -162,7 +185,7 @@
 		return fmt.Errorf("moving package: %w", err)
 	}
 
-	moveStatus := status.(*exectsk.CacheMovePackageStatus)
+	moveStatus := status.(*exectsk.StorageLoadPackageStatus)
 	if moveStatus.Error != "" {
 		return fmt.Errorf("moving package: %s", moveStatus.Error)
 	}
@@ -173,10 +196,10 @@
 	return nil
 }
 
-func (s *Adjusting) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, job *job.NormalJob, fileInfo schsdk.JobFileInfo, file *jobmod.ImageJobFile, scheme *jobmod.FileScheduleScheme) error {
+func (s *Adjusting) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, fileInfo schsdk.JobFileInfo, file *jobmod.ImageJobFile, scheme *jobmod.FileScheduleScheme) error {
 	switch info := fileInfo.(type) {
 	case *schsdk.LocalJobFileInfo:
-		evt, ok := event.WaitTypeAnd[event.LocalFileUploaded](ctx, rtx.EventSet, func(e event.LocalFileUploaded) bool {
+		evt, ok := event.WaitTypeAnd[*event.LocalFileUploaded](ctx, rtx.EventSet, func(e *event.LocalFileUploaded) bool {
 			return e.LocalPath == info.LocalPath
 		})
 		if !ok {
@@ -260,7 +283,7 @@
 	}
 
 	// TODO 镜像名称
-	err = rtx.Mgr.DB.PCMImage().Create(rtx.Mgr.DB.SQLCtx(), file.ImageID, job.TargetCCID, uploadStatus.PCMImageID, uploadStatus.Name, time.Now())
+	err = rtx.Mgr.DB.PCMImage().Create(rtx.Mgr.DB.SQLCtx(), file.ImageID, targetCCID, uploadStatus.PCMImageID, uploadStatus.Name, time.Now())
 	if err != nil {
 		return fmt.Errorf("creating image info: %w", err)
 	}
diff --git a/manager/internal/jobmgr/job/state/executing.go b/manager/internal/jobmgr/job/state/executing.go
index a8cc7f4..218dca4 100644
--- a/manager/internal/jobmgr/job/state/executing.go
+++ b/manager/internal/jobmgr/job/state/executing.go
@@ -3,6 +3,7 @@ package state
 
 import (
 	"context"
 	"fmt"
+	schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
 
 	"gitlink.org.cn/cloudream/common/pkgs/logger"
 	pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
@@ -40,30 +41,45 @@ func (s *NormalJobExecuting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job)
 }
 
 func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
-	norJob := jo.Body.(*job.NormalJob)
+	//norJob := jo.Body.(*job.NormalJob)
+
+	var runtime *schsdk.JobRuntimeInfo
+	var jobFiles *jobmod.JobFiles
+	var targetCCID schsdk.CCID
+
+	switch runningJob := jo.Body.(type) {
+	case *job.NormalJob:
+		runtime = &runningJob.Info.Runtime
+		jobFiles = &runningJob.Files
+		targetCCID = runningJob.TargetCCID
+	case *job.InstanceJob:
+		runtime = &runningJob.Info.Runtime
+		jobFiles = &runningJob.Files
+		targetCCID = runningJob.TargetCCID
+	}
 
 	log := logger.WithType[NormalJobExecuting]("State").WithField("JobID", jo.JobID)
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	pcmImgInfo, err := rtx.Mgr.DB.PCMImage().GetByImageIDAndCCID(rtx.Mgr.DB.SQLCtx(), norJob.Files.Image.ImageID, norJob.TargetCCID)
+	pcmImgInfo, err := rtx.Mgr.DB.PCMImage().GetByImageIDAndCCID(rtx.Mgr.DB.SQLCtx(), jobFiles.Image.ImageID, targetCCID)
 	if err != nil {
 		return fmt.Errorf("getting pcm image info: %w", err)
 	}
 
-	ccInfo, err := rtx.Mgr.DB.ComputingCenter().GetByID(rtx.Mgr.DB.SQLCtx(), norJob.TargetCCID)
+	ccInfo, err := rtx.Mgr.DB.ComputingCenter().GetByID(rtx.Mgr.DB.SQLCtx(), targetCCID)
 	if err != nil {
 		return fmt.Errorf("getting computing center info: %w", err)
 	}
 
 	// TODO 需要添加DATA_IN、DATA_OUT等环境变量,这些数据从Job的信息中来获取
-	ress, err := rtx.Mgr.DB.CCResource().GetByCCID(rtx.Mgr.DB.SQLCtx(), norJob.TargetCCID)
+	ress, err := rtx.Mgr.DB.CCResource().GetByCCID(rtx.Mgr.DB.SQLCtx(), targetCCID)
 	if err != nil {
 		return fmt.Errorf("getting computing center resource: %w", err)
 	}
 	if len(ress) == 0 {
-		return fmt.Errorf("no resource found at computing center %v", norJob.TargetCCID)
+		return fmt.Errorf("no resource found at computing center %v", targetCCID)
 	}
 
 	wt := rtx.Mgr.ExecMgr.StartTask(exetsk.NewSubmitTask(
@@ -71,8 +87,8 @@
 		pcmImgInfo.PCMImageID,
 		// TODO 选择资源的算法
 		ress[0].PCMResourceID,
-		norJob.Info.Runtime.Command,
-		norJob.Info.Runtime.Envs,
+		runtime.Command,
+		runtime.Envs,
 	))
 	defer wt.Close()
 
@@ -129,7 +145,7 @@ func (s *DataReturnJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Jo
 
 	// 监听取消事件
 	go func() {
-		event.WaitType[event.Cancel](ctx, rtx.EventSet)
+		event.WaitType[*event.Cancel](ctx, rtx.EventSet)
 		cancel()
 	}()
 
@@ -138,6 +154,8 @@ func (s *DataReturnJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Jo
 		return fmt.Errorf("getting computing center info: %w", err)
 	}
 
+	logger.Infof("submitted to computing center: %s, id: %s", ccInfo.Name, ccInfo.CCID)
+
 	wt := rtx.Mgr.ExecMgr.StartTask(exetsk.NewStorageCreatePackage(
 		1, // TOOD 用户ID
 		ccInfo.CDSStorageID,
diff --git a/manager/internal/jobmgr/job/state/making_adjust_scheme.go b/manager/internal/jobmgr/job/state/making_adjust_scheme.go
index 19ffada..c857feb 100644
--- a/manager/internal/jobmgr/job/state/making_adjust_scheme.go
+++ b/manager/internal/jobmgr/job/state/making_adjust_scheme.go
@@ -32,7 +32,7 @@ func (s *MakingAdjustScheme) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) (
 
 	// 监听取消事件
 	go func() {
-		event.WaitType[event.Cancel](ctx, rtx.EventSet)
+		event.WaitType[*event.Cancel](ctx, rtx.EventSet)
 		cancel()
 	}()
diff --git a/manager/internal/jobmgr/job/state/multiInstance_init.go b/manager/internal/jobmgr/job/state/multiInstance_init.go
new file mode 100644
index 0000000..3323665
--- /dev/null
+++ b/manager/internal/jobmgr/job/state/multiInstance_init.go
@@ -0,0 +1,68 @@
+package state
+
+import (
+	"context"
+	"fmt"
+	"gitlink.org.cn/cloudream/common/pkgs/logger"
+	schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
+	jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
+	"gitlink.org.cn/cloudream/scheduler/common/pkgs/prescheduler"
+	"gitlink.org.cn/cloudream/scheduler/common/utils"
+	"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr"
+	"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event"
+	"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job"
+)
+
+type MultiInstanceInit struct {
+}
+
+func NewMultiInstanceInit() *MultiInstanceInit {
+	return &MultiInstanceInit{}
+}
+
+func (s *MultiInstanceInit) Run(rtx jobmgr.JobStateRunContext, job *jobmgr.Job) {
+	s.do(rtx, job)
+}
+
+func (s *MultiInstanceInit) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
+	multInstJob := jo.Body.(*job.MultiInstanceJob)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	go func() {
+		event.WaitType[*event.Cancel](ctx, rtx.EventSet)
+		cancel()
+	}()
+
+	newLocalJobID := fmt.Sprintf("%s_%s", multInstJob.Info.LocalJobID, utils.GenerateRandomID())
+
+	instJobInfo := &schsdk.InstanceJobInfo{
+		Type:       schsdk.JobTypeInstance,
+		LocalJobID: newLocalJobID,
+		Files:      multInstJob.Info.Files,
+		Runtime:    multInstJob.Info.Runtime,
+		Resources:  multInstJob.Info.Resources,
+	}
+
+	files := jobmod.JobFiles{
+		Dataset: multInstJob.Files.Dataset,
+		Code:    multInstJob.Files.Code,
+		Image:   multInstJob.Files.Image,
+	}
+
+	// Create the first instance job and start running it
+	instanceJob := job.NewInstanceJob(*instJobInfo, files)
+	jobID := rtx.Mgr.AddJob(jo.JobSetID, instanceJob, NewPreSchuduling(multInstJob.PreScheduler))
+	logger.Info("Init instance success, jobID: " + jobID)
+
+	// Record this instance's job ID on the multi-instance job
+	multInstJob.SubJobs = append(multInstJob.SubJobs, jobID)
+
+	rtx.Mgr.ChangeState(jo, NewMultiInstanceRunning(prescheduler.NewDefaultPreScheduler()))
+	logger.Info("Create multiInstance job success, jobID: " + jo.JobID)
+}
+
+func (s *MultiInstanceInit) Dump(ctx jobmgr.JobStateRunContext, job *jobmgr.Job) jobmod.JobStateDump {
+	return &jobmod.MultiInstCreateInitDump{}
+}
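
MultiInstanceInit shows the state pattern the manager uses throughout: Run performs the state's work, forwards cancellation into a local context, and hands the job to the next state via ChangeState. A bare-bones skeleton for a new state under this pattern (MyState, NewNextState and the dump value are placeholders):

    type MyState struct{}

    func (s *MyState) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        // Propagate a job cancel event into this state's context.
        go func() {
            event.WaitType[*event.Cancel](ctx, rtx.EventSet)
            cancel()
        }()

        // ...do the state's work, then transition:
        // rtx.Mgr.ChangeState(jo, NewNextState())
    }

    func (s *MyState) Dump(ctx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateDump {
        return nil // a real state returns its jobmod dump type here
    }
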
"gitlink.org.cn/cloudream/common/pkgs/logger" + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" + "gitlink.org.cn/cloudream/scheduler/common/pkgs/prescheduler" + "gitlink.org.cn/cloudream/scheduler/common/utils" + "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr" + "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event" + "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job" +) + +type MultiInstanceRunning struct { + preScheduler prescheduler.PreScheduler +} + +func NewMultiInstanceRunning(preScheduler prescheduler.PreScheduler) *MultiInstanceRunning { + return &MultiInstanceRunning{ + preScheduler: preScheduler, + } +} + +func (s *MultiInstanceRunning) Run(rtx jobmgr.JobStateRunContext, job *jobmgr.Job) { + s.do(rtx, job) +} + +func (s *MultiInstanceRunning) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { + logger.Info("start run multiInstanceRunning, jobID: " + jo.JobID) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + event.WaitType[*event.Cancel](ctx, rtx.EventSet) + cancel() + }() + + multInstJob := jo.Body.(*job.MultiInstanceJob) + + for { + // 监听创建实例事件 + ic, ok := event.WaitType[*event.InstanceCreate](ctx, rtx.EventSet) + if !ok { + logger.Info("MultiInstanceRunning canceled") + break + } + logger.Info("wait a event happened") + + // 构建InstanceJobInfo + infoFiles := schsdk.JobFilesInfo{ + Dataset: ic.DataSet, + Code: multInstJob.Info.Files.Code, + Image: multInstJob.Info.Files.Image, + } + + newLocalJobID := fmt.Sprintf("%s_%s", multInstJob.Info.LocalJobID, utils.GenerateRandomID()) + + instJobInfo := &schsdk.InstanceJobInfo{ + Type: schsdk.JobTypeInstance, + LocalJobID: newLocalJobID, + Files: infoFiles, + Runtime: multInstJob.Info.Runtime, + Resources: multInstJob.Info.Resources, + } + + files := jobmod.JobFiles{ + Code: multInstJob.Files.Code, + Image: multInstJob.Files.Image, + } + + // 生成预调度方案和文件上传方案 + jobSchedule, filesUploadScheme, err := s.preScheduler.ScheduleJob(instJobInfo) + if err != nil { + ic.Result.SetError(err) + continue + } + + // 创建实例并运行 + instanceJob := job.NewInstanceJob(*instJobInfo, files) + jobID := rtx.Mgr.AddJob(jo.JobSetID, instanceJob, NewPreSchuduling(*jobSchedule)) + logger.Info("Create instance success, jobID: " + jobID) + + // 在多实例任务中新增这个实例的任务ID + multInstJob.SubJobs = append(multInstJob.SubJobs, jobID) + + // 将实例ID和文件上传方案返回 + ic.Result.SetValue(event.CreateInstanceResult{ + JobID: jobID, + FilesUploadScheme: *filesUploadScheme, + }) + + } +} + +func (s *MultiInstanceRunning) Dump(ctx jobmgr.JobStateRunContext, job *jobmgr.Job) jobmod.JobStateDump { + return &jobmod.MultiInstCreateRunningDump{} +} diff --git a/manager/internal/jobmgr/job/state/prescheduling.go b/manager/internal/jobmgr/job/state/prescheduling.go index b20664c..6b24a09 100644 --- a/manager/internal/jobmgr/job/state/prescheduling.go +++ b/manager/internal/jobmgr/job/state/prescheduling.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "gitlink.org.cn/cloudream/common/pkgs/logger" + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" @@ -30,14 +32,32 @@ func NewPreSchuduling(scheme jobmod.JobScheduleScheme) *PreScheduling { } func (s *PreScheduling) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { - norJob := jo.Body.(*job.NormalJob) + logger.Info("start run preScheduling, jobID: " + jo.JobID) + + var jobFilesInfo 
diff --git a/manager/internal/jobmgr/job/state/prescheduling.go b/manager/internal/jobmgr/job/state/prescheduling.go
index b20664c..6b24a09 100644
--- a/manager/internal/jobmgr/job/state/prescheduling.go
+++ b/manager/internal/jobmgr/job/state/prescheduling.go
@@ -7,6 +7,8 @@ import (
 	"sync"
 	"time"
 
+	"gitlink.org.cn/cloudream/common/pkgs/logger"
+	schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
 	cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
 	schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
@@ -30,14 +32,32 @@ func NewPreSchuduling(scheme jobmod.JobScheduleScheme) *PreScheduling {
 }
 
 func (s *PreScheduling) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
-	norJob := jo.Body.(*job.NormalJob)
+	logger.Info("start run preScheduling, jobID: " + jo.JobID)
+
+	var jobFilesInfo schsdk.JobFilesInfo
+	var jobFiles *jobmod.JobFiles
+
+	switch runningJob := jo.Body.(type) {
+	case *job.NormalJob:
+		jobFilesInfo = runningJob.Info.Files
+		jobFiles = &runningJob.Files
+		runningJob.TargetCCID = s.scheme.TargetCCID
+	case *job.MultiInstanceJob:
+		jobFilesInfo = runningJob.Info.Files
+		jobFiles = &runningJob.Files
+		runningJob.TargetCCID = s.scheme.TargetCCID
+	case *job.InstanceJob:
+		jobFilesInfo = runningJob.Info.Files
+		jobFiles = &runningJob.Files
+		runningJob.TargetCCID = s.scheme.TargetCCID
+	}
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
 	// 监听取消事件
 	go func() {
-		event.WaitType[event.Cancel](ctx, rtx.EventSet)
+		event.WaitType[*event.Cancel](ctx, rtx.EventSet)
 		cancel()
 	}()
 
@@ -55,7 +75,7 @@
 	go func() {
 		defer wg.Done()
 
-		e1 = s.doPackageScheduling(ctx, rtx, norJob, norJob.Info.Files.Dataset, &norJob.Files.Dataset, &s.scheme.Dataset)
+		e1 = s.doPackageScheduling(ctx, rtx, jobFilesInfo.Dataset, &jobFiles.Dataset, &s.scheme.Dataset)
 		if e1 != nil {
 			cancel()
 		}
@@ -63,7 +83,7 @@
 	go func() {
 		defer wg.Done()
 
-		e2 = s.doPackageScheduling(ctx, rtx, norJob, norJob.Info.Files.Code, &norJob.Files.Code, &s.scheme.Code)
+		e2 = s.doPackageScheduling(ctx, rtx, jobFilesInfo.Code, &jobFiles.Code, &s.scheme.Code)
 		if e2 != nil {
 			cancel()
 		}
@@ -71,15 +91,17 @@
 	go func() {
 		defer wg.Done()
 
-		e3 = s.doImageScheduling(ctx, rtx, norJob, norJob.Info.Files.Image, &norJob.Files.Image, &s.scheme.Image)
+		e3 = s.doImageScheduling(ctx, rtx, s.scheme.TargetCCID, jobFilesInfo.Image, &jobFiles.Image, &s.scheme.Image)
 		if e3 != nil {
 			cancel()
 		}
 	}()
+
 	wg.Wait()
+
 	allErr := errors.Join(e1, e2, e3)
 	if allErr != nil {
-		rtx.Mgr.ChangeState(jo, FailureComplete(err))
+		rtx.Mgr.ChangeState(jo, FailureComplete(allErr))
 	} else {
 		rtx.Mgr.ChangeState(jo, NewReadyToAdjust())
 	}
@@ -91,10 +113,10 @@
 	}
 }
 
-func (s *PreScheduling) doPackageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, norJob *job.NormalJob, fileInfo schsdk.JobFileInfo, file *jobmod.PackageJobFile, scheme *jobmod.FileScheduleScheme) error {
+func (s *PreScheduling) doPackageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, fileInfo schsdk.JobFileInfo, file *jobmod.PackageJobFile, scheme *jobmod.FileScheduleScheme) error {
 	switch info := fileInfo.(type) {
 	case *schsdk.LocalJobFileInfo:
-		evt, ok := event.WaitTypeAnd[event.LocalFileUploaded](ctx, rtx.EventSet, func(e event.LocalFileUploaded) bool {
+		evt, ok := event.WaitTypeAnd[*event.LocalFileUploaded](ctx, rtx.EventSet, func(e *event.LocalFileUploaded) bool {
 			return e.LocalPath == info.LocalPath
 		})
 		if !ok {
@@ -109,7 +131,7 @@
 	case *schsdk.PackageJobFileInfo:
 		file.PackageID = info.PackageID
 
-	case *schsdk.ResourceJobFileInfo:
+	case *schsdk.DataReturnJobFileInfo:
 		return nil
 
 	default:
@@ -142,7 +164,7 @@
 		return fmt.Errorf("moving package: %w", err)
 	}
 
-	moveStatus := status.(*exectsk.CacheMovePackageStatus)
+	moveStatus := status.(*exectsk.StorageLoadPackageStatus)
 	if moveStatus.Error != "" {
 		return fmt.Errorf("moving package: %s", moveStatus.Error)
 	}
@@ -153,10 +175,10 @@ func (s *PreScheduling) doPackageScheduling(ctx context.Context, rtx jobmgr.JobS
 	return nil
 }
 
-func (s *PreScheduling) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, norJob *job.NormalJob, fileInfo schsdk.JobFileInfo, file *jobmod.ImageJobFile, scheme *jobmod.FileScheduleScheme) error {
+func (s *PreScheduling) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, fileInfo schsdk.JobFileInfo, file *jobmod.ImageJobFile, scheme *jobmod.FileScheduleScheme) error {
 	switch info := fileInfo.(type) {
 	case *schsdk.LocalJobFileInfo:
-		evt, ok := event.WaitTypeAnd[event.LocalFileUploaded](ctx, rtx.EventSet, func(e event.LocalFileUploaded) bool {
+		evt, ok := event.WaitTypeAnd[*event.LocalFileUploaded](ctx, rtx.EventSet, func(e *event.LocalFileUploaded) bool {
 			return e.LocalPath == info.LocalPath
 		})
 		if !ok {
@@ -240,7 +262,7 @@
 	}
 
 	// TODO 镜像名称
-	err = rtx.Mgr.DB.PCMImage().Create(rtx.Mgr.DB.SQLCtx(), file.ImageID, norJob.TargetCCID, uploadStatus.PCMImageID, uploadStatus.Name, time.Now())
+	err = rtx.Mgr.DB.PCMImage().Create(rtx.Mgr.DB.SQLCtx(), file.ImageID, targetCCID, uploadStatus.PCMImageID, uploadStatus.Name, time.Now())
 	if err != nil {
 		return fmt.Errorf("creating image info: %w", err)
 	}
diff --git a/manager/internal/jobmgr/job/state/ready_to_adjust.go b/manager/internal/jobmgr/job/state/ready_to_adjust.go
index d649653..25b6dc9 100644
--- a/manager/internal/jobmgr/job/state/ready_to_adjust.go
+++ b/manager/internal/jobmgr/job/state/ready_to_adjust.go
@@ -28,20 +28,30 @@ func (s *ReadyToAdjust) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
 }
 
 func (s *ReadyToAdjust) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
-	norJob := jo.Body.(*job.NormalJob)
+	var jobFilesInfo schsdk.JobFilesInfo
+	var jobFiles *jobmod.JobFiles
+
+	switch runningJob := jo.Body.(type) {
+	case *job.NormalJob:
+		jobFilesInfo = runningJob.Info.Files
+		jobFiles = &runningJob.Files
+	case *job.InstanceJob:
+		jobFilesInfo = runningJob.Info.Files
+		jobFiles = &runningJob.Files
+	}
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
 	// 监听取消事件
 	go func() {
-		event.WaitType[event.Cancel](ctx, rtx.EventSet)
+		event.WaitType[*event.Cancel](ctx, rtx.EventSet)
 		cancel()
 	}()
 
-	if rt, ok := norJob.Info.Files.Dataset.(*schsdk.ResourceJobFileInfo); ok {
-		evt, ok := event.WaitTypeAnd[event.JobCompleted](ctx, rtx.EventSet, func(val event.JobCompleted) bool {
-			return val.Job.GetInfo().GetLocalJobID() == rt.ResourceLocalJobID
+	if rt, ok := jobFilesInfo.Dataset.(*schsdk.DataReturnJobFileInfo); ok {
+		evt, ok := event.WaitTypeAnd[*event.JobCompleted](ctx, rtx.EventSet, func(val *event.JobCompleted) bool {
+			return val.Job.GetInfo().GetLocalJobID() == rt.DataReturnLocalJobID
 		})
 		if !ok {
 			return jobmgr.ErrJobCancelled
@@ -54,7 +64,7 @@
 			return fmt.Errorf("job %s is not a DataReturn job(which is %T)", evt.Job.JobID, evt.Job)
 		}
 
-		norJob.Files.Dataset.PackageID = rtJob.DataReturnPackageID
+		jobFiles.Dataset.PackageID = rtJob.DataReturnPackageID
 	}
 
 	return nil
diff --git a/manager/internal/jobmgr/job/state/wait_target_complete.go b/manager/internal/jobmgr/job/state/wait_target_complete.go
index c0f4d5c..aeb826e 100644
--- a/manager/internal/jobmgr/job/state/wait_target_complete.go
+++ b/manager/internal/jobmgr/job/state/wait_target_complete.go
@@ -34,11 +34,11 @@ func (s *WaitTargetComplete) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e
 
 	// 监听取消事件
 	go func() {
-		event.WaitType[event.Cancel](ctx, rtx.EventSet)
+		event.WaitType[*event.Cancel](ctx, rtx.EventSet)
 		cancel()
 	}()
 
-	evt, ok := event.WaitTypeAnd[event.JobCompleted](ctx, rtx.EventSet, func(val event.JobCompleted) bool {
+	evt, ok := event.WaitTypeAnd[*event.JobCompleted](ctx, rtx.EventSet, func(val *event.JobCompleted) bool {
 		return val.Job.GetInfo().GetLocalJobID() == reJob.Info.TargetLocalJobID
 	})
 	if !ok {
diff --git a/manager/internal/jobmgr/jobmgr.go b/manager/internal/jobmgr/jobmgr.go
index 34059d6..befb904 100644
--- a/manager/internal/jobmgr/jobmgr.go
+++ b/manager/internal/jobmgr/jobmgr.go
@@ -134,6 +134,7 @@ func (m *Manager) BroadcastEvent(jobSetID schsdk.JobSetID, evt Event) {
 
 	jobSet, ok := m.jobSets[jobSetID]
 	if !ok {
+		// The job set does not exist, so there is nothing to notify.
 		return
 	}
 
@@ -171,10 +172,18 @@
 				Body:     subJob.Body,
 			},
 			eventSet: NewEventSet(),
+			state:    subJob.InitState,
 		}
 		jobSet.jobs[jobID] = job
+		m.jobs[jobID] = job
 
-		m.ChangeState(&job.job, subJob.InitState)
+		go func() { // run job.state rather than the loop variable subJob, to avoid capturing a shared variable
+			job.state.Run(JobStateRunContext{
+				Mgr:       m,
+				EventSet:  &job.eventSet,
+				LastState: nil,
+			}, &job.job)
+		}()
 	}
 
 	m.jobIDIndex += len(jobs)
@@ -202,3 +211,41 @@
 
 	return jobDumps
 }
+
+type PreSchedulerInstJob struct {
+	Body      JobBody
+	InitState JobState
+}
+
+// AddJob adds a single job to the given job set and starts its initial state.
+func (m *Manager) AddJob(jobSetID schsdk.JobSetID, jobBody JobBody, jobState JobState) schsdk.JobID {
+	m.pubLock.Lock()
+	defer m.pubLock.Unlock()
+
+	jobID := schsdk.JobID(fmt.Sprintf("%d", m.jobIDIndex+1))
+	m.jobIDIndex += 1
+
+	job := &mgrJob{
+		job: Job{
+			JobSetID: jobSetID,
+			JobID:    jobID,
+			Body:     jobBody,
+		},
+		state:    jobState,
+		eventSet: NewEventSet(),
+	}
+
+	m.jobs[jobID] = job
+	jobSet := m.jobSets[jobSetID]
+	jobSet.jobs[jobID] = job
+
+	go func() {
+		jobState.Run(JobStateRunContext{
+			Mgr:       m,
+			EventSet:  &job.eventSet,
+			LastState: nil,
+		}, &job.job)
+	}()
+
+	return jobID
+}
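
The manager now offers the two delivery paths the MQ service below relies on: BroadcastEvent fans an event out to every job of a set, while PostEvent targets a single job's event set. In sketch form (jobSetID, jobID, path, pkgID, dataSet and fut are assumed values):

    // One uploaded file may unblock several jobs of the same set.
    m.BroadcastEvent(jobSetID, event.NewLocalFileUploaded(path, nil, pkgID))

    // Creating an instance concerns exactly one multi-instance job.
    m.PostEvent(jobID, event.NewInstanceCreate(dataSet, fut))
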
diff --git a/manager/internal/mq/job.go b/manager/internal/mq/job.go
index 13f99e8..9d876fc 100644
--- a/manager/internal/mq/job.go
+++ b/manager/internal/mq/job.go
@@ -1,9 +1,12 @@
 package mq
 
 import (
+	"context"
 	"errors"
 	"fmt"
+	"gitlink.org.cn/cloudream/common/pkgs/future"
+
 	"gitlink.org.cn/cloudream/common/consts/errorcode"
 	"gitlink.org.cn/cloudream/common/pkgs/logger"
 	"gitlink.org.cn/cloudream/common/pkgs/mq"
@@ -43,19 +46,55 @@ func (svc *Service) SubmitJobSet(msg *mgrmq.SubmitJobSet) (*mgrmq.SubmitJobSetRe
 				Body:      job,
 				InitState: state.NewWaitTargetComplete(),
 			})
+
+		case *schsdk.MultiInstanceJobInfo:
+			preSch, ok := msg.PreScheduleScheme.JobSchemes[info.LocalJobID]
+			if !ok {
+				return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("pre schedule scheme for job %s is not found", info.LocalJobID))
+			}
+
+			logger.Debugf("creating multi-instance job, localJobID: %s", info.LocalJobID)
+
+			job := job.NewMultiInstanceJob(*info, preSch)
+
+			jobs = append(jobs, jobmgr.SubmittingJob{
+				Body:      job,
+				InitState: state.NewMultiInstanceInit(),
+			})
+
 		}
 	}
 
 	return mq.ReplyOK(mgrmq.NewSubmitJobSetResp(svc.jobMgr.SubmitJobSet(jobs)))
 }
 
+func (svc *Service) CreateInstance(instInfo *mgrmq.CreateInstance) (*mgrmq.CreateInstanceResp, *mq.CodeMessage) {
+	logger.Debugf("start create instance")
+
+	fut := future.NewSetValue[event.CreateInstanceResult]()
+	svc.jobMgr.PostEvent(instInfo.JobID, event.NewInstanceCreate(instInfo.DataSet, fut))
+
+	result, err := fut.WaitValue(context.TODO())
+
+	if err != nil {
+		return nil, mq.Failed(errorcode.OperationFailed, err.Error())
+	}
+
+	return mq.ReplyOK(mgrmq.NewCreateInstanceResp(result.JobID, result.FilesUploadScheme))
+}
+
 // 任务集中某个文件上传完成
 func (svc *Service) JobSetLocalFileUploaded(msg *mgrmq.JobSetLocalFileUploaded) (*mgrmq.JobSetLocalFileUploadedResp, *mq.CodeMessage) {
 	logger.WithField("LocalPath", msg.LocalPath).
 		WithField("PackageID", msg.PackageID).
 		Debugf("local file uploaded")
 
-	svc.jobMgr.BroadcastEvent(msg.JobSetID, event.NewLocalFileUploaded(msg.LocalPath, errors.New(msg.Error), msg.PackageID))
+	var err error
+	if msg.Error != "" {
+		err = errors.New(msg.Error)
+	}
+
+	svc.jobMgr.BroadcastEvent(msg.JobSetID, event.NewLocalFileUploaded(msg.LocalPath, err, msg.PackageID))
 
 	return mq.ReplyOK(mgrmq.NewJobSetLocalFileUploadedResp())
 }
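
CreateInstance above waits on the future with context.TODO(), so a multi-instance job that never answers would block the MQ handler indefinitely. A bounded wait is a small, local change (the 30-second budget is an arbitrary example, not part of this patch; it also needs the time import):

    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    result, err := fut.WaitValue(ctx)
    if err != nil {
        // err covers both a scheduling failure and the timeout
        return nil, mq.Failed(errorcode.OperationFailed, err.Error())
    }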