diff --git a/client/internal/http/job.go b/client/internal/http/job.go index 8ea8185..b429c44 100644 --- a/client/internal/http/job.go +++ b/client/internal/http/job.go @@ -5,6 +5,8 @@ import ( "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + "gitlink.org.cn/cloudream/common/utils/serder" + "io" "net/http" ) @@ -19,7 +21,7 @@ type CreateInstanceResp struct { type CreateInstanceReq struct { LocalJobID string `json:"localJobID" binding:"required"` - LocalPath schsdk.JobFileInfo `json:"filePath" binding:"required"` + LocalPath schsdk.JobFileInfo `json:"localPath" binding:"required"` } func (s *Server) JobSvc() *JobService { @@ -31,10 +33,24 @@ func (s *Server) JobSvc() *JobService { func (s *JobService) CreateInstance(ctx *gin.Context) { log := logger.WithField("HTTP", "JobSet.HTTP") - var req CreateInstanceReq - if err := ctx.ShouldBindQuery(&req); err != nil { - log.Warnf("binding body: %s", err.Error()) - ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + //var req CreateInstanceReq + //if err := ctx.ShouldBindJSON(&req); err != nil { + // log.Warnf("binding body: %s", err.Error()) + // ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + // return + //} + + bodyData, err := io.ReadAll(ctx.Request.Body) + if err != nil { + log.Warnf("reading request body: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "read request body failed")) + return + } + + req, err := serder.JSONToObjectEx[CreateInstanceReq](bodyData) + if err != nil { + log.Warnf("parsing request body: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed")) return } diff --git a/client/internal/http/server.go b/client/internal/http/server.go index 2b2eb38..38eba6a 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -39,7 +39,7 @@ func (s *Server) Serve() error { func (s *Server) initRouters() { s.engine.POST("/jobSet/submit", s.JobSetSvc().Submit) - s.engine.POST("/jobSet/submit", s.JobSvc().CreateInstance) + s.engine.POST("/job/CreateInstance", s.JobSvc().CreateInstance) s.engine.POST("/jobSet/localFileUploaded", s.JobSetSvc().LocalFileUploaded) s.engine.GET("/jobSet/getServiceList", s.JobSetSvc().GetServiceList) } diff --git a/common/models/job/body.go b/common/models/job/body.go index 8b99381..1d91981 100644 --- a/common/models/job/body.go +++ b/common/models/job/body.go @@ -40,3 +40,25 @@ type DataReturnJobDump struct { func (d *DataReturnJobDump) getType() JobBodyDumpType { return d.Type } + +type InstanceJobDump struct { + serder.Metadata `union:"NormalJob"` + Type JobBodyDumpType `json:"type"` + TargetCCID schsdk.CCID `json:"targetCCID"` + Files JobFiles `json:"files"` +} + +func (d *InstanceJobDump) getType() JobBodyDumpType { + return d.Type +} + +type MultiInstanceJobDump struct { + serder.Metadata `union:"NormalJob"` + Type JobBodyDumpType `json:"type"` + TargetCCID schsdk.CCID `json:"targetCCID"` + Files JobFiles `json:"files"` +} + +func (d *MultiInstanceJobDump) getType() JobBodyDumpType { + return d.Type +} diff --git a/manager/internal/jobmgr/event/utils.go b/manager/internal/jobmgr/event/utils.go index dbfbc7d..496ef1f 100644 --- a/manager/internal/jobmgr/event/utils.go +++ b/manager/internal/jobmgr/event/utils.go @@ -19,6 +19,10 @@ func WaitType[T jobmgr.Event](ctx context.Context, set *jobmgr.EventSet) (T, boo _, ok := evt.(T) return ok }) + if ret == nil { + var r T + return r, false // 如果事件为空,则返回false。 + } // 因为 set.Wait 返回的事件类型是 jobmgr.Event,这里将它转换为 T 类型,并返回转换结果及操作成功标志。 return ret.(T), ok } @@ -40,6 +44,10 @@ func WaitTypeAnd[T jobmgr.Event](ctx context.Context, set *jobmgr.EventSet, cond // 如果事件是类型T且满足给定条件,则返回true。 return cond(e) }) + if ret == nil { + var r T + return r, false // 如果事件为空,则返回false。 + } // 断言返回的事件为类型T,并返回该事件和操作成功标志。 return ret.(T), ok } diff --git a/manager/internal/jobmgr/job/instance_job.go b/manager/internal/jobmgr/job/instance_job.go index 24e9243..350bf5a 100644 --- a/manager/internal/jobmgr/job/instance_job.go +++ b/manager/internal/jobmgr/job/instance_job.go @@ -12,9 +12,10 @@ type InstanceJob struct { OutputFullPath string // 程序结果的完整输出路径 } -func NewInstanceJob(info schsdk.InstanceJobInfo) *InstanceJob { +func NewInstanceJob(info schsdk.InstanceJobInfo, files jobmod.JobFiles) *InstanceJob { return &InstanceJob{ - Info: info, + Info: info, + Files: files, } } @@ -23,7 +24,7 @@ func (j *InstanceJob) GetInfo() schsdk.JobInfo { } func (j *InstanceJob) Dump() jobmod.JobBodyDump { - return &jobmod.NormalJobDump{ + return &jobmod.InstanceJobDump{ Files: j.Files, TargetCCID: j.TargetCCID, } diff --git a/manager/internal/jobmgr/job/multiInstance_job.go b/manager/internal/jobmgr/job/multiInstance_job.go index 34325ad..bf0827e 100644 --- a/manager/internal/jobmgr/job/multiInstance_job.go +++ b/manager/internal/jobmgr/job/multiInstance_job.go @@ -6,15 +6,17 @@ import ( ) type MultiInstanceJob struct { - Info schsdk.MultiInstanceJobInfo - Files jobmod.JobFiles - TargetCCID schsdk.CCID - SubJobs []schsdk.JobID + Info schsdk.MultiInstanceJobInfo + Files jobmod.JobFiles + TargetCCID schsdk.CCID + SubJobs []schsdk.JobID + PreScheduler jobmod.JobScheduleScheme } -func NewMultiInstanceJob(info schsdk.MultiInstanceJobInfo) *MultiInstanceJob { +func NewMultiInstanceJob(info schsdk.MultiInstanceJobInfo, preScheduler jobmod.JobScheduleScheme) *MultiInstanceJob { return &MultiInstanceJob{ - Info: info, + Info: info, + PreScheduler: preScheduler, } } @@ -23,7 +25,7 @@ func (j *MultiInstanceJob) GetInfo() schsdk.JobInfo { } func (j *MultiInstanceJob) Dump() jobmod.JobBodyDump { - return &jobmod.NormalJobDump{ + return &jobmod.MultiInstanceJobDump{ Files: j.Files, TargetCCID: j.TargetCCID, } diff --git a/manager/internal/jobmgr/job/state/adjusting.go b/manager/internal/jobmgr/job/state/adjusting.go index 0269baf..19d1ac1 100644 --- a/manager/internal/jobmgr/job/state/adjusting.go +++ b/manager/internal/jobmgr/job/state/adjusting.go @@ -46,7 +46,25 @@ func (s *Adjusting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.J } func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error { - norJob := jo.Body.(*job.NormalJob) + //norJob := jo.Body.(*job.NormalJob) + var jobFilesInfo schsdk.JobFilesInfo + var jobFiles *jobmod.JobFiles + var targetCCID schsdk.CCID + + switch runningJob := jo.Body.(type) { + case *job.NormalJob: + jobFilesInfo = runningJob.Info.Files + jobFiles = &runningJob.Files + targetCCID = runningJob.TargetCCID + case *job.MultiInstanceJob: + jobFilesInfo = runningJob.Info.Files + jobFiles = &runningJob.Files + targetCCID = runningJob.TargetCCID + case *job.InstanceJob: + jobFilesInfo = runningJob.Info.Files + jobFiles = &runningJob.Files + targetCCID = runningJob.TargetCCID + } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -77,7 +95,8 @@ func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error { return fmt.Errorf("getting cds storage info: %w", err) } // TODO UserID - norJob.OutputFullPath = utils.MakeJobOutputFullPath(stgInfo.Directory, 1, jo.JobID) + //norJob.OutputFullPath = utils.MakeJobOutputFullPath(stgInfo.Directory, 1, jo.JobID) + utils.MakeJobOutputFullPath(stgInfo.Directory, 1, jo.JobID) wg := sync.WaitGroup{} wg.Add(3) @@ -86,7 +105,7 @@ func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error { go func() { defer wg.Done() - e1 = s.doPackageScheduling(ctx, rtx, norJob, norJob.Info.Files.Dataset, &norJob.Files.Dataset, &s.scheme.Dataset) + e1 = s.doPackageScheduling(ctx, rtx, jobFilesInfo.Dataset, &jobFiles.Dataset, &s.scheme.Dataset) if e1 != nil { cancel() } @@ -94,7 +113,7 @@ func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error { go func() { defer wg.Done() - e2 = s.doPackageScheduling(ctx, rtx, norJob, norJob.Info.Files.Code, &norJob.Files.Code, &s.scheme.Code) + e2 = s.doPackageScheduling(ctx, rtx, jobFilesInfo.Code, &jobFiles.Code, &s.scheme.Code) if e2 != nil { cancel() } @@ -102,7 +121,7 @@ func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error { go func() { defer wg.Done() - e3 = s.doImageScheduling(ctx, rtx, norJob, norJob.Info.Files.Image, &norJob.Files.Image, &s.scheme.Image) + e3 = s.doImageScheduling(ctx, rtx, targetCCID, jobFilesInfo.Image, &jobFiles.Image, &s.scheme.Image) if e3 != nil { cancel() } @@ -111,7 +130,7 @@ func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error { return errors.Join(e1, e2, e3) } -func (s *Adjusting) doPackageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, job *job.NormalJob, fileInfo schsdk.JobFileInfo, file *jobmod.PackageJobFile, scheme *jobmod.FileScheduleScheme) error { +func (s *Adjusting) doPackageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, fileInfo schsdk.JobFileInfo, file *jobmod.PackageJobFile, scheme *jobmod.FileScheduleScheme) error { switch info := fileInfo.(type) { case *schsdk.LocalJobFileInfo: evt, ok := event.WaitTypeAnd[event.LocalFileUploaded](ctx, rtx.EventSet, func(e event.LocalFileUploaded) bool { @@ -173,7 +192,7 @@ func (s *Adjusting) doPackageScheduling(ctx context.Context, rtx jobmgr.JobState return nil } -func (s *Adjusting) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, job *job.NormalJob, fileInfo schsdk.JobFileInfo, file *jobmod.ImageJobFile, scheme *jobmod.FileScheduleScheme) error { +func (s *Adjusting) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, fileInfo schsdk.JobFileInfo, file *jobmod.ImageJobFile, scheme *jobmod.FileScheduleScheme) error { switch info := fileInfo.(type) { case *schsdk.LocalJobFileInfo: evt, ok := event.WaitTypeAnd[event.LocalFileUploaded](ctx, rtx.EventSet, func(e event.LocalFileUploaded) bool { @@ -260,7 +279,7 @@ func (s *Adjusting) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRu } // TODO 镜像名称 - err = rtx.Mgr.DB.PCMImage().Create(rtx.Mgr.DB.SQLCtx(), file.ImageID, job.TargetCCID, uploadStatus.PCMImageID, uploadStatus.Name, time.Now()) + err = rtx.Mgr.DB.PCMImage().Create(rtx.Mgr.DB.SQLCtx(), file.ImageID, targetCCID, uploadStatus.PCMImageID, uploadStatus.Name, time.Now()) if err != nil { return fmt.Errorf("creating image info: %w", err) } diff --git a/manager/internal/jobmgr/job/state/executing.go b/manager/internal/jobmgr/job/state/executing.go index a8cc7f4..f77a02c 100644 --- a/manager/internal/jobmgr/job/state/executing.go +++ b/manager/internal/jobmgr/job/state/executing.go @@ -3,6 +3,7 @@ package state import ( "context" "fmt" + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" "gitlink.org.cn/cloudream/common/pkgs/logger" pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm" @@ -40,30 +41,45 @@ func (s *NormalJobExecuting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) } func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error { - norJob := jo.Body.(*job.NormalJob) + //norJob := jo.Body.(*job.NormalJob) + + var runtime *schsdk.JobRuntimeInfo + var jobFiles *jobmod.JobFiles + var targetCCID schsdk.CCID + + switch runningJob := jo.Body.(type) { + case *job.NormalJob: + runtime = &runningJob.Info.Runtime + jobFiles = &runningJob.Files + targetCCID = runningJob.TargetCCID + case *job.InstanceJob: + runtime = &runningJob.Info.Runtime + jobFiles = &runningJob.Files + targetCCID = runningJob.TargetCCID + } log := logger.WithType[NormalJobExecuting]("State").WithField("JobID", jo.JobID) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pcmImgInfo, err := rtx.Mgr.DB.PCMImage().GetByImageIDAndCCID(rtx.Mgr.DB.SQLCtx(), norJob.Files.Image.ImageID, norJob.TargetCCID) + pcmImgInfo, err := rtx.Mgr.DB.PCMImage().GetByImageIDAndCCID(rtx.Mgr.DB.SQLCtx(), jobFiles.Image.ImageID, targetCCID) if err != nil { return fmt.Errorf("getting pcm image info: %w", err) } - ccInfo, err := rtx.Mgr.DB.ComputingCenter().GetByID(rtx.Mgr.DB.SQLCtx(), norJob.TargetCCID) + ccInfo, err := rtx.Mgr.DB.ComputingCenter().GetByID(rtx.Mgr.DB.SQLCtx(), targetCCID) if err != nil { return fmt.Errorf("getting computing center info: %w", err) } // TODO 需要添加DATA_IN、DATA_OUT等环境变量,这些数据从Job的信息中来获取 - ress, err := rtx.Mgr.DB.CCResource().GetByCCID(rtx.Mgr.DB.SQLCtx(), norJob.TargetCCID) + ress, err := rtx.Mgr.DB.CCResource().GetByCCID(rtx.Mgr.DB.SQLCtx(), targetCCID) if err != nil { return fmt.Errorf("getting computing center resource: %w", err) } if len(ress) == 0 { - return fmt.Errorf("no resource found at computing center %v", norJob.TargetCCID) + return fmt.Errorf("no resource found at computing center %v", targetCCID) } wt := rtx.Mgr.ExecMgr.StartTask(exetsk.NewSubmitTask( @@ -71,8 +87,8 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e pcmImgInfo.PCMImageID, // TODO 选择资源的算法 ress[0].PCMResourceID, - norJob.Info.Runtime.Command, - norJob.Info.Runtime.Envs, + runtime.Command, + runtime.Envs, )) defer wt.Close() diff --git a/manager/internal/jobmgr/job/state/multiInstance_init.go b/manager/internal/jobmgr/job/state/multiInstance_init.go new file mode 100644 index 0000000..99c8785 --- /dev/null +++ b/manager/internal/jobmgr/job/state/multiInstance_init.go @@ -0,0 +1,53 @@ +package state + +import ( + "context" + "gitlink.org.cn/cloudream/common/pkgs/logger" + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" + "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr" + "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event" + "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job" +) + +type MultiInstanceInit struct { +} + +func (s *MultiInstanceInit) Run(rtx jobmgr.JobStateRunContext, job *jobmgr.Job) { + s.do(rtx, job) +} + +func (s *MultiInstanceInit) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { + multInstJob := jo.Body.(*job.MultiInstanceJob) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + event.WaitType[event.Cancel](ctx, rtx.EventSet) + cancel() + }() + + instJobInfo := &schsdk.InstanceJobInfo{ + LocalJobID: multInstJob.Info.LocalJobID, + Files: multInstJob.Info.Files, + Runtime: multInstJob.Info.Runtime, + Resources: multInstJob.Info.Resources, + } + + files := jobmod.JobFiles{ + Dataset: multInstJob.Files.Dataset, + Code: multInstJob.Files.Code, + Image: multInstJob.Files.Image, + } + + // 创建实例并运行 + instanceJob := job.NewInstanceJob(*instJobInfo, files) + jobID := rtx.Mgr.AddJob(jo.JobSetID, instanceJob, NewPreSchuduling(multInstJob.PreScheduler)) + logger.Info("Init instance success, jobID: " + jobID) + + // 在多实例任务中新增这个实例的任务ID + multInstJob.SubJobs = append(multInstJob.SubJobs, jobID) + + rtx.Mgr.ChangeState(jo, NewMultiInstanceRunning()) +} diff --git a/manager/internal/jobmgr/job/state/running.go b/manager/internal/jobmgr/job/state/multiInstance_running.go similarity index 62% rename from manager/internal/jobmgr/job/state/running.go rename to manager/internal/jobmgr/job/state/multiInstance_running.go index fa829ea..5786cac 100644 --- a/manager/internal/jobmgr/job/state/running.go +++ b/manager/internal/jobmgr/job/state/multiInstance_running.go @@ -2,28 +2,39 @@ package state import ( "context" + "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/prescheduler" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job" ) -type CreateInstance struct { +type MultiInstanceRunning struct { preScheduler prescheduler.PreScheduler } -func NewCreateInstance(preScheduler prescheduler.PreScheduler) *CreateInstance { - return &CreateInstance{ - preScheduler: preScheduler, - } +func (s *MultiInstanceRunning) Dump(ctx jobmgr.JobStateRunContext, job *jobmgr.Job) jobmod.JobStateDump { + //TODO implement me + panic("implement me") } -func (s *CreateInstance) Run(rtx jobmgr.JobStateRunContext, job *jobmgr.Job) { +//func NewMultiInstanceRunning(preScheduler prescheduler.PreScheduler) *MultiInstanceRunning { +// return &MultiInstanceRunning{ +// preScheduler: preScheduler, +// } +//} + +func NewMultiInstanceRunning() *MultiInstanceRunning { + return &MultiInstanceRunning{} +} + +func (s *MultiInstanceRunning) Run(rtx jobmgr.JobStateRunContext, job *jobmgr.Job) { s.do(rtx, job) } -func (s *CreateInstance) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { +func (s *MultiInstanceRunning) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -42,7 +53,7 @@ func (s *CreateInstance) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { } // 构建InstanceJobInfo - files := schsdk.JobFilesInfo{ + infoFiles := schsdk.JobFilesInfo{ Dataset: ic.LocalPath, Code: multInstJob.Info.Files.Code, Image: multInstJob.Info.Files.Image, @@ -50,11 +61,16 @@ func (s *CreateInstance) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { instJobInfo := &schsdk.InstanceJobInfo{ LocalJobID: multInstJob.Info.LocalJobID, - Files: files, + Files: infoFiles, Runtime: multInstJob.Info.Runtime, Resources: multInstJob.Info.Resources, } + files := jobmod.JobFiles{ + Code: multInstJob.Files.Code, + Image: multInstJob.Files.Image, + } + // 生成预调度方案和文件上传方案 jobSchedule, filesUploadScheme, err := s.preScheduler.ScheduleJob(instJobInfo) if err != nil { @@ -63,8 +79,9 @@ func (s *CreateInstance) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { } // 创建实例并运行 - instanceJob := job.NewInstanceJob(*instJobInfo) + instanceJob := job.NewInstanceJob(*instJobInfo, files) jobID := rtx.Mgr.AddJob(jo.JobSetID, instanceJob, NewPreSchuduling(*jobSchedule)) + logger.Info("Create instance success, jobID: " + jobID) // 在多实例任务中新增这个实例的任务ID multInstJob.SubJobs = append(multInstJob.SubJobs, jobID) diff --git a/manager/internal/jobmgr/job/state/prescheduling.go b/manager/internal/jobmgr/job/state/prescheduling.go index 3617e79..efb246c 100644 --- a/manager/internal/jobmgr/job/state/prescheduling.go +++ b/manager/internal/jobmgr/job/state/prescheduling.go @@ -30,7 +30,24 @@ func NewPreSchuduling(scheme jobmod.JobScheduleScheme) *PreScheduling { } func (s *PreScheduling) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { - norJob := jo.Body.(*job.NormalJob) + var jobFilesInfo schsdk.JobFilesInfo + var jobFiles *jobmod.JobFiles + var targetCCID schsdk.CCID + + switch runningJob := jo.Body.(type) { + case *job.NormalJob: + jobFilesInfo = runningJob.Info.Files + jobFiles = &runningJob.Files + targetCCID = runningJob.TargetCCID + case *job.MultiInstanceJob: + jobFilesInfo = runningJob.Info.Files + jobFiles = &runningJob.Files + targetCCID = runningJob.TargetCCID + case *job.InstanceJob: + jobFilesInfo = runningJob.Info.Files + jobFiles = &runningJob.Files + targetCCID = runningJob.TargetCCID + } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -55,7 +72,7 @@ func (s *PreScheduling) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { go func() { defer wg.Done() - e1 = s.doPackageScheduling(ctx, rtx, norJob, norJob.Info.Files.Dataset, &norJob.Files.Dataset, &s.scheme.Dataset) + e1 = s.doPackageScheduling(ctx, rtx, jobFilesInfo.Dataset, &jobFiles.Dataset, &s.scheme.Dataset) if e1 != nil { cancel() } @@ -63,7 +80,7 @@ func (s *PreScheduling) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { go func() { defer wg.Done() - e2 = s.doPackageScheduling(ctx, rtx, norJob, norJob.Info.Files.Code, &norJob.Files.Code, &s.scheme.Code) + e2 = s.doPackageScheduling(ctx, rtx, jobFilesInfo.Code, &jobFiles.Code, &s.scheme.Code) if e2 != nil { cancel() } @@ -71,7 +88,7 @@ func (s *PreScheduling) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { go func() { defer wg.Done() - e3 = s.doImageScheduling(ctx, rtx, norJob, norJob.Info.Files.Image, &norJob.Files.Image, &s.scheme.Image) + e3 = s.doImageScheduling(ctx, rtx, targetCCID, jobFilesInfo.Image, &jobFiles.Image, &s.scheme.Image) if e3 != nil { cancel() } @@ -91,7 +108,7 @@ func (s *PreScheduling) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobm } } -func (s *PreScheduling) doPackageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, norJob *job.NormalJob, fileInfo schsdk.JobFileInfo, file *jobmod.PackageJobFile, scheme *jobmod.FileScheduleScheme) error { +func (s *PreScheduling) doPackageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, fileInfo schsdk.JobFileInfo, file *jobmod.PackageJobFile, scheme *jobmod.FileScheduleScheme) error { switch info := fileInfo.(type) { case *schsdk.LocalJobFileInfo: evt, ok := event.WaitTypeAnd[event.LocalFileUploaded](ctx, rtx.EventSet, func(e event.LocalFileUploaded) bool { @@ -153,7 +170,7 @@ func (s *PreScheduling) doPackageScheduling(ctx context.Context, rtx jobmgr.JobS return nil } -func (s *PreScheduling) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, norJob *job.NormalJob, fileInfo schsdk.JobFileInfo, file *jobmod.ImageJobFile, scheme *jobmod.FileScheduleScheme) error { +func (s *PreScheduling) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, fileInfo schsdk.JobFileInfo, file *jobmod.ImageJobFile, scheme *jobmod.FileScheduleScheme) error { switch info := fileInfo.(type) { case *schsdk.LocalJobFileInfo: evt, ok := event.WaitTypeAnd[event.LocalFileUploaded](ctx, rtx.EventSet, func(e event.LocalFileUploaded) bool { @@ -240,7 +257,7 @@ func (s *PreScheduling) doImageScheduling(ctx context.Context, rtx jobmgr.JobSta } // TODO 镜像名称 - err = rtx.Mgr.DB.PCMImage().Create(rtx.Mgr.DB.SQLCtx(), file.ImageID, norJob.TargetCCID, uploadStatus.PCMImageID, uploadStatus.Name, time.Now()) + err = rtx.Mgr.DB.PCMImage().Create(rtx.Mgr.DB.SQLCtx(), file.ImageID, targetCCID, uploadStatus.PCMImageID, uploadStatus.Name, time.Now()) if err != nil { return fmt.Errorf("creating image info: %w", err) } diff --git a/manager/internal/jobmgr/job/state/ready_to_adjust.go b/manager/internal/jobmgr/job/state/ready_to_adjust.go index 9522ab1..d2914a1 100644 --- a/manager/internal/jobmgr/job/state/ready_to_adjust.go +++ b/manager/internal/jobmgr/job/state/ready_to_adjust.go @@ -28,7 +28,17 @@ func (s *ReadyToAdjust) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { } func (s *ReadyToAdjust) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error { - norJob := jo.Body.(*job.NormalJob) + var jobFilesInfo schsdk.JobFilesInfo + var jobFiles *jobmod.JobFiles + + switch runningJob := jo.Body.(type) { + case *job.NormalJob: + jobFilesInfo = runningJob.Info.Files + jobFiles = &runningJob.Files + case *job.InstanceJob: + jobFilesInfo = runningJob.Info.Files + jobFiles = &runningJob.Files + } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -39,7 +49,7 @@ func (s *ReadyToAdjust) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error cancel() }() - if rt, ok := norJob.Info.Files.Dataset.(*schsdk.DataReturnJobFileInfo); ok { + if rt, ok := jobFilesInfo.Dataset.(*schsdk.DataReturnJobFileInfo); ok { evt, ok := event.WaitTypeAnd[event.JobCompleted](ctx, rtx.EventSet, func(val event.JobCompleted) bool { return val.Job.GetInfo().GetLocalJobID() == rt.DataReturnLocalJobID }) @@ -54,7 +64,7 @@ func (s *ReadyToAdjust) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error return fmt.Errorf("job %s is not a DataReturn job(which is %T)", evt.Job.JobID, evt.Job) } - norJob.Files.Dataset.PackageID = rtJob.DataReturnPackageID + jobFiles.Dataset.PackageID = rtJob.DataReturnPackageID } return nil diff --git a/manager/internal/mq/job.go b/manager/internal/mq/job.go index 766c8f5..bd45610 100644 --- a/manager/internal/mq/job.go +++ b/manager/internal/mq/job.go @@ -47,9 +47,11 @@ func (svc *Service) SubmitJobSet(msg *mgrmq.SubmitJobSet) (*mgrmq.SubmitJobSetRe }) case *schsdk.MultiInstanceJobInfo: - job := job.NewMultiInstanceJob(*info) - preSch, ok := msg.PreScheduleScheme.JobSchemes[info.LocalJobID] + logger.Info(">>>localJobID: " + info.LocalJobID) + + job := job.NewMultiInstanceJob(*info, preSch) + if !ok { return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("pre schedule scheme for job %s is not found", info.LocalJobID)) }