调度系统新增多实例任务功能

This commit is contained in:
JeshuaRen 2024-04-30 16:31:51 +08:00
parent cd64f3da5b
commit a4c43731ac
19 changed files with 402 additions and 863 deletions

View File

@ -0,0 +1,53 @@
package http
import (
"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
"net/http"
)
// JobService groups the job-related HTTP handlers. It embeds *Server, so the
// server's fields and methods are promoted onto it.
type JobService struct {
*Server
}
// CreateInstanceResp is the payload that the CreateInstance handler wraps in
// OK(...) when the instance has been created.
type CreateInstanceResp struct {
// InstanceID is the job ID the service layer returned for the new instance.
InstanceID schsdk.JobID `json:"instanceID"`
// UploadScheme is the files upload scheme returned together with the ID.
UploadScheme schsdk.JobFilesUploadScheme `json:"uploadScheme"`
}
// CreateInstanceReq holds the arguments the CreateInstance handler binds from
// the request. Both fields are marked required for gin's binding validation.
type CreateInstanceReq struct {
// LocalJobID is forwarded unchanged to the service layer.
// presumably the local ID of the multi-instance job to instantiate — TODO confirm
LocalJobID string `json:"localJobID" binding:"required"`
// LocalPath is forwarded unchanged to the service layer. Its JSON key is
// "filePath", not "localPath".
LocalPath schsdk.JobFileInfo `json:"filePath" binding:"required"`
}
// JobSvc returns a JobService that wraps this server, giving access to the
// job-related HTTP handlers.
func (s *Server) JobSvc() *JobService {
	svc := new(JobService)
	svc.Server = s
	return svc
}
// CreateInstance is the HTTP handler that creates one job instance.
//
// It binds a CreateInstanceReq from the request, forwards the local job ID and
// file info to the job-set service, and answers with a CreateInstanceResp.
// A binding failure is answered with 400 and errorcode.BadArgument; a service
// failure is answered with 200 and errorcode.OperationFailed.
//
// NOTE(review): the arguments are bound with ShouldBindQuery (query string)
// although the log message says "binding body" and the request struct only
// carries json tags — confirm which request source is intended.
func (s *JobService) CreateInstance(ctx *gin.Context) {
	log := logger.WithField("HTTP", "JobSet.HTTP")

	var req CreateInstanceReq
	bindErr := ctx.ShouldBindQuery(&req)
	if bindErr != nil {
		log.Warnf("binding body: %s", bindErr.Error())
		ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
		return
	}

	instanceID, uploadScheme, createErr := s.svc.JobSetSvc().CreateInstance(req.LocalJobID, req.LocalPath)
	if createErr != nil {
		log.Warnf("create job instance: %s", createErr.Error())
		ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "create job instance failed"))
		return
	}

	resp := CreateInstanceResp{
		InstanceID:   instanceID,
		UploadScheme: uploadScheme,
	}
	ctx.JSON(http.StatusOK, OK(resp))
}

View File

@ -39,6 +39,7 @@ func (s *Server) Serve() error {
func (s *Server) initRouters() { func (s *Server) initRouters() {
s.engine.POST("/jobSet/submit", s.JobSetSvc().Submit) s.engine.POST("/jobSet/submit", s.JobSetSvc().Submit)
s.engine.POST("/jobSet/submit", s.JobSvc().CreateInstance)
s.engine.POST("/jobSet/localFileUploaded", s.JobSetSvc().LocalFileUploaded) s.engine.POST("/jobSet/localFileUploaded", s.JobSetSvc().LocalFileUploaded)
s.engine.GET("/jobSet/getServiceList", s.JobSetSvc().GetServiceList) s.engine.GET("/jobSet/getServiceList", s.JobSetSvc().GetServiceList)
} }

View File

@ -1,721 +0,0 @@
package prescheduler
import (
"fmt"
"sort"
"github.com/inhies/go-bytesize"
"github.com/samber/lo"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
"gitlink.org.cn/cloudream/common/utils/math2"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
)
const (
// Resource levels assigned to each node:
// ResourceLevel1 means every requested resource type is available at 1.5x the requested amount or more.
ResourceLevel1 = 1
// ResourceLevel2 means Level1 is not met, but every requested resource type is available at 1x the requested amount or more.
ResourceLevel2 = 2
// ResourceLevel3 means at least one requested resource type is available at less than 1x the requested amount.
ResourceLevel3 = 3
// Weights multiplied into the available/required ratio when scoring resources
// (see calcOneResourceScore).
CpuResourceWeight float64 = 1
StgResourceWeight float64 = 1.2
// Weights multiplied into the file scores: CachingWeight for the cached
// fraction of a package, LoadedWeight for a package/image that is already loaded.
CachingWeight float64 = 1
LoadedWeight float64 = 2
)
// ErrNoAvailableScheme is returned when there is no computing center to choose
// from, or when even the best candidate is at ResourceLevel3.
var ErrNoAvailableScheme = fmt.Errorf("no appropriate scheduling node found, please wait")
// candidate is one computing center being considered as the target of a job,
// together with the scores computed for it.
type candidate struct {
// CC is the computing center this candidate describes.
CC schmod.ComputingCenter
IsReferencedJobTarget bool // whether this node is the one chosen for a job that the current job depends on
// Resource holds the resource levels and scores (filled by calcResourceScore).
Resource resourcesDetail
// Files holds the file scores (filled by calcFileScore).
Files filesDetail
}
// resourcesDetail collects the per-resource level/score of one computing
// center plus the aggregates derived from them.
type resourcesDetail struct {
CPU resourceDetail
GPU resourceDetail
NPU resourceDetail
MLU resourceDetail
Storage resourceDetail
Memory resourceDetail
// TotalScore is the sum of the scores of the requested resource kinds.
TotalScore float64
// AvgScore is meant to be the average score over the requested resource
// kinds; CandidateArr.Less uses it as a tie-breaker.
AvgScore float64
// MaxLevel is the highest (i.e. worst) level among the requested resource
// kinds; it stays 0 when no resource kind is requested.
MaxLevel int
}
// resourceDetail is the evaluation of a single resource kind.
type resourceDetail struct {
// Level is one of ResourceLevel1..ResourceLevel3.
Level int
// Score is the available/required ratio multiplied by the resource weight.
Score float64
}
// filesDetail collects the file scores of one computing center for the
// dataset, code and image of a job.
type filesDetail struct {
Dataset fileDetail
Code fileDetail
Image fileDetail
// TotalScore is the sum of the caching and loading scores of all three files
// (computed in calcFileScore).
TotalScore float64
}
// fileDetail is the score of a single file (package or image) on one
// computing center.
type fileDetail struct {
// CachingScore is the cached fraction of the package multiplied by CachingWeight.
CachingScore float64
// LoadingScore is LoadedWeight when the file is already loaded, otherwise 0.
LoadingScore float64
IsLoaded bool // whether the storage has already been scheduled to this node; for an image, whether the image has already been loaded into this computing center
}
// schedulingJob pairs a job description with the local IDs of the jobs it
// depends on, so that jobs can be ordered before scheduling.
type schedulingJob struct {
// Job is the job description as submitted in the job set.
Job schsdk.JobInfo
// Afters lists the local job IDs this job must be ordered after.
Afters []string
}
// CandidateArr implements sort.Interface over scheduling candidates; after
// sorting, the most preferred candidate is at index 0.
type CandidateArr []*candidate

// Len returns the number of candidates.
func (a CandidateArr) Len() int { return len(a) }

// Swap exchanges the candidates at positions i and j.
func (a CandidateArr) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// Less reports whether candidate i is preferred over candidate j.
//
// Preference order:
//  1. a node that is the target of a referenced job and whose resources are
//     sufficient (MaxLevel below ResourceLevel3);
//  2. the lower (better) resource level;
//  3. within the same level: the higher file score for ResourceLevel1, the
//     higher average resource score for ResourceLevel2 and ResourceLevel3.
func (a CandidateArr) Less(i, j int) bool {
	n1 := a[i]
	n2 := a[j]

	// Prefer running next to the job we depend on, provided that node has
	// enough resources. When exactly one side qualifies it wins; when both or
	// neither qualify the decision falls through to the rules below. Returning
	// true when only n2 qualifies would invert the order and break the
	// ordering contract that sort.Interface requires.
	n1Preferred := n1.IsReferencedJobTarget && n1.Resource.MaxLevel < ResourceLevel3
	n2Preferred := n2.IsReferencedJobTarget && n2.Resource.MaxLevel < ResourceLevel3
	if n1Preferred != n2Preferred {
		return n1Preferred
	}

	// Compare resource levels first: a lower level satisfies the request better.
	if n1.Resource.MaxLevel != n2.Resource.MaxLevel {
		return n1.Resource.MaxLevel < n2.Resource.MaxLevel
	}

	// Same level: compare the relevant score.
	switch n1.Resource.MaxLevel {
	case ResourceLevel1:
		// A higher file score means more of the data is already on the node.
		return n1.Files.TotalScore > n2.Files.TotalScore
	case ResourceLevel2, ResourceLevel3:
		// A higher average resource score means more spare resources.
		return n1.Resource.AvgScore > n2.Resource.AvgScore
	}

	return false
}
// DefaultPreScheduler is the default pre-scheduler; it carries no state of its
// own and reaches the manager and collector through the global MQ pools.
type DefaultPreScheduler struct {
}
// NewDefaultPreScheduler creates a ready-to-use DefaultPreScheduler.
func NewDefaultPreScheduler() *DefaultPreScheduler {
	return new(DefaultPreScheduler)
}
// Schedule builds the pre-scheduling scheme for a job set.
//
// It fetches the available computing centers from the manager, orders the
// jobs by their dependencies, and then picks a target computing center for
// every normal job in that order. For each normal job it also records where
// locally provided files have to be uploaded.
//
// It returns ErrNoAvailableScheme when the manager reports no computing
// center, and an error when the jobs reference each other circularly. Data
// return jobs currently get no scheme of their own.
func (s *DefaultPreScheduler) Schedule(info *schsdk.JobSetInfo) (*jobmod.JobSetPreScheduleScheme, *schsdk.JobSetFilesUploadScheme, error) {
	jobSetScheme := &jobmod.JobSetPreScheduleScheme{
		JobSchemes: make(map[string]jobmod.JobScheduleScheme),
	}
	filesUploadSchemes := make(map[string]schsdk.LocalFileUploadScheme)

	mgrCli, err := schglb.ManagerMQPool.Acquire()
	if err != nil {
		// This is the manager pool, so report it as such.
		return nil, nil, fmt.Errorf("new manager client: %w", err)
	}
	defer schglb.ManagerMQPool.Release(mgrCli)

	// Find out which computing centers are available.
	allCC, err := mgrCli.GetAllComputingCenter(mgrmq.NewGetAllComputingCenter())
	if err != nil {
		return nil, nil, fmt.Errorf("getting all computing center info: %w", err)
	}
	if len(allCC.ComputingCenters) == 0 {
		return nil, nil, ErrNoAvailableScheme
	}

	ccs := make(map[schsdk.CCID]schmod.ComputingCenter, len(allCC.ComputingCenters))
	for _, node := range allCC.ComputingCenters {
		ccs[node.CCID] = node
	}

	// Collect, from each job's configuration, the local IDs of the jobs it depends on.
	schJobs := make([]*schedulingJob, 0, len(info.Jobs))
	for _, job := range info.Jobs {
		j := &schedulingJob{
			Job: job,
		}

		switch typed := job.(type) {
		case *schsdk.NormalJobInfo:
			if resFile, ok := typed.Files.Dataset.(*schsdk.ResourceJobFileInfo); ok {
				j.Afters = append(j.Afters, resFile.ResourceLocalJobID)
			}
			if resFile, ok := typed.Files.Code.(*schsdk.ResourceJobFileInfo); ok {
				j.Afters = append(j.Afters, resFile.ResourceLocalJobID)
			}
		case *schsdk.DataReturnJobInfo:
			j.Afters = append(j.Afters, typed.TargetLocalJobID)
		}

		schJobs = append(schJobs, j)
	}

	// Order the jobs so that dependencies come first.
	schJobs, ok := s.orderByAfters(schJobs)
	if !ok {
		return nil, nil, fmt.Errorf("circular reference detected between jobs in the job set")
	}

	// Generate the schemes in dependency order.
	for _, job := range schJobs {
		norJob, isNormal := job.Job.(*schsdk.NormalJobInfo)
		if !isNormal {
			// Data return jobs do not need a scheduling scheme for now.
			continue
		}

		scheme, err := s.scheduleForNormalJob(info, job, ccs, jobSetScheme.JobSchemes)
		if err != nil {
			return nil, nil, err
		}
		jobSetScheme.JobSchemes[job.Job.GetLocalJobID()] = *scheme

		// Inspect the file configuration and generate the upload scheme.
		s.fillNormarlJobLocalUploadScheme(norJob, scheme.TargetCCID, filesUploadSchemes, ccs)
	}

	return jobSetScheme, &schsdk.JobSetFilesUploadScheme{
		LocalFileSchemes: lo.Values(filesUploadSchemes),
	}, nil
}
// orderByAfters orders jobs so that every job is placed after all the jobs
// named in its Afters list.
//
// It returns the ordered jobs and true on success. It returns nil and false
// when a whole pass finds no job without pending dependencies, which happens
// for circular references and for a dependency on an ID that is not in jobs.
func (s *DefaultPreScheduler) orderByAfters(jobs []*schedulingJob) ([]*schedulingJob, bool) {
// jobOrder pairs a job with a private copy of its Afters, so dependencies can
// be struck off without touching the input's slices.
type jobOrder struct {
Job *schedulingJob
Afters []string
}
var jobOrders []*jobOrder
for _, job := range jobs {
od := &jobOrder{
Job: job,
Afters: make([]string, len(job.Afters)),
}
copy(od.Afters, job.Afters)
jobOrders = append(jobOrders, od)
}
// Then sort: each pass emits the jobs whose dependencies are all resolved.
var orderedJob []*schedulingJob
for {
// rm counts the entries emitted in this pass; kept entries are shifted left by rm.
rm := 0
for i, jo := range jobOrders {
// Find a job without pending dependencies and take it out.
if len(jo.Afters) == 0 {
orderedJob = append(orderedJob, jo.Job)
// Remove the other jobs' references to it. Entries later in this pass
// may become dependency-free as a result and be emitted in the same pass.
for _, job2 := range jobOrders {
job2.Afters = lo.Reject(job2.Afters, func(item string, idx int) bool { return item == jo.Job.Job.GetLocalJobID() })
}
rm++
continue
}
// Compact in place: the write index i-rm never exceeds the read index i.
jobOrders[i-rm] = jobOrders[i]
}
jobOrders = jobOrders[:len(jobOrders)-rm]
if len(jobOrders) == 0 {
break
}
// A full pass found no dependency-free job, so there is a circular reference and ordering fails.
if rm == 0 {
return nil, false
}
}
return orderedJob, true
}
// scheduleForNormalJob picks the target computing center for one normal job.
//
// jobSet is the whole job set (used to look up the jobs this one depends on),
// job is the job to schedule, ccs are the available computing centers, and
// jobSchemes holds the schemes already decided for earlier jobs.
//
// It returns ErrNoAvailableScheme when ccs is empty or when even the best
// candidate is at ResourceLevel3, and an error when job is not a normal job
// or when a job it depends on cannot be found in the job set.
func (s *DefaultPreScheduler) scheduleForNormalJob(jobSet *schsdk.JobSetInfo, job *schedulingJob, ccs map[schsdk.CCID]schmod.ComputingCenter, jobSchemes map[string]jobmod.JobScheduleScheme) (*jobmod.JobScheduleScheme, error) {
	norJob, ok := job.Job.(*schsdk.NormalJobInfo)
	if !ok {
		return nil, fmt.Errorf("unexpected job type %T, want a normal job", job.Job)
	}

	// Without candidates there is nothing to sort and nothing to pick.
	if len(ccs) == 0 {
		return nil, ErrNoAvailableScheme
	}

	// Resolve once (instead of once per computing center) which computing
	// centers were chosen for the jobs this job references.
	referencedTargets := make(map[schsdk.CCID]struct{}, len(job.Afters))
	for _, af := range job.Afters {
		resJob := findJobInfo[*schsdk.DataReturnJobInfo](jobSet.Jobs, af)
		if resJob == nil {
			return nil, fmt.Errorf("resource job %s not found in the job set", af)
		}

		// Jobs are ordered by their references, so the scheme is normally present.
		scheme, ok := jobSchemes[resJob.TargetLocalJobID]
		if !ok {
			continue
		}
		referencedTargets[scheme.TargetCCID] = struct{}{}
	}

	// Initialise the candidate nodes.
	allCCs := make(map[schsdk.CCID]*candidate, len(ccs))
	for _, cc := range ccs {
		_, isTarget := referencedTargets[cc.CCID]
		allCCs[cc.CCID] = &candidate{
			CC:                    cc,
			IsReferencedJobTarget: isTarget,
		}
	}

	// Score how much of the job's files each node already holds.
	if err := s.calcFileScore(norJob.Files, allCCs); err != nil {
		return nil, err
	}

	// Score how much spare resource each node has.
	if err := s.calcResourceScore(norJob, allCCs); err != nil {
		return nil, err
	}

	allCCsArr := lo.Values(allCCs)
	sort.Sort(CandidateArr(allCCsArr))

	targetNode := allCCsArr[0]
	if targetNode.Resource.MaxLevel == ResourceLevel3 {
		return nil, ErrNoAvailableScheme
	}

	scheme := s.makeSchemeForNode(norJob, targetNode)
	return &scheme, nil
}
// fillNormarlJobLocalUploadScheme records, for each of the job's dataset, code
// and image files that is a local file, an upload scheme pointing at the CDS
// node of the target computing center. A local path that already has an entry
// in schemes is left untouched.
func (s *DefaultPreScheduler) fillNormarlJobLocalUploadScheme(norJob *schsdk.NormalJobInfo, targetCCID schsdk.CCID, schemes map[string]schsdk.LocalFileUploadScheme, ccs map[schsdk.CCID]schmod.ComputingCenter) {
	// register adds the scheme for one file when it is a local file that has
	// not been registered yet.
	register := func(file any) {
		local, isLocal := file.(*schsdk.LocalJobFileInfo)
		if !isLocal {
			return
		}
		if _, exists := schemes[local.LocalPath]; exists {
			return
		}

		// Each scheme gets its own copy so the stored pointers do not alias.
		nodeID := ccs[targetCCID].CDSNodeID
		schemes[local.LocalPath] = schsdk.LocalFileUploadScheme{
			LocalPath:         local.LocalPath,
			UploadToCDSNodeID: &nodeID,
		}
	}

	register(norJob.Files.Dataset)
	register(norJob.Files.Code)
	register(norJob.Files.Image)
}
// makeSchemeForNode builds the schedule scheme that runs job on targetCC.
//
// A file gets an action only when it is a package file that is not yet loaded
// on the target: ActionLoad for dataset and code, ActionImportImage for the
// image. Every other file gets ActionNo.
//
// NOTE(review): the image is tested against PackageJobFileInfo here, while
// calcFileScore tests it against ImageJobFileInfo — confirm this is intended.
func (s *DefaultPreScheduler) makeSchemeForNode(job *schsdk.NormalJobInfo, targetCC *candidate) jobmod.JobScheduleScheme {
	var scheme jobmod.JobScheduleScheme
	scheme.TargetCCID = targetCC.CC.CCID

	// Start from "nothing to do" and upgrade where work is required.
	scheme.Dataset.Action = jobmod.ActionNo
	scheme.Code.Action = jobmod.ActionNo
	scheme.Image.Action = jobmod.ActionNo

	// TODO choose Move or Load according to the actual situation
	if _, isPkg := job.Files.Dataset.(*schsdk.PackageJobFileInfo); isPkg && !targetCC.Files.Dataset.IsLoaded {
		scheme.Dataset.Action = jobmod.ActionLoad
	}

	if _, isPkg := job.Files.Code.(*schsdk.PackageJobFileInfo); isPkg && !targetCC.Files.Code.IsLoaded {
		scheme.Code.Action = jobmod.ActionLoad
	}

	if _, isPkg := job.Files.Image.(*schsdk.PackageJobFileInfo); isPkg && !targetCC.Files.Image.IsLoaded {
		scheme.Image.Action = jobmod.ActionImportImage
	}

	return scheme
}
// calcResourceScore evaluates the job's resource requirements against every
// candidate and stores the result in the candidate's Resource field. It stops
// at the first candidate whose evaluation fails and returns that error.
func (s *DefaultPreScheduler) calcResourceScore(job *schsdk.NormalJobInfo, allCCs map[schsdk.CCID]*candidate) error {
	for _, cand := range allCCs {
		detail, err := s.calcOneResourceScore(job.Resources, &cand.CC)
		if err != nil {
			return err
		}

		cand.Resource = *detail
	}

	return nil
}
// calcOneResourceScore classifies the resource level of one computing center
// and computes its resource scores for the given requirements.
//
// Only resource kinds with a requirement greater than zero are evaluated. A
// requested kind that the collector does not report gets ResourceLevel3 and a
// score of 0. TotalScore is the sum and AvgScore the mean of the evaluated
// scores; MaxLevel is the worst level among them. When nothing is requested
// the zero-valued detail is returned.
func (s *DefaultPreScheduler) calcOneResourceScore(requires schsdk.JobResourcesInfo, cc *schmod.ComputingCenter) (*resourcesDetail, error) {
	colCli, err := schglb.CollectorMQPool.Acquire()
	if err != nil {
		return nil, fmt.Errorf("new collector client: %w", err)
	}
	defer schglb.CollectorMQPool.Release(colCli)

	getResDataResp, err := colCli.GetAllResourceData(collector.NewGetAllResourceData(cc.UOPSlwNodeID))
	if err != nil {
		return nil, err
	}

	var resDetail resourcesDetail

	// Running aggregates over the requested resource kinds.
	totalScore := 0.0
	maxLevel := 0
	resKinds := 0

	// missing is the evaluation of a requested kind the node does not report.
	missing := resourceDetail{Level: ResourceLevel3, Score: 0}

	// evaluate rates one resource kind from its available and required amounts.
	evaluate := func(avail float64, need float64, weight float64) resourceDetail {
		return resourceDetail{
			Level: s.calcResourceLevel(avail, need),
			Score: (avail / need) * weight,
		}
	}

	// accumulate folds one evaluated kind into the aggregates.
	accumulate := func(d resourceDetail) {
		maxLevel = math2.Max(maxLevel, d.Level)
		totalScore += d.Score
		resKinds++
	}

	if requires.CPU > 0 {
		resDetail.CPU = missing
		if res := findResuorce[*uopsdk.CPUResourceData](getResDataResp.Datas); res != nil {
			resDetail.CPU = evaluate(float64(res.Available.Value), requires.CPU, CpuResourceWeight)
		}
		accumulate(resDetail.CPU)
	}

	if requires.GPU > 0 {
		resDetail.GPU = missing
		if res := findResuorce[*uopsdk.GPUResourceData](getResDataResp.Datas); res != nil {
			resDetail.GPU = evaluate(float64(res.Available.Value), requires.GPU, CpuResourceWeight)
		}
		accumulate(resDetail.GPU)
	}

	if requires.NPU > 0 {
		resDetail.NPU = missing
		if res := findResuorce[*uopsdk.NPUResourceData](getResDataResp.Datas); res != nil {
			resDetail.NPU = evaluate(float64(res.Available.Value), requires.NPU, CpuResourceWeight)
		}
		accumulate(resDetail.NPU)
	}

	if requires.MLU > 0 {
		resDetail.MLU = missing
		if res := findResuorce[*uopsdk.MLUResourceData](getResDataResp.Datas); res != nil {
			resDetail.MLU = evaluate(float64(res.Available.Value), requires.MLU, CpuResourceWeight)
		}
		accumulate(resDetail.MLU)
	}

	if requires.Storage > 0 {
		resDetail.Storage = missing
		if res := findResuorce[*uopsdk.StorageResourceData](getResDataResp.Datas); res != nil {
			// The available amount comes as value + unit; normalise it to bytes.
			bytes, err := bytesize.Parse(fmt.Sprintf("%f%s", res.Available.Value, res.Available.Unit))
			if err != nil {
				return nil, err
			}
			resDetail.Storage = evaluate(float64(bytes), float64(requires.Storage), StgResourceWeight)
		}
		accumulate(resDetail.Storage)
	}

	if requires.Memory > 0 {
		resDetail.Memory = missing
		if res := findResuorce[*uopsdk.MemoryResourceData](getResDataResp.Datas); res != nil {
			// The available amount comes as value + unit; normalise it to bytes.
			bytes, err := bytesize.Parse(fmt.Sprintf("%f%s", res.Available.Value, res.Available.Unit))
			if err != nil {
				return nil, err
			}
			resDetail.Memory = evaluate(float64(bytes), float64(requires.Memory), StgResourceWeight)
		}
		accumulate(resDetail.Memory)
	}

	if resKinds == 0 {
		return &resDetail, nil
	}

	resDetail.TotalScore = totalScore
	// The average must come from the accumulated total; dividing AvgScore by
	// itself would leave it at 0 and make the tie-breaker in Less useless.
	resDetail.AvgScore = totalScore / float64(resKinds)
	resDetail.MaxLevel = maxLevel

	return &resDetail, nil
}
// calcResourceLevel maps an available amount and a required amount to a
// resource level: ResourceLevel1 when at least 1.5x the need is available,
// ResourceLevel2 when at least the need is available, ResourceLevel3 otherwise.
func (s *DefaultPreScheduler) calcResourceLevel(avai float64, need float64) int {
	switch {
	case avai >= 1.5*need:
		return ResourceLevel1
	case avai >= need:
		return ResourceLevel2
	default:
		return ResourceLevel3
	}
}
// calcFileScore computes, for every candidate, how much of the job's code,
// dataset and image it already holds, and stores the result in the
// candidate's Files field including the total.
//
// Code and dataset are scored only when they are package files, the image
// only when it is an image file. The first failing lookup aborts the
// calculation and its error is returned wrapped.
func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allCCs map[schsdk.CCID]*candidate) error {
	// Only storage nodes that belong to one of the candidate computing centers
	// contribute to the scores.
	byCDSNode := make(map[cdssdk.NodeID]*candidate)
	for _, cand := range allCCs {
		byCDSNode[cand.CC.CDSNodeID] = cand
	}

	// Code scores.
	if codePkg, isPkg := files.Code.(*schsdk.PackageJobFileInfo); isPkg {
		scores, err := s.calcPackageFileScore(codePkg.PackageID, byCDSNode)
		if err != nil {
			return fmt.Errorf("calc code file score: %w", err)
		}
		for ccID, detail := range scores {
			allCCs[ccID].Files.Code = *detail
		}
	}

	// Dataset scores.
	if datasetPkg, isPkg := files.Dataset.(*schsdk.PackageJobFileInfo); isPkg {
		scores, err := s.calcPackageFileScore(datasetPkg.PackageID, byCDSNode)
		if err != nil {
			return fmt.Errorf("calc dataset file score: %w", err)
		}
		for ccID, detail := range scores {
			allCCs[ccID].Files.Dataset = *detail
		}
	}

	// Image scores.
	if image, isImage := files.Image.(*schsdk.ImageJobFileInfo); isImage {
		scores, err := s.calcImageFileScore(image.ImageID, allCCs, byCDSNode)
		if err != nil {
			return fmt.Errorf("calc image file score: %w", err)
		}
		for ccID, detail := range scores {
			allCCs[ccID].Files.Image = *detail
		}
	}

	// Sum everything up per candidate.
	for _, cand := range allCCs {
		f := &cand.Files
		f.TotalScore = f.Code.CachingScore +
			f.Code.LoadingScore +
			f.Dataset.CachingScore +
			f.Dataset.LoadingScore +
			f.Image.CachingScore +
			f.Image.LoadingScore
	}

	return nil
}
// calcPackageFileScore computes the score of one package on every computing
// center that holds some of it.
//
// The caching score is the cached fraction of the package multiplied by
// CachingWeight; the loading score is LoadedWeight for nodes that already
// loaded the package. Storage nodes missing from cdsNodeToCC are ignored.
// The result is keyed by computing center ID and only contains centers with
// a cached or loaded copy.
//
// NOTE(review): the cached fraction divides by the reported package size —
// confirm the collector never reports a size of 0.
func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) {
	colCli, err := schglb.CollectorMQPool.Acquire()
	if err != nil {
		return nil, fmt.Errorf("new collector client: %w", err)
	}
	defer schglb.CollectorMQPool.Release(colCli)

	scores := make(map[schsdk.CCID]*fileDetail)

	// TODO UserID
	cached, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(1, packageID))
	if err != nil {
		return nil, err
	}

	for _, nodeInfo := range cached.NodeInfos {
		cand, known := cdsNodeToCC[nodeInfo.NodeID]
		if !known {
			continue
		}

		// TODO the calculation may have to differ depending on the caching method
		cachedFraction := float64(nodeInfo.FileSize) / float64(cached.PackageSize)
		scores[cand.CC.CCID] = &fileDetail{
			CachingScore: cachedFraction * CachingWeight,
		}
	}

	// TODO UserID
	loaded, err := colCli.PackageGetLoadedStgNodes(collector.NewPackageGetLoadedStgNodes(1, packageID))
	if err != nil {
		return nil, err
	}

	for _, nodeID := range loaded.StgNodeIDs {
		cand, known := cdsNodeToCC[nodeID]
		if !known {
			continue
		}

		// Reuse the entry created for the cached copy, if there is one.
		detail, exists := scores[cand.CC.CCID]
		if !exists {
			detail = &fileDetail{}
			scores[cand.CC.CCID] = detail
		}

		detail.LoadingScore = 1 * LoadedWeight
		detail.IsLoaded = true
	}

	return scores, nil
}
// calcImageFileScore computes the score of one image on every computing
// center.
//
// When the image is backed by a CDS package, the caching score is the cached
// fraction of that package multiplied by CachingWeight. The loading score is
// LoadedWeight for every candidate computing center the image has already
// been imported into. The result is keyed by computing center ID.
func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) {
	colCli, err := schglb.CollectorMQPool.Acquire()
	if err != nil {
		return nil, fmt.Errorf("new collector client: %w", err)
	}
	defer schglb.CollectorMQPool.Release(colCli)

	magCli, err := schglb.ManagerMQPool.Acquire()
	if err != nil {
		return nil, fmt.Errorf("new manager client: %w", err)
	}
	defer schglb.ManagerMQPool.Release(magCli)

	imageInfo, err := magCli.GetImageInfo(mgrmq.NewGetImageInfo(imageID))
	if err != nil {
		return nil, fmt.Errorf("getting image info: %w", err)
	}

	scores := make(map[schsdk.CCID]*fileDetail)

	if pkgID := imageInfo.Image.CDSPackageID; pkgID != nil {
		cached, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(1, *pkgID))
		if err != nil {
			return nil, err
		}

		for _, nodeInfo := range cached.NodeInfos {
			cand, known := cdsNodeToCC[nodeInfo.NodeID]
			if !known {
				continue
			}

			// TODO the calculation may have to differ depending on the caching method
			cachedFraction := float64(nodeInfo.FileSize) / float64(cached.PackageSize)
			scores[cand.CC.CCID] = &fileDetail{
				CachingScore: cachedFraction * CachingWeight,
			}
		}
	}

	// For an image, the loading score tells whether it has been imported into
	// the computing center.
	for _, pcmImg := range imageInfo.PCMImages {
		if _, isCandidate := allCCs[pcmImg.CCID]; !isCandidate {
			continue
		}

		detail, exists := scores[pcmImg.CCID]
		if !exists {
			detail = &fileDetail{}
			scores[pcmImg.CCID] = detail
		}

		detail.LoadingScore = 1 * LoadedWeight
		detail.IsLoaded = true
	}

	return scores, nil
}
// findResuorce returns the first entry of all whose dynamic type is T, or the
// zero value of T when there is none.
func findResuorce[T uopsdk.ResourceData](all []uopsdk.ResourceData) T {
	var found T
	for i := range all {
		if typed, ok := all[i].(T); ok {
			found = typed
			break
		}
	}
	return found
}
// findJobInfo returns the first job in jobs that has dynamic type T and the
// given local job ID, or the zero value of T when there is none.
func findJobInfo[T schsdk.JobInfo](jobs []schsdk.JobInfo, localJobID string) T {
	var none T
	for _, candidateJob := range jobs {
		// The type is checked before the ID so that the ID is only read from
		// jobs of the requested type.
		typed, ok := candidateJob.(T)
		if !ok {
			continue
		}
		if candidateJob.GetLocalJobID() != localJobID {
			continue
		}
		return typed
	}
	return none
}

View File

@ -1,117 +0,0 @@
package prescheduler
import (
"testing"
"github.com/samber/lo"
. "github.com/smartystreets/goconvey/convey"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
)
// TestOrderByAfters checks the dependency ordering of orderByAfters: fully
// chained jobs, partly independent jobs, a circular dependency (which must
// fail), and jobs without any dependency.
func TestOrderByAfters(t *testing.T) {
	// newJob builds a normal job with the given local ID and dependencies.
	// Afters is always a non-nil slice.
	newJob := func(id string, afters ...string) *schedulingJob {
		return &schedulingJob{
			Job:    &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: id}},
			Afters: append([]string{}, afters...),
		}
	}

	// wants == nil means the ordering is expected to fail.
	cases := []struct {
		title string
		jobs  []*schedulingJob
		wants []string
	}{
		{
			title: "所有Job都有直接或间接的依赖关系",
			jobs: []*schedulingJob{
				newJob("1", "2"),
				newJob("2"),
				newJob("3", "1"),
			},
			wants: []string{"2", "1", "3"},
		},
		{
			title: "部分Job之间无依赖关系",
			jobs: []*schedulingJob{
				newJob("1", "2"),
				newJob("2"),
				newJob("3", "1"),
				newJob("4", "5"),
				newJob("5"),
			},
			wants: []string{"2", "5", "1", "3", "4"},
		},
		{
			title: "存在循环依赖",
			jobs: []*schedulingJob{
				newJob("1", "2"),
				newJob("2", "1"),
			},
			wants: nil,
		},
		{
			title: "完全不依赖",
			jobs: []*schedulingJob{
				newJob("1"),
				newJob("2"),
			},
			wants: []string{"1", "2"},
		},
	}

	sch := NewDefaultPreScheduler()
	for _, c := range cases {
		Convey(c.title, t, func() {
			ordered, ok := sch.orderByAfters(c.jobs)
			if c.wants == nil {
				So(ok, ShouldBeFalse)
				return
			}

			So(ok, ShouldBeTrue)
			ids := lo.Map(ordered, func(item *schedulingJob, idx int) string { return item.Job.GetLocalJobID() })
			So(ids, ShouldResemble, c.wants)
		})
	}
}

View File

@ -1,10 +0,0 @@
package prescheduler
import (
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
)
// PreScheduler computes, for a submitted job set, the pre-scheduling scheme of
// its jobs and the upload scheme of the files the job set needs.
type PreScheduler interface {
// Schedule returns the pre-schedule scheme and the files upload scheme for
// info, or an error when no scheme can be produced.
Schedule(info *schsdk.JobSetInfo) (*jobmod.JobSetPreScheduleScheme, *schsdk.JobSetFilesUploadScheme, error)
}

View File

@ -0,0 +1,27 @@
package services
import (
"fmt"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
)
// CreateInstance creates one instance job of a multi-instance job.
//
// It sends LocalJobID and LocalPath to the manager and returns the ID of the
// created instance together with the files upload scheme the manager
// answered with. On failure the returned ID and scheme are zero values.
func (svc *JobSetService) CreateInstance(LocalJobID string, LocalPath schsdk.JobFileInfo) (schsdk.JobID, schsdk.JobFilesUploadScheme, error) {
	// Zero value returned alongside any error.
	var noScheme schsdk.JobFilesUploadScheme

	mgrCli, err := schglb.ManagerMQPool.Acquire()
	if err != nil {
		return "", noScheme, fmt.Errorf("new manager client: %w", err)
	}
	defer schglb.ManagerMQPool.Release(mgrCli)

	resp, err := mgrCli.CreateInstance(mgrmq.NewCreateInstance(LocalJobID, LocalPath))
	if err != nil {
		return "", noScheme, fmt.Errorf("creating job instance: %w", err)
	}

	return resp.InstanceID, resp.UploadScheme, nil
}

View File

@ -25,7 +25,7 @@ func (svc *JobSetService) Submit(info schsdk.JobSetInfo) (schsdk.JobSetID, *schs
} }
defer schglb.ManagerMQPool.Release(mgrCli) defer schglb.ManagerMQPool.Release(mgrCli)
schScheme, uploadScheme, err := svc.preScheduler.Schedule(&info) schScheme, uploadScheme, err := svc.preScheduler.ScheduleJobSet(&info)
if err != nil { if err != nil {
return "", nil, fmt.Errorf("pre scheduling: %w", err) return "", nil, fmt.Errorf("pre scheduling: %w", err)
} }

View File

@ -1,6 +1,8 @@
package services package services
import "gitlink.org.cn/cloudream/scheduler/client/internal/prescheduler" import (
"gitlink.org.cn/cloudream/common/pkgs/prescheduler"
)
type Service struct { type Service struct {
preScheduler prescheduler.PreScheduler preScheduler prescheduler.PreScheduler

View File

@ -2,6 +2,7 @@ package main
import ( import (
"fmt" "fmt"
"gitlink.org.cn/cloudream/common/pkgs/prescheduler"
"os" "os"
_ "google.golang.org/grpc/balancer/grpclb" _ "google.golang.org/grpc/balancer/grpclb"
@ -9,7 +10,6 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/scheduler/client/internal/cmdline" "gitlink.org.cn/cloudream/scheduler/client/internal/cmdline"
"gitlink.org.cn/cloudream/scheduler/client/internal/config" "gitlink.org.cn/cloudream/scheduler/client/internal/config"
"gitlink.org.cn/cloudream/scheduler/client/internal/prescheduler"
"gitlink.org.cn/cloudream/scheduler/client/internal/services" "gitlink.org.cn/cloudream/scheduler/client/internal/services"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals" schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
) )

View File

@ -15,11 +15,15 @@ type JobService interface {
GetServiceList(msg *GetServiceList) (*GetServiceListResp, *mq.CodeMessage) GetServiceList(msg *GetServiceList) (*GetServiceListResp, *mq.CodeMessage)
GetJobSetDump(msg *GetJobSetDump) (*GetJobSetDumpResp, *mq.CodeMessage) GetJobSetDump(msg *GetJobSetDump) (*GetJobSetDumpResp, *mq.CodeMessage)
CreateInstance(msg *CreateInstance) (*CreateInstanceResp, *mq.CodeMessage)
} }
// 提交任务集 // 提交任务集
var _ = Register(Service.SubmitJobSet) var _ = Register(Service.SubmitJobSet)
var _ = Register(Service.CreateInstance)
type SubmitJobSet struct { type SubmitJobSet struct {
mq.MessageBodyBase mq.MessageBodyBase
JobSet schsdk.JobSetInfo `json:"jobSet"` JobSet schsdk.JobSetInfo `json:"jobSet"`
@ -45,6 +49,36 @@ func (c *Client) SubmitJobSet(msg *SubmitJobSet, opts ...mq.RequestOption) (*Sub
return mq.Request(Service.SubmitJobSet, c.roundTripper, msg, opts...) return mq.Request(Service.SubmitJobSet, c.roundTripper, msg, opts...)
} }
// CreateInstance is the request message that asks for a job instance to be
// created.
// NOTE(review): unlike CreateInstanceResp, the fields carry no json tags —
// confirm the default field-name keys are intended.
type CreateInstance struct {
mq.MessageBodyBase
// LocalJobID and LocalPath are the values passed to NewCreateInstance.
LocalJobID string
LocalPath schsdk.JobFileInfo
}
// CreateInstanceResp is the response message to CreateInstance.
type CreateInstanceResp struct {
mq.MessageBodyBase
// InstanceID is the job ID of the created instance.
InstanceID schsdk.JobID `json:"instanceID"`
// UploadScheme is the files upload scheme returned with the instance.
UploadScheme schsdk.JobFilesUploadScheme `json:"uploadScheme"`
}
// NewCreateInstance builds a CreateInstance request from the given local job
// ID and file info.
func NewCreateInstance(LocalJobID string, LocalPath schsdk.JobFileInfo) *CreateInstance {
	msg := new(CreateInstance)
	msg.LocalJobID = LocalJobID
	msg.LocalPath = LocalPath
	return msg
}
// NewCreateInstanceResp builds a CreateInstanceResp carrying the given
// instance ID and files upload scheme.
func NewCreateInstanceResp(InstanceID schsdk.JobID, UploadScheme schsdk.JobFilesUploadScheme) *CreateInstanceResp {
	resp := new(CreateInstanceResp)
	resp.InstanceID = InstanceID
	resp.UploadScheme = UploadScheme
	return resp
}
// CreateInstance sends the CreateInstance request through the client's round
// tripper and returns the response, applying any request options given.
func (c *Client) CreateInstance(instance *CreateInstance, opts ...mq.RequestOption) (*CreateInstanceResp, error) {
	resp, err := mq.Request(Service.CreateInstance, c.roundTripper, instance, opts...)
	return resp, err
}
// JobSet中需要使用的一个文件上传完成 // JobSet中需要使用的一个文件上传完成
var _ = Register(Service.JobSetLocalFileUploaded) var _ = Register(Service.JobSetLocalFileUploaded)

View File

@ -0,0 +1,25 @@
package event
import (
"gitlink.org.cn/cloudream/common/pkgs/future"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
)
// CreateInstanceFuture is the future through which the result of an instance
// creation (a CreateInstanceResult) is delivered.
type CreateInstanceFuture = *future.SetValueFuture[CreateInstanceResult]
// InstanceCreate is the event that requests the creation of a job instance.
type InstanceCreate struct {
// LocalPath is the file info supplied for the new instance.
LocalPath schsdk.JobFileInfo
// Result is the future attached to this request.
// presumably completed by the handler of the event — TODO confirm
Result CreateInstanceFuture
}
// CreateInstanceResult is the value carried by a CreateInstanceFuture.
type CreateInstanceResult struct {
// JobID is the ID of the created instance job.
JobID schsdk.JobID
// FilesUploadScheme is the upload scheme for the instance's files.
FilesUploadScheme schsdk.JobFilesUploadScheme
}
// NewInstanceCreate builds an InstanceCreate event from the given file info
// and result future.
func NewInstanceCreate(LocalPath schsdk.JobFileInfo, future CreateInstanceFuture) *InstanceCreate {
	evt := new(InstanceCreate)
	evt.LocalPath = LocalPath
	evt.Result = future
	return evt
}

View File

@ -6,22 +6,40 @@ import (
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr"
) )
// WaitType 等待一个特定类型的事件。
// 通过给定的上下文和事件集,这个函数会阻塞直到匹配指定类型的事件发生。
// ctx: 用于控制等待过程的上下文,如果上下文被取消或到期,等待将被终止。
// set: 指向一个事件集,这个事件集会被用来等待特定类型的事件。
// 返回值 T: 等待到的事件,它会被强制转换为函数参数类型 T。
// 返回值 bool: 表示等待操作是否成功。如果成功等到事件,返回 true如果因为上下文被取消或到期而终止返回 false。
func WaitType[T jobmgr.Event](ctx context.Context, set *jobmgr.EventSet) (T, bool) { func WaitType[T jobmgr.Event](ctx context.Context, set *jobmgr.EventSet) (T, bool) {
// 使用 set.Wait 方法等待一个满足给定条件的事件。
// 条件函数检查事件是否能被转换为类型 T。
ret, ok := set.Wait(ctx, func(evt jobmgr.Event) bool { ret, ok := set.Wait(ctx, func(evt jobmgr.Event) bool {
_, ok := evt.(T) _, ok := evt.(T)
return ok return ok
}) })
// 因为 set.Wait 返回的事件类型是 jobmgr.Event这里将它转换为 T 类型,并返回转换结果及操作成功标志。
return ret.(T), ok return ret.(T), ok
} }
// WaitTypeAnd 等待一个特定类型的事件并检查该事件是否满足给定的条件。
// ctx: 上下文,用于控制等待过程的取消或超时。
// set: 事件集合,从中等待事件发生。
// cond: 一个函数,用于检查等待的事件是否满足特定条件。
// 返回值为满足条件的事件和一个布尔值,指示获取事件是否成功。
func WaitTypeAnd[T jobmgr.Event](ctx context.Context, set *jobmgr.EventSet, cond func(val T) bool) (T, bool) { func WaitTypeAnd[T jobmgr.Event](ctx context.Context, set *jobmgr.EventSet, cond func(val T) bool) (T, bool) {
// 等待一个满足特定类型和条件的事件。
ret, ok := set.Wait(ctx, func(evt jobmgr.Event) bool { ret, ok := set.Wait(ctx, func(evt jobmgr.Event) bool {
// 尝试将事件断言为特定类型T并检查断言是否成功。
e, ok := evt.(T) e, ok := evt.(T)
if !ok { if !ok {
return false return false // 如果事件不是期望的类型T则返回false。
} }
// 如果事件是类型T且满足给定条件则返回true。
return cond(e) return cond(e)
}) })
// 断言返回的事件为类型T并返回该事件和操作成功标志。
return ret.(T), ok return ret.(T), ok
} }

View File

@ -25,20 +25,26 @@ func NewEventSet() EventSet {
return EventSet{} return EventSet{}
} }
// Post 函数用于向事件集合中发布一个事件。
// 如果有等待该事件的协程,会唤醒它们并将事件传递给它们。
// 参数:
//
// evt Event - 需要发布的事件对象。
func (s *EventSet) Post(evt Event) { func (s *EventSet) Post(evt Event) {
s.lock.Lock() s.lock.Lock() // 加锁保护事件集合
defer s.lock.Unlock() defer s.lock.Unlock() // 确保在函数结束时释放锁
// 一个事件能唤醒多个等待者 // 遍历等待者列表查找匹配的等待者。如果找到从列表中移除并设置其future的值。
used := false used := false // 标记当前事件是否已被使用(即是否唤醒了某个等待者)
for i, waiter := range s.waiters { for i, waiter := range s.waiters {
if waiter.condition(evt) { if waiter.condition(evt) { // 检查当前事件是否满足等待条件
s.waiters = lo2.RemoveAt(s.waiters, i) s.waiters = lo2.RemoveAt(s.waiters, i) // 从等待者列表中移除当前等待者
waiter.future.SetValue(evt) waiter.future.SetValue(evt) // 设置等待者的future值为当前事件
used = true used = true // 标记事件已被使用
} }
} }
// 如果没有匹配的等待者,则将事件添加到事件列表中。
if !used { if !used {
s.events = append(s.events, evt) s.events = append(s.events, evt)
} }

View File

@ -0,0 +1,30 @@
package job
import (
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
)
// InstanceJob is the job body of a single instance job.
type InstanceJob struct {
	Info           schsdk.InstanceJobInfo // job description supplied when the job was submitted
	Files          jobmod.JobFiles        // files the job needs
	TargetCCID     schsdk.CCID            // ID of the computing center that will run this job
	OutputFullPath string                 // full output path of the program's results
}

// NewInstanceJob creates an InstanceJob that holds the given description; all
// other fields start at their zero values.
func NewInstanceJob(info schsdk.InstanceJobInfo) *InstanceJob {
	j := new(InstanceJob)
	j.Info = info
	return j
}

// GetInfo returns the job description as a schsdk.JobInfo. The returned value
// points at the job's own Info field.
func (j *InstanceJob) GetInfo() schsdk.JobInfo {
	var info schsdk.JobInfo = &j.Info
	return info
}

// Dump returns a NormalJobDump carrying the job's files and target computing
// center.
func (j *InstanceJob) Dump() jobmod.JobBodyDump {
	dump := new(jobmod.NormalJobDump)
	dump.Files = j.Files
	dump.TargetCCID = j.TargetCCID
	return dump
}

View File

@ -0,0 +1,30 @@
package job
import (
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
)
// MultiInstanceJob is the job body of a multi-instance job.
type MultiInstanceJob struct {
	Info       schsdk.MultiInstanceJobInfo // job description supplied on construction
	Files      jobmod.JobFiles             // files reported by Dump
	TargetCCID schsdk.CCID                 // computing center ID reported by Dump
	SubJobs    []schsdk.JobID              // presumably the IDs of the instance jobs created from this job — TODO confirm
}

// NewMultiInstanceJob creates a MultiInstanceJob that holds the given
// description; all other fields start at their zero values.
func NewMultiInstanceJob(info schsdk.MultiInstanceJobInfo) *MultiInstanceJob {
	j := new(MultiInstanceJob)
	j.Info = info
	return j
}

// GetInfo returns the job description as a schsdk.JobInfo. The returned value
// points at the job's own Info field.
func (j *MultiInstanceJob) GetInfo() schsdk.JobInfo {
	var info schsdk.JobInfo = &j.Info
	return info
}

// Dump returns a NormalJobDump carrying the job's files and target computing
// center.
func (j *MultiInstanceJob) Dump() jobmod.JobBodyDump {
	dump := new(jobmod.NormalJobDump)
	dump.Files = j.Files
	dump.TargetCCID = j.TargetCCID
	return dump
}

View File

@ -41,7 +41,7 @@ func (s *ReadyToAdjust) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error
if rt, ok := norJob.Info.Files.Dataset.(*schsdk.DataReturnJobFileInfo); ok { if rt, ok := norJob.Info.Files.Dataset.(*schsdk.DataReturnJobFileInfo); ok {
evt, ok := event.WaitTypeAnd[event.JobCompleted](ctx, rtx.EventSet, func(val event.JobCompleted) bool { evt, ok := event.WaitTypeAnd[event.JobCompleted](ctx, rtx.EventSet, func(val event.JobCompleted) bool {
return val.Job.GetInfo().GetLocalJobID() == rt.ResourceLocalJobID return val.Job.GetInfo().GetLocalJobID() == rt.DataReturnLocalJobID
}) })
if !ok { if !ok {
return jobmgr.ErrJobCancelled return jobmgr.ErrJobCancelled

View File

@ -0,0 +1,79 @@
package state
import (
"context"
"gitlink.org.cn/cloudream/common/pkgs/prescheduler"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job"
)
// CreateInstance is the job state in which a multi-instance job waits for
// instance-creation events and spawns one instance job per event.
type CreateInstance struct {
	// preScheduler produces the schedule and the file upload scheme for
	// each new instance.
	preScheduler prescheduler.PreScheduler
}
// NewCreateInstance returns a CreateInstance state that uses preScheduler to
// schedule the instances it creates.
func NewCreateInstance(preScheduler prescheduler.PreScheduler) *CreateInstance {
	st := new(CreateInstance)
	st.preScheduler = preScheduler
	return st
}
// Run executes the state for the given job by delegating to do. It returns
// once do returns and does not switch the job to another state afterwards.
func (s *CreateInstance) Run(rtx jobmgr.JobStateRunContext, job *jobmgr.Job) {
	s.do(rtx, job)
}
// do serves instance-creation requests for a multi-instance job.
//
// For every InstanceCreate event received on the job's event set it builds an
// InstanceJobInfo from the parent job's description (with the dataset taken
// from the event), asks the pre-scheduler for a schedule and a file upload
// scheme, registers a new instance job through the manager, and reports the
// new job ID and upload scheme through the event's Result future. A
// scheduling failure is reported through the same future and the loop keeps
// serving further requests.
//
// The loop ends when waiting for the next InstanceCreate event fails; a
// Cancel event on the same event set cancels the context used for waiting.
// jo.Body must be a *job.MultiInstanceJob, otherwise the type assertion
// panics.
func (s *CreateInstance) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Watcher goroutine: cancel the context once the Cancel wait returns.
	// The deferred cancel above also ends this wait when do returns.
	go func() {
		event.WaitType[event.Cancel](ctx, rtx.EventSet)
		cancel()
	}()

	parent := jo.Body.(*job.MultiInstanceJob)

	for {
		// Block until the next instance-creation request arrives.
		req, ok := event.WaitType[event.InstanceCreate](ctx, rtx.EventSet)
		if !ok {
			return
		}

		// The instance reuses the parent's code, image, runtime and
		// resources; only the dataset comes from the request.
		instInfo := &schsdk.InstanceJobInfo{
			LocalJobID: parent.Info.LocalJobID,
			Files: schsdk.JobFilesInfo{
				Dataset: req.LocalPath,
				Code:    parent.Info.Files.Code,
				Image:   parent.Info.Files.Image,
			},
			Runtime:   parent.Info.Runtime,
			Resources: parent.Info.Resources,
		}

		// Produce the pre-schedule scheme and the file upload scheme.
		scheme, uploadScheme, err := s.preScheduler.ScheduleJob(instInfo)
		if err != nil {
			req.Result.SetError(err)
			continue
		}

		// Create the instance job and start it in the pre-scheduling state.
		newJobID := rtx.Mgr.AddJob(jo.JobSetID, job.NewInstanceJob(*instInfo), NewPreSchuduling(*scheme))

		// Record the instance under its parent job.
		parent.SubJobs = append(parent.SubJobs, newJobID)

		// Hand the instance ID and upload scheme back to the requester.
		req.Result.SetValue(event.CreateInstanceResult{
			JobID:             newJobID,
			FilesUploadScheme: *uploadScheme,
		})
	}
}

View File

@ -95,17 +95,25 @@ func (m *Manager) PostEvent(jobID schsdk.JobID, evt Event) {
}() }()
} }
// BroadcastEvent 向所有属于指定 jobSet 的任务广播一个事件。
// jobSetID: 代表作业集的唯一标识符。
// evt: 需要广播的事件。
func (m *Manager) BroadcastEvent(jobSetID schsdk.JobSetID, evt Event) { func (m *Manager) BroadcastEvent(jobSetID schsdk.JobSetID, evt Event) {
// 加锁以确保发布事件时的线程安全
m.pubLock.Lock() m.pubLock.Lock()
defer m.pubLock.Unlock() defer m.pubLock.Unlock() // 确保函数退出时释放锁
// 尝试从管理器的作业集中获取指定的作业集
jobSet, ok := m.jobSets[jobSetID] jobSet, ok := m.jobSets[jobSetID]
if !ok { if !ok {
// 如果作业集不存在,则直接返回
return return
} }
// 遍历作业集中的所有任务,并为每个任务发布事件
for _, mjob := range jobSet.jobs { for _, mjob := range jobSet.jobs {
go func(j *mgrJob) { go func(j *mgrJob) {
// 使用 goroutine 为每个任务发布事件,以异步方式处理,避免阻塞
j.eventSet.Post(evt) j.eventSet.Post(evt)
}(mjob) }(mjob)
} }
@ -116,18 +124,31 @@ type SubmittingJob struct {
InitState JobState InitState JobState
} }
// SubmitJobSet 提交一个作业集将一组提交作业转换为系统可识别的作业集并为每个提交的作业创建一个唯一的作业ID。
//
// 参数:
//
// jobs []SubmittingJob - 要提交的作业列表,每个作业包含作业的初始状态和内容。
//
// 返回值:
//
// schsdk.JobSetID - 生成的作业集ID用于标识这个作业集。
func (m *Manager) SubmitJobSet(jobs []SubmittingJob) schsdk.JobSetID { func (m *Manager) SubmitJobSet(jobs []SubmittingJob) schsdk.JobSetID {
// 加锁以保护对作业集ID和作业ID索引的修改
m.pubLock.Lock() m.pubLock.Lock()
defer m.pubLock.Unlock() defer m.pubLock.Unlock()
// 生成一个新的作业集ID并递增作业集ID索引
jobSetID := schsdk.JobSetID(fmt.Sprintf("%d", m.jobSetIDIndex)) jobSetID := schsdk.JobSetID(fmt.Sprintf("%d", m.jobSetIDIndex))
m.jobSetIDIndex += 1 m.jobSetIDIndex += 1
// 创建一个新的作业集实例,并初始化其作业映射
jobSet := &mgrJobSet{ jobSet := &mgrJobSet{
jobs: make(map[schsdk.JobID]*mgrJob), jobs: make(map[schsdk.JobID]*mgrJob),
} }
m.jobSets[jobSetID] = jobSet m.jobSets[jobSetID] = jobSet
// 遍历提交的作业为每个作业创建一个唯一的作业ID初始化作业状态并将其添加到作业集中
for i, subJob := range jobs { for i, subJob := range jobs {
jobID := schsdk.JobID(fmt.Sprintf("%d", m.jobIDIndex+i)) jobID := schsdk.JobID(fmt.Sprintf("%d", m.jobIDIndex+i))
job := &mgrJob{ job := &mgrJob{
@ -140,10 +161,20 @@ func (m *Manager) SubmitJobSet(jobs []SubmittingJob) schsdk.JobSetID {
} }
jobSet.jobs[jobID] = job jobSet.jobs[jobID] = job
m.ChangeState(&job.job, subJob.InitState) // 更改作业的初始状态
//m.ChangeState(&job.job, subJob.InitState)
go func() {
subJob.InitState.Run(JobStateRunContext{
Mgr: m,
EventSet: &job.eventSet,
LastState: job.state,
}, &job.job)
}()
} }
// 更新作业ID索引基于提交的作业数量
m.jobIDIndex += len(jobs) m.jobIDIndex += len(jobs)
// 返回生成的作业集ID
return jobSetID return jobSetID
} }
@ -167,3 +198,23 @@ func (m *Manager) DumpJobSet(jobSetID schsdk.JobSetID) []jobmod.JobDump {
return jobDumps return jobDumps
} }
// PreSchedulerInstJob pairs a job body with a job state.
// NOTE(review): nothing in the visible part of this file references this
// type — confirm it is still needed.
type PreSchedulerInstJob struct {
	// Body is the job body.
	Body JobBody
	// InitState is presumably the state the job should start in — confirm
	// against the intended caller.
	InitState JobState
}
// AddJob creates a job with the given body in the job set identified by
// jobSetID, moves it into State via ChangeState, and returns the new job's ID.
//
// The ID is taken from the manager's job ID counter, which is advanced so
// that later AddJob and SubmitJobSet calls never hand out the same ID again.
func (m *Manager) AddJob(jobSetID schsdk.JobSetID, jobBody JobBody, State JobState) schsdk.JobID {
	// Allocate the ID under pubLock, the lock SubmitJobSet holds while it
	// reads and advances jobIDIndex. The lock is released before calling
	// ChangeState because ChangeState's own locking is not visible here.
	m.pubLock.Lock()
	jobID := schsdk.JobID(fmt.Sprintf("%d", m.jobIDIndex))
	m.jobIDIndex++
	m.pubLock.Unlock()

	job := Job{
		JobSetID: jobSetID,
		JobID:    jobID,
		Body:     jobBody,
	}

	// NOTE(review): unlike SubmitJobSet, the job is not inserted into
	// m.jobSets[jobSetID].jobs here — confirm that ChangeState (or another
	// caller) registers it, otherwise events and dumps will not reach it.
	m.ChangeState(&job, State)

	return jobID
}

View File

@ -1,8 +1,10 @@
package mq package mq
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
@ -43,12 +45,41 @@ func (svc *Service) SubmitJobSet(msg *mgrmq.SubmitJobSet) (*mgrmq.SubmitJobSetRe
Body: job, Body: job,
InitState: state.NewWaitTargetComplete(), InitState: state.NewWaitTargetComplete(),
}) })
case *schsdk.MultiInstanceJobInfo:
job := job.NewMultiInstanceJob(*info)
preSch, ok := msg.PreScheduleScheme.JobSchemes[info.LocalJobID]
if !ok {
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("pre schedule scheme for job %s is not found", info.LocalJobID))
}
jobs = append(jobs, jobmgr.SubmittingJob{
Body: job,
InitState: state.NewPreSchuduling(preSch),
})
} }
} }
return mq.ReplyOK(mgrmq.NewSubmitJobSetResp(svc.jobMgr.SubmitJobSet(jobs))) return mq.ReplyOK(mgrmq.NewSubmitJobSetResp(svc.jobMgr.SubmitJobSet(jobs)))
} }
// CreateInstance handles a request to create a new instance under an existing
// job. It posts an InstanceCreate event carrying the request's LocalPath and
// a result future to the target job, then blocks until the future is
// resolved. On success the reply carries the new instance's job ID and its
// file upload scheme; on failure it carries an OperationFailed code with the
// error text.
//
// NOTE(review): the request's LocalJobID is converted directly into a
// manager JobID — confirm callers send the manager-assigned job ID.
// NOTE(review): the wait uses context.TODO(), so it has no deadline of its
// own — confirm the event is always answered.
func (svc *Service) CreateInstance(instInfo *mgrmq.CreateInstance) (*mgrmq.CreateInstanceResp, *mq.CodeMessage) {
	logger.Debugf("start create instance")

	// The job state handling the event reports its outcome through this future.
	resultFut := future.NewSetValue[event.CreateInstanceResult]()
	targetJobID := schsdk.JobID(instInfo.LocalJobID)
	createEvt := event.NewInstanceCreate(instInfo.LocalPath, resultFut)
	svc.jobMgr.PostEvent(targetJobID, createEvt)

	created, err := resultFut.WaitValue(context.TODO())
	if err != nil {
		return nil, mq.Failed(errorcode.OperationFailed, err.Error())
	}

	resp := mgrmq.NewCreateInstanceResp(created.JobID, created.FilesUploadScheme)
	return mq.ReplyOK(resp)
}
// 任务集中某个文件上传完成 // 任务集中某个文件上传完成
func (svc *Service) JobSetLocalFileUploaded(msg *mgrmq.JobSetLocalFileUploaded) (*mgrmq.JobSetLocalFileUploadedResp, *mq.CodeMessage) { func (svc *Service) JobSetLocalFileUploaded(msg *mgrmq.JobSetLocalFileUploaded) (*mgrmq.JobSetLocalFileUploadedResp, *mq.CodeMessage) {
logger.WithField("LocalPath", msg.LocalPath). logger.WithField("LocalPath", msg.LocalPath).