多实例任务代码优化

This commit is contained in:
JeshuaRen 2024-05-07 15:45:13 +08:00
parent a4c43731ac
commit 0de174fd9a
13 changed files with 236 additions and 53 deletions

View File

@ -5,6 +5,8 @@ import (
"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
"gitlink.org.cn/cloudream/common/utils/serder"
"io"
"net/http" "net/http"
) )
@ -19,7 +21,7 @@ type CreateInstanceResp struct {
type CreateInstanceReq struct { type CreateInstanceReq struct {
LocalJobID string `json:"localJobID" binding:"required"` LocalJobID string `json:"localJobID" binding:"required"`
LocalPath schsdk.JobFileInfo `json:"filePath" binding:"required"` LocalPath schsdk.JobFileInfo `json:"localPath" binding:"required"`
} }
func (s *Server) JobSvc() *JobService { func (s *Server) JobSvc() *JobService {
@ -31,10 +33,24 @@ func (s *Server) JobSvc() *JobService {
func (s *JobService) CreateInstance(ctx *gin.Context) { func (s *JobService) CreateInstance(ctx *gin.Context) {
log := logger.WithField("HTTP", "JobSet.HTTP") log := logger.WithField("HTTP", "JobSet.HTTP")
var req CreateInstanceReq //var req CreateInstanceReq
if err := ctx.ShouldBindQuery(&req); err != nil { //if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error()) // log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) // ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
// return
//}
bodyData, err := io.ReadAll(ctx.Request.Body)
if err != nil {
log.Warnf("reading request body: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "read request body failed"))
return
}
req, err := serder.JSONToObjectEx[CreateInstanceReq](bodyData)
if err != nil {
log.Warnf("parsing request body: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed"))
return return
} }

View File

@ -39,7 +39,7 @@ func (s *Server) Serve() error {
func (s *Server) initRouters() { func (s *Server) initRouters() {
s.engine.POST("/jobSet/submit", s.JobSetSvc().Submit) s.engine.POST("/jobSet/submit", s.JobSetSvc().Submit)
s.engine.POST("/jobSet/submit", s.JobSvc().CreateInstance) s.engine.POST("/job/CreateInstance", s.JobSvc().CreateInstance)
s.engine.POST("/jobSet/localFileUploaded", s.JobSetSvc().LocalFileUploaded) s.engine.POST("/jobSet/localFileUploaded", s.JobSetSvc().LocalFileUploaded)
s.engine.GET("/jobSet/getServiceList", s.JobSetSvc().GetServiceList) s.engine.GET("/jobSet/getServiceList", s.JobSetSvc().GetServiceList)
} }

View File

@ -40,3 +40,25 @@ type DataReturnJobDump struct {
func (d *DataReturnJobDump) getType() JobBodyDumpType { func (d *DataReturnJobDump) getType() JobBodyDumpType {
return d.Type return d.Type
} }
// InstanceJobDump is the JobBodyDump variant carrying an instance job's
// target computing center ID and its job files.
//
// NOTE(review): the union tag below is "NormalJob", the same value that
// MultiInstanceJobDump carries. This looks like a copy-paste leftover; if both
// types are registered in the same type union the tags would presumably
// collide — confirm whether a distinct tag is intended.
type InstanceJobDump struct {
serder.Metadata `union:"NormalJob"`
// Type is the dump type discriminator returned by getType.
Type JobBodyDumpType `json:"type"`
// TargetCCID is the ID of the computing center the job is targeted at.
TargetCCID schsdk.CCID `json:"targetCCID"`
// Files holds the job's files.
Files JobFiles `json:"files"`
}
// getType returns the value stored in the Type field.
func (d *InstanceJobDump) getType() JobBodyDumpType {
return d.Type
}
// MultiInstanceJobDump is the JobBodyDump variant carrying a multi-instance
// job's target computing center ID and its job files.
//
// NOTE(review): the union tag below is "NormalJob", the same value that
// InstanceJobDump carries. This looks like a copy-paste leftover; if both
// types are registered in the same type union the tags would presumably
// collide — confirm whether a distinct tag is intended.
type MultiInstanceJobDump struct {
serder.Metadata `union:"NormalJob"`
// Type is the dump type discriminator returned by getType.
Type JobBodyDumpType `json:"type"`
// TargetCCID is the ID of the computing center the job is targeted at.
TargetCCID schsdk.CCID `json:"targetCCID"`
// Files holds the job's files.
Files JobFiles `json:"files"`
}
// getType returns the value stored in the Type field.
func (d *MultiInstanceJobDump) getType() JobBodyDumpType {
return d.Type
}

View File

@ -19,6 +19,10 @@ func WaitType[T jobmgr.Event](ctx context.Context, set *jobmgr.EventSet) (T, boo
_, ok := evt.(T) _, ok := evt.(T)
return ok return ok
}) })
if ret == nil {
var r T
return r, false // 如果事件为空则返回false。
}
// 因为 set.Wait 返回的事件类型是 jobmgr.Event这里将它转换为 T 类型,并返回转换结果及操作成功标志。 // 因为 set.Wait 返回的事件类型是 jobmgr.Event这里将它转换为 T 类型,并返回转换结果及操作成功标志。
return ret.(T), ok return ret.(T), ok
} }
@ -40,6 +44,10 @@ func WaitTypeAnd[T jobmgr.Event](ctx context.Context, set *jobmgr.EventSet, cond
// 如果事件是类型T且满足给定条件则返回true。 // 如果事件是类型T且满足给定条件则返回true。
return cond(e) return cond(e)
}) })
if ret == nil {
var r T
return r, false // 如果事件为空则返回false。
}
// 断言返回的事件为类型T并返回该事件和操作成功标志。 // 断言返回的事件为类型T并返回该事件和操作成功标志。
return ret.(T), ok return ret.(T), ok
} }

View File

@ -12,9 +12,10 @@ type InstanceJob struct {
OutputFullPath string // 程序结果的完整输出路径 OutputFullPath string // 程序结果的完整输出路径
} }
func NewInstanceJob(info schsdk.InstanceJobInfo) *InstanceJob { func NewInstanceJob(info schsdk.InstanceJobInfo, files jobmod.JobFiles) *InstanceJob {
return &InstanceJob{ return &InstanceJob{
Info: info, Info: info,
Files: files,
} }
} }
@ -23,7 +24,7 @@ func (j *InstanceJob) GetInfo() schsdk.JobInfo {
} }
func (j *InstanceJob) Dump() jobmod.JobBodyDump { func (j *InstanceJob) Dump() jobmod.JobBodyDump {
return &jobmod.NormalJobDump{ return &jobmod.InstanceJobDump{
Files: j.Files, Files: j.Files,
TargetCCID: j.TargetCCID, TargetCCID: j.TargetCCID,
} }

View File

@ -6,15 +6,17 @@ import (
) )
type MultiInstanceJob struct { type MultiInstanceJob struct {
Info schsdk.MultiInstanceJobInfo Info schsdk.MultiInstanceJobInfo
Files jobmod.JobFiles Files jobmod.JobFiles
TargetCCID schsdk.CCID TargetCCID schsdk.CCID
SubJobs []schsdk.JobID SubJobs []schsdk.JobID
PreScheduler jobmod.JobScheduleScheme
} }
func NewMultiInstanceJob(info schsdk.MultiInstanceJobInfo) *MultiInstanceJob { func NewMultiInstanceJob(info schsdk.MultiInstanceJobInfo, preScheduler jobmod.JobScheduleScheme) *MultiInstanceJob {
return &MultiInstanceJob{ return &MultiInstanceJob{
Info: info, Info: info,
PreScheduler: preScheduler,
} }
} }
@ -23,7 +25,7 @@ func (j *MultiInstanceJob) GetInfo() schsdk.JobInfo {
} }
func (j *MultiInstanceJob) Dump() jobmod.JobBodyDump { func (j *MultiInstanceJob) Dump() jobmod.JobBodyDump {
return &jobmod.NormalJobDump{ return &jobmod.MultiInstanceJobDump{
Files: j.Files, Files: j.Files,
TargetCCID: j.TargetCCID, TargetCCID: j.TargetCCID,
} }

View File

@ -46,7 +46,25 @@ func (s *Adjusting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.J
} }
func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error { func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
norJob := jo.Body.(*job.NormalJob) //norJob := jo.Body.(*job.NormalJob)
var jobFilesInfo schsdk.JobFilesInfo
var jobFiles *jobmod.JobFiles
var targetCCID schsdk.CCID
switch runningJob := jo.Body.(type) {
case *job.NormalJob:
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files
targetCCID = runningJob.TargetCCID
case *job.MultiInstanceJob:
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files
targetCCID = runningJob.TargetCCID
case *job.InstanceJob:
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files
targetCCID = runningJob.TargetCCID
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
@ -77,7 +95,8 @@ func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
return fmt.Errorf("getting cds storage info: %w", err) return fmt.Errorf("getting cds storage info: %w", err)
} }
// TODO UserID // TODO UserID
norJob.OutputFullPath = utils.MakeJobOutputFullPath(stgInfo.Directory, 1, jo.JobID) //norJob.OutputFullPath = utils.MakeJobOutputFullPath(stgInfo.Directory, 1, jo.JobID)
utils.MakeJobOutputFullPath(stgInfo.Directory, 1, jo.JobID)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(3) wg.Add(3)
@ -86,7 +105,7 @@ func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
go func() { go func() {
defer wg.Done() defer wg.Done()
e1 = s.doPackageScheduling(ctx, rtx, norJob, norJob.Info.Files.Dataset, &norJob.Files.Dataset, &s.scheme.Dataset) e1 = s.doPackageScheduling(ctx, rtx, jobFilesInfo.Dataset, &jobFiles.Dataset, &s.scheme.Dataset)
if e1 != nil { if e1 != nil {
cancel() cancel()
} }
@ -94,7 +113,7 @@ func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
go func() { go func() {
defer wg.Done() defer wg.Done()
e2 = s.doPackageScheduling(ctx, rtx, norJob, norJob.Info.Files.Code, &norJob.Files.Code, &s.scheme.Code) e2 = s.doPackageScheduling(ctx, rtx, jobFilesInfo.Code, &jobFiles.Code, &s.scheme.Code)
if e2 != nil { if e2 != nil {
cancel() cancel()
} }
@ -102,7 +121,7 @@ func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
go func() { go func() {
defer wg.Done() defer wg.Done()
e3 = s.doImageScheduling(ctx, rtx, norJob, norJob.Info.Files.Image, &norJob.Files.Image, &s.scheme.Image) e3 = s.doImageScheduling(ctx, rtx, targetCCID, jobFilesInfo.Image, &jobFiles.Image, &s.scheme.Image)
if e3 != nil { if e3 != nil {
cancel() cancel()
} }
@ -111,7 +130,7 @@ func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
return errors.Join(e1, e2, e3) return errors.Join(e1, e2, e3)
} }
func (s *Adjusting) doPackageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, job *job.NormalJob, fileInfo schsdk.JobFileInfo, file *jobmod.PackageJobFile, scheme *jobmod.FileScheduleScheme) error { func (s *Adjusting) doPackageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, fileInfo schsdk.JobFileInfo, file *jobmod.PackageJobFile, scheme *jobmod.FileScheduleScheme) error {
switch info := fileInfo.(type) { switch info := fileInfo.(type) {
case *schsdk.LocalJobFileInfo: case *schsdk.LocalJobFileInfo:
evt, ok := event.WaitTypeAnd[event.LocalFileUploaded](ctx, rtx.EventSet, func(e event.LocalFileUploaded) bool { evt, ok := event.WaitTypeAnd[event.LocalFileUploaded](ctx, rtx.EventSet, func(e event.LocalFileUploaded) bool {
@ -173,7 +192,7 @@ func (s *Adjusting) doPackageScheduling(ctx context.Context, rtx jobmgr.JobState
return nil return nil
} }
func (s *Adjusting) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, job *job.NormalJob, fileInfo schsdk.JobFileInfo, file *jobmod.ImageJobFile, scheme *jobmod.FileScheduleScheme) error { func (s *Adjusting) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, fileInfo schsdk.JobFileInfo, file *jobmod.ImageJobFile, scheme *jobmod.FileScheduleScheme) error {
switch info := fileInfo.(type) { switch info := fileInfo.(type) {
case *schsdk.LocalJobFileInfo: case *schsdk.LocalJobFileInfo:
evt, ok := event.WaitTypeAnd[event.LocalFileUploaded](ctx, rtx.EventSet, func(e event.LocalFileUploaded) bool { evt, ok := event.WaitTypeAnd[event.LocalFileUploaded](ctx, rtx.EventSet, func(e event.LocalFileUploaded) bool {
@ -260,7 +279,7 @@ func (s *Adjusting) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRu
} }
// TODO 镜像名称 // TODO 镜像名称
err = rtx.Mgr.DB.PCMImage().Create(rtx.Mgr.DB.SQLCtx(), file.ImageID, job.TargetCCID, uploadStatus.PCMImageID, uploadStatus.Name, time.Now()) err = rtx.Mgr.DB.PCMImage().Create(rtx.Mgr.DB.SQLCtx(), file.ImageID, targetCCID, uploadStatus.PCMImageID, uploadStatus.Name, time.Now())
if err != nil { if err != nil {
return fmt.Errorf("creating image info: %w", err) return fmt.Errorf("creating image info: %w", err)
} }

View File

@ -3,6 +3,7 @@ package state
import ( import (
"context" "context"
"fmt" "fmt"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm" pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
@ -40,30 +41,45 @@ func (s *NormalJobExecuting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job)
} }
func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error { func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
norJob := jo.Body.(*job.NormalJob) //norJob := jo.Body.(*job.NormalJob)
var runtime *schsdk.JobRuntimeInfo
var jobFiles *jobmod.JobFiles
var targetCCID schsdk.CCID
switch runningJob := jo.Body.(type) {
case *job.NormalJob:
runtime = &runningJob.Info.Runtime
jobFiles = &runningJob.Files
targetCCID = runningJob.TargetCCID
case *job.InstanceJob:
runtime = &runningJob.Info.Runtime
jobFiles = &runningJob.Files
targetCCID = runningJob.TargetCCID
}
log := logger.WithType[NormalJobExecuting]("State").WithField("JobID", jo.JobID) log := logger.WithType[NormalJobExecuting]("State").WithField("JobID", jo.JobID)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
pcmImgInfo, err := rtx.Mgr.DB.PCMImage().GetByImageIDAndCCID(rtx.Mgr.DB.SQLCtx(), norJob.Files.Image.ImageID, norJob.TargetCCID) pcmImgInfo, err := rtx.Mgr.DB.PCMImage().GetByImageIDAndCCID(rtx.Mgr.DB.SQLCtx(), jobFiles.Image.ImageID, targetCCID)
if err != nil { if err != nil {
return fmt.Errorf("getting pcm image info: %w", err) return fmt.Errorf("getting pcm image info: %w", err)
} }
ccInfo, err := rtx.Mgr.DB.ComputingCenter().GetByID(rtx.Mgr.DB.SQLCtx(), norJob.TargetCCID) ccInfo, err := rtx.Mgr.DB.ComputingCenter().GetByID(rtx.Mgr.DB.SQLCtx(), targetCCID)
if err != nil { if err != nil {
return fmt.Errorf("getting computing center info: %w", err) return fmt.Errorf("getting computing center info: %w", err)
} }
// TODO 需要添加DATA_IN、DATA_OUT等环境变量这些数据从Job的信息中来获取 // TODO 需要添加DATA_IN、DATA_OUT等环境变量这些数据从Job的信息中来获取
ress, err := rtx.Mgr.DB.CCResource().GetByCCID(rtx.Mgr.DB.SQLCtx(), norJob.TargetCCID) ress, err := rtx.Mgr.DB.CCResource().GetByCCID(rtx.Mgr.DB.SQLCtx(), targetCCID)
if err != nil { if err != nil {
return fmt.Errorf("getting computing center resource: %w", err) return fmt.Errorf("getting computing center resource: %w", err)
} }
if len(ress) == 0 { if len(ress) == 0 {
return fmt.Errorf("no resource found at computing center %v", norJob.TargetCCID) return fmt.Errorf("no resource found at computing center %v", targetCCID)
} }
wt := rtx.Mgr.ExecMgr.StartTask(exetsk.NewSubmitTask( wt := rtx.Mgr.ExecMgr.StartTask(exetsk.NewSubmitTask(
@ -71,8 +87,8 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e
pcmImgInfo.PCMImageID, pcmImgInfo.PCMImageID,
// TODO 选择资源的算法 // TODO 选择资源的算法
ress[0].PCMResourceID, ress[0].PCMResourceID,
norJob.Info.Runtime.Command, runtime.Command,
norJob.Info.Runtime.Envs, runtime.Envs,
)) ))
defer wt.Close() defer wt.Close()

View File

@ -0,0 +1,53 @@
package state
import (
"context"
"gitlink.org.cn/cloudream/common/pkgs/logger"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job"
)
// MultiInstanceInit is the job state that creates an instance job from a
// multi-instance job and then switches the multi-instance job to the
// MultiInstanceRunning state. It carries no fields.
type MultiInstanceInit struct {
}
// Run executes this state for the given job by delegating to do.
//
// Note that the parameter name job shadows the imported job package inside
// this method.
func (s *MultiInstanceInit) Run(rtx jobmgr.JobStateRunContext, job *jobmgr.Job) {
s.do(rtx, job)
}
// do derives an instance job from the multi-instance job held in jo.Body,
// registers it with the job manager under jo's job set, records the new job ID
// in the parent's SubJobs, and finally moves jo to the MultiInstanceRunning
// state.
//
// jo.Body must be a *job.MultiInstanceJob: the type assertion is unchecked and
// panics for any other body type.
func (s *MultiInstanceInit) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
	parent := jo.Body.(*job.MultiInstanceJob)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Watcher goroutine: waits for a Cancel event on the job's event set and
	// cancels ctx once WaitType returns. ctx is not used anywhere else in
	// this function.
	go func() {
		event.WaitType[event.Cancel](ctx, rtx.EventSet)
		cancel()
	}()

	srcInfo := &parent.Info
	srcFiles := &parent.Files

	// The instance inherits the parent's job description and files as-is.
	instanceJob := job.NewInstanceJob(
		schsdk.InstanceJobInfo{
			LocalJobID: srcInfo.LocalJobID,
			Files:      srcInfo.Files,
			Runtime:    srcInfo.Runtime,
			Resources:  srcInfo.Resources,
		},
		jobmod.JobFiles{
			Dataset: srcFiles.Dataset,
			Code:    srcFiles.Code,
			Image:   srcFiles.Image,
		},
	)

	// The new job starts in the pre-scheduling state, using the scheme
	// stored on the parent job.
	subJobID := rtx.Mgr.AddJob(jo.JobSetID, instanceJob, NewPreSchuduling(parent.PreScheduler))
	logger.Info("Init instance success, jobID: " + subJobID)

	// Track the new instance on the multi-instance job.
	parent.SubJobs = append(parent.SubJobs, subJobID)

	rtx.Mgr.ChangeState(jo, NewMultiInstanceRunning())
}

View File

@ -2,28 +2,39 @@ package state
import ( import (
"context" "context"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/prescheduler" "gitlink.org.cn/cloudream/common/pkgs/prescheduler"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job"
) )
type CreateInstance struct { type MultiInstanceRunning struct {
preScheduler prescheduler.PreScheduler preScheduler prescheduler.PreScheduler
} }
func NewCreateInstance(preScheduler prescheduler.PreScheduler) *CreateInstance { func (s *MultiInstanceRunning) Dump(ctx jobmgr.JobStateRunContext, job *jobmgr.Job) jobmod.JobStateDump {
return &CreateInstance{ //TODO implement me
preScheduler: preScheduler, panic("implement me")
}
} }
func (s *CreateInstance) Run(rtx jobmgr.JobStateRunContext, job *jobmgr.Job) { //func NewMultiInstanceRunning(preScheduler prescheduler.PreScheduler) *MultiInstanceRunning {
// return &MultiInstanceRunning{
// preScheduler: preScheduler,
// }
//}
// NewMultiInstanceRunning returns a MultiInstanceRunning with every field left
// at its zero value.
//
// NOTE(review): the preScheduler field is therefore never assigned here, yet
// MultiInstanceRunning.do calls s.preScheduler.ScheduleJob. If
// prescheduler.PreScheduler is an interface type (its definition is not
// visible from this file), that call would panic on the nil value — confirm
// and inject the scheduler before relying on this state.
func NewMultiInstanceRunning() *MultiInstanceRunning {
return &MultiInstanceRunning{}
}
func (s *MultiInstanceRunning) Run(rtx jobmgr.JobStateRunContext, job *jobmgr.Job) {
s.do(rtx, job) s.do(rtx, job)
} }
func (s *CreateInstance) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { func (s *MultiInstanceRunning) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
@ -42,7 +53,7 @@ func (s *CreateInstance) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
} }
// 构建InstanceJobInfo // 构建InstanceJobInfo
files := schsdk.JobFilesInfo{ infoFiles := schsdk.JobFilesInfo{
Dataset: ic.LocalPath, Dataset: ic.LocalPath,
Code: multInstJob.Info.Files.Code, Code: multInstJob.Info.Files.Code,
Image: multInstJob.Info.Files.Image, Image: multInstJob.Info.Files.Image,
@ -50,11 +61,16 @@ func (s *CreateInstance) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
instJobInfo := &schsdk.InstanceJobInfo{ instJobInfo := &schsdk.InstanceJobInfo{
LocalJobID: multInstJob.Info.LocalJobID, LocalJobID: multInstJob.Info.LocalJobID,
Files: files, Files: infoFiles,
Runtime: multInstJob.Info.Runtime, Runtime: multInstJob.Info.Runtime,
Resources: multInstJob.Info.Resources, Resources: multInstJob.Info.Resources,
} }
files := jobmod.JobFiles{
Code: multInstJob.Files.Code,
Image: multInstJob.Files.Image,
}
// 生成预调度方案和文件上传方案 // 生成预调度方案和文件上传方案
jobSchedule, filesUploadScheme, err := s.preScheduler.ScheduleJob(instJobInfo) jobSchedule, filesUploadScheme, err := s.preScheduler.ScheduleJob(instJobInfo)
if err != nil { if err != nil {
@ -63,8 +79,9 @@ func (s *CreateInstance) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
} }
// 创建实例并运行 // 创建实例并运行
instanceJob := job.NewInstanceJob(*instJobInfo) instanceJob := job.NewInstanceJob(*instJobInfo, files)
jobID := rtx.Mgr.AddJob(jo.JobSetID, instanceJob, NewPreSchuduling(*jobSchedule)) jobID := rtx.Mgr.AddJob(jo.JobSetID, instanceJob, NewPreSchuduling(*jobSchedule))
logger.Info("Create instance success, jobID: " + jobID)
// 在多实例任务中新增这个实例的任务ID // 在多实例任务中新增这个实例的任务ID
multInstJob.SubJobs = append(multInstJob.SubJobs, jobID) multInstJob.SubJobs = append(multInstJob.SubJobs, jobID)

View File

@ -30,7 +30,24 @@ func NewPreSchuduling(scheme jobmod.JobScheduleScheme) *PreScheduling {
} }
func (s *PreScheduling) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { func (s *PreScheduling) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
norJob := jo.Body.(*job.NormalJob) var jobFilesInfo schsdk.JobFilesInfo
var jobFiles *jobmod.JobFiles
var targetCCID schsdk.CCID
switch runningJob := jo.Body.(type) {
case *job.NormalJob:
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files
targetCCID = runningJob.TargetCCID
case *job.MultiInstanceJob:
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files
targetCCID = runningJob.TargetCCID
case *job.InstanceJob:
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files
targetCCID = runningJob.TargetCCID
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
@ -55,7 +72,7 @@ func (s *PreScheduling) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
go func() { go func() {
defer wg.Done() defer wg.Done()
e1 = s.doPackageScheduling(ctx, rtx, norJob, norJob.Info.Files.Dataset, &norJob.Files.Dataset, &s.scheme.Dataset) e1 = s.doPackageScheduling(ctx, rtx, jobFilesInfo.Dataset, &jobFiles.Dataset, &s.scheme.Dataset)
if e1 != nil { if e1 != nil {
cancel() cancel()
} }
@ -63,7 +80,7 @@ func (s *PreScheduling) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
go func() { go func() {
defer wg.Done() defer wg.Done()
e2 = s.doPackageScheduling(ctx, rtx, norJob, norJob.Info.Files.Code, &norJob.Files.Code, &s.scheme.Code) e2 = s.doPackageScheduling(ctx, rtx, jobFilesInfo.Code, &jobFiles.Code, &s.scheme.Code)
if e2 != nil { if e2 != nil {
cancel() cancel()
} }
@ -71,7 +88,7 @@ func (s *PreScheduling) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
go func() { go func() {
defer wg.Done() defer wg.Done()
e3 = s.doImageScheduling(ctx, rtx, norJob, norJob.Info.Files.Image, &norJob.Files.Image, &s.scheme.Image) e3 = s.doImageScheduling(ctx, rtx, targetCCID, jobFilesInfo.Image, &jobFiles.Image, &s.scheme.Image)
if e3 != nil { if e3 != nil {
cancel() cancel()
} }
@ -91,7 +108,7 @@ func (s *PreScheduling) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobm
} }
} }
func (s *PreScheduling) doPackageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, norJob *job.NormalJob, fileInfo schsdk.JobFileInfo, file *jobmod.PackageJobFile, scheme *jobmod.FileScheduleScheme) error { func (s *PreScheduling) doPackageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, fileInfo schsdk.JobFileInfo, file *jobmod.PackageJobFile, scheme *jobmod.FileScheduleScheme) error {
switch info := fileInfo.(type) { switch info := fileInfo.(type) {
case *schsdk.LocalJobFileInfo: case *schsdk.LocalJobFileInfo:
evt, ok := event.WaitTypeAnd[event.LocalFileUploaded](ctx, rtx.EventSet, func(e event.LocalFileUploaded) bool { evt, ok := event.WaitTypeAnd[event.LocalFileUploaded](ctx, rtx.EventSet, func(e event.LocalFileUploaded) bool {
@ -153,7 +170,7 @@ func (s *PreScheduling) doPackageScheduling(ctx context.Context, rtx jobmgr.JobS
return nil return nil
} }
func (s *PreScheduling) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, norJob *job.NormalJob, fileInfo schsdk.JobFileInfo, file *jobmod.ImageJobFile, scheme *jobmod.FileScheduleScheme) error { func (s *PreScheduling) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, fileInfo schsdk.JobFileInfo, file *jobmod.ImageJobFile, scheme *jobmod.FileScheduleScheme) error {
switch info := fileInfo.(type) { switch info := fileInfo.(type) {
case *schsdk.LocalJobFileInfo: case *schsdk.LocalJobFileInfo:
evt, ok := event.WaitTypeAnd[event.LocalFileUploaded](ctx, rtx.EventSet, func(e event.LocalFileUploaded) bool { evt, ok := event.WaitTypeAnd[event.LocalFileUploaded](ctx, rtx.EventSet, func(e event.LocalFileUploaded) bool {
@ -240,7 +257,7 @@ func (s *PreScheduling) doImageScheduling(ctx context.Context, rtx jobmgr.JobSta
} }
// TODO 镜像名称 // TODO 镜像名称
err = rtx.Mgr.DB.PCMImage().Create(rtx.Mgr.DB.SQLCtx(), file.ImageID, norJob.TargetCCID, uploadStatus.PCMImageID, uploadStatus.Name, time.Now()) err = rtx.Mgr.DB.PCMImage().Create(rtx.Mgr.DB.SQLCtx(), file.ImageID, targetCCID, uploadStatus.PCMImageID, uploadStatus.Name, time.Now())
if err != nil { if err != nil {
return fmt.Errorf("creating image info: %w", err) return fmt.Errorf("creating image info: %w", err)
} }

View File

@ -28,7 +28,17 @@ func (s *ReadyToAdjust) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
} }
func (s *ReadyToAdjust) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error { func (s *ReadyToAdjust) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
norJob := jo.Body.(*job.NormalJob) var jobFilesInfo schsdk.JobFilesInfo
var jobFiles *jobmod.JobFiles
switch runningJob := jo.Body.(type) {
case *job.NormalJob:
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files
case *job.InstanceJob:
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
@ -39,7 +49,7 @@ func (s *ReadyToAdjust) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error
cancel() cancel()
}() }()
if rt, ok := norJob.Info.Files.Dataset.(*schsdk.DataReturnJobFileInfo); ok { if rt, ok := jobFilesInfo.Dataset.(*schsdk.DataReturnJobFileInfo); ok {
evt, ok := event.WaitTypeAnd[event.JobCompleted](ctx, rtx.EventSet, func(val event.JobCompleted) bool { evt, ok := event.WaitTypeAnd[event.JobCompleted](ctx, rtx.EventSet, func(val event.JobCompleted) bool {
return val.Job.GetInfo().GetLocalJobID() == rt.DataReturnLocalJobID return val.Job.GetInfo().GetLocalJobID() == rt.DataReturnLocalJobID
}) })
@ -54,7 +64,7 @@ func (s *ReadyToAdjust) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error
return fmt.Errorf("job %s is not a DataReturn job(which is %T)", evt.Job.JobID, evt.Job) return fmt.Errorf("job %s is not a DataReturn job(which is %T)", evt.Job.JobID, evt.Job)
} }
norJob.Files.Dataset.PackageID = rtJob.DataReturnPackageID jobFiles.Dataset.PackageID = rtJob.DataReturnPackageID
} }
return nil return nil

View File

@ -47,9 +47,11 @@ func (svc *Service) SubmitJobSet(msg *mgrmq.SubmitJobSet) (*mgrmq.SubmitJobSetRe
}) })
case *schsdk.MultiInstanceJobInfo: case *schsdk.MultiInstanceJobInfo:
job := job.NewMultiInstanceJob(*info)
preSch, ok := msg.PreScheduleScheme.JobSchemes[info.LocalJobID] preSch, ok := msg.PreScheduleScheme.JobSchemes[info.LocalJobID]
logger.Info(">>>localJobID: " + info.LocalJobID)
job := job.NewMultiInstanceJob(*info, preSch)
if !ok { if !ok {
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("pre schedule scheme for job %s is not found", info.LocalJobID)) return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("pre schedule scheme for job %s is not found", info.LocalJobID))
} }