432 lines
12 KiB
Go
432 lines
12 KiB
Go
package jobmgr
|
|
|
|
import (
|
|
"fmt"
|
|
"reflect"
|
|
"time"
|
|
|
|
"gitlink.org.cn/cloudream/common/pkgs/actor"
|
|
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
|
|
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
|
|
|
|
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
|
|
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
|
|
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
|
|
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
|
|
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event"
|
|
)
|
|
|
|
// ErrPreScheduleFailed is a sentinel error value for a failed pre-scheduling
// attempt.
// NOTE(review): nothing in the visible part of this file references it —
// confirm it is still used elsewhere in the package.
var ErrPreScheduleFailed = fmt.Errorf("pre schedule failed")
|
|
|
|
type preSchedulingJob struct {
|
|
job *jobmod.NormalJob
|
|
state *jobmod.StatePreScheduling
|
|
ccInfo schmod.ComputingCenter
|
|
}
|
|
|
|
type PreSchedulingHandler struct {
|
|
mgr *Manager
|
|
|
|
jobs map[schsdk.JobID]*preSchedulingJob
|
|
|
|
cmdChan actor.CommandChannel
|
|
}
|
|
|
|
func NewPreSchedulingHandler(mgr *Manager) *PreSchedulingHandler {
|
|
return &PreSchedulingHandler{
|
|
mgr: mgr,
|
|
jobs: make(map[schsdk.JobID]*preSchedulingJob),
|
|
cmdChan: *actor.NewCommandChannel(),
|
|
}
|
|
}
|
|
|
|
func (h *PreSchedulingHandler) Handle(job jobmod.Job) {
|
|
h.cmdChan.Send(func() {
|
|
norJob, ok := job.(*jobmod.NormalJob)
|
|
if !ok {
|
|
h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("unknow job: %v", reflect.TypeOf(job)), job.GetState()))
|
|
return
|
|
}
|
|
|
|
preSchState, ok := norJob.GetState().(*jobmod.StatePreScheduling)
|
|
if !ok {
|
|
h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("unknow state: %v", reflect.TypeOf(job.GetState())), job.GetState()))
|
|
return
|
|
}
|
|
|
|
colCli, err := schglb.CollectorMQPool.Acquire()
|
|
if err != nil {
|
|
h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("new collector client: %s", err), job.GetState()))
|
|
return
|
|
}
|
|
defer schglb.CollectorMQPool.Release(colCli)
|
|
|
|
ccInfo, err := h.mgr.db.ComputingCenter().GetByID(h.mgr.db.SQLCtx(), preSchState.Scheme.TargetCCID)
|
|
if err != nil {
|
|
h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("getting computing center info: %s", err.Error()), job.GetState()))
|
|
return
|
|
}
|
|
|
|
norJob.TargetCCID = preSchState.Scheme.TargetCCID
|
|
preJob := &preSchedulingJob{
|
|
job: norJob,
|
|
state: preSchState,
|
|
ccInfo: ccInfo,
|
|
}
|
|
h.jobs[job.GetJobID()] = preJob
|
|
|
|
h.onJobEvent(nil, preJob)
|
|
})
|
|
}
|
|
|
|
func (h *PreSchedulingHandler) onJobEvent(evt event.Event, job *preSchedulingJob) {
|
|
if cloneEvt, ok := evt.(*event.CloneJob); ok {
|
|
cloneEvt.Callback.SetValue(job.job.Clone())
|
|
return
|
|
}
|
|
|
|
err := h.doPackageScheduling(evt, job,
|
|
job.job.Info.Files.Dataset, &job.job.Files.Dataset,
|
|
&job.state.Scheme.Dataset, &job.state.Dataset,
|
|
)
|
|
if err != nil {
|
|
job.state.Dataset.Error = err.Error()
|
|
h.changeJobState(job.job, jobmod.NewStateFailed(err.Error(), job.state))
|
|
return
|
|
}
|
|
|
|
err = h.doPackageScheduling(evt, job,
|
|
job.job.Info.Files.Code, &job.job.Files.Code,
|
|
&job.state.Scheme.Code, &job.state.Code,
|
|
)
|
|
if err != nil {
|
|
job.state.Code.Error = err.Error()
|
|
h.changeJobState(job.job, jobmod.NewStateFailed(err.Error(), job.state))
|
|
return
|
|
}
|
|
|
|
err = h.doImageScheduling(evt, job,
|
|
job.job.Info.Files.Image, &job.job.Files.Image,
|
|
&job.state.Scheme.Image, &job.state.Image,
|
|
)
|
|
if err != nil {
|
|
job.state.Image.Error = err.Error()
|
|
h.changeJobState(job.job, jobmod.NewStateFailed(err.Error(), job.state))
|
|
return
|
|
}
|
|
|
|
// 如果三种文件都调度完成,则可以进入下个阶段了
|
|
if job.state.Dataset.Step == jobmod.StepCompleted &&
|
|
job.state.Code.Step == jobmod.StepCompleted &&
|
|
job.state.Image.Step == jobmod.StepCompleted {
|
|
|
|
h.changeJobState(job.job, jobmod.NewStateReadyToAdjust())
|
|
}
|
|
}
|
|
|
|
func (h *PreSchedulingHandler) changeJobState(job jobmod.Job, state jobmod.JobState) {
|
|
job.SetState(state)
|
|
|
|
delete(h.jobs, job.GetJobID())
|
|
|
|
h.mgr.pubLock.Lock()
|
|
h.mgr.handleState(job)
|
|
h.mgr.pubLock.Unlock()
|
|
}
|
|
|
|
func (h *PreSchedulingHandler) doPackageScheduling(evt event.Event, job *preSchedulingJob, fileInfo schsdk.JobFileInfo, file *jobmod.PackageJobFile, scheme *jobmod.FileScheduleScheme, state *jobmod.FileSchedulingState) error {
|
|
// TODO 考虑拆分成多个函数
|
|
if state.Step == jobmod.StepBegin {
|
|
switch info := fileInfo.(type) {
|
|
case *schsdk.LocalJobFileInfo:
|
|
state.Step = jobmod.StepUploading
|
|
|
|
case *schsdk.PackageJobFileInfo:
|
|
file.PackageID = info.PackageID
|
|
state.Step = jobmod.StepUploaded
|
|
|
|
case *schsdk.ResourceJobFileInfo:
|
|
state.Step = jobmod.StepCompleted
|
|
|
|
default:
|
|
return fmt.Errorf("invalid schedule action %s for file info type %v", scheme.Action, reflect.TypeOf(fileInfo))
|
|
}
|
|
}
|
|
|
|
if state.Step == jobmod.StepUploading {
|
|
if evt == nil {
|
|
return nil
|
|
}
|
|
|
|
localFileCmd, ok := evt.(*event.LocalFileUploaded)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
if localFileCmd.LocalPath != fileInfo.(*schsdk.LocalJobFileInfo).LocalPath {
|
|
return nil
|
|
}
|
|
|
|
if localFileCmd.Error != "" {
|
|
return fmt.Errorf("local file uploading: %s", localFileCmd.Error)
|
|
}
|
|
|
|
file.PackageID = localFileCmd.PackageID
|
|
state.Step = jobmod.StepUploaded
|
|
}
|
|
|
|
if state.Step == jobmod.StepUploaded {
|
|
if scheme.Action == jobmod.ActionNo {
|
|
state.Step = jobmod.StepCompleted
|
|
return nil
|
|
}
|
|
|
|
if scheme.Action == jobmod.ActionMove {
|
|
fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewCacheMovePackage(0, file.PackageID, job.ccInfo.CDSNodeID))
|
|
if err != nil {
|
|
return fmt.Errorf("starting cache move package: %w", err)
|
|
}
|
|
|
|
state.Step = jobmod.StepMoving
|
|
state.FullTaskID = fullTaskID
|
|
return nil
|
|
|
|
}
|
|
|
|
if scheme.Action == jobmod.ActionLoad {
|
|
fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewStorageLoadPackage(0, file.PackageID, job.ccInfo.CDSStorageID))
|
|
if err != nil {
|
|
return fmt.Errorf("starting stroage load package: %w", err)
|
|
}
|
|
|
|
state.Step = jobmod.StepLoading
|
|
state.FullTaskID = fullTaskID
|
|
return nil
|
|
}
|
|
|
|
return fmt.Errorf("invalid schedule action %s for file info type %v", scheme.Action, reflect.TypeOf(fileInfo))
|
|
}
|
|
|
|
if state.Step == jobmod.StepMoving {
|
|
moveRet, err := event.AssertExecutorTaskStatus[*exectsk.CacheMovePackageStatus](evt, state.FullTaskID)
|
|
if err == event.ErrUnconcernedTask {
|
|
return nil
|
|
}
|
|
|
|
if err == event.ErrTaskTimeout {
|
|
return fmt.Errorf("cache move package timeout")
|
|
}
|
|
|
|
h.mgr.execMgr.ForgetTask(state.FullTaskID)
|
|
|
|
if moveRet.Error != "" {
|
|
return fmt.Errorf("cache move pacakge: %s", moveRet.Error)
|
|
}
|
|
|
|
state.Step = jobmod.StepCompleted
|
|
return nil
|
|
}
|
|
|
|
if state.Step == jobmod.StepLoading {
|
|
loadRet, err := event.AssertExecutorTaskStatus[*exectsk.StorageLoadPackageStatus](evt, state.FullTaskID)
|
|
if err == event.ErrUnconcernedTask {
|
|
return nil
|
|
}
|
|
|
|
if err == event.ErrTaskTimeout {
|
|
return fmt.Errorf("storage load package timeout")
|
|
}
|
|
|
|
h.mgr.execMgr.ForgetTask(state.FullTaskID)
|
|
|
|
if loadRet.Error != "" {
|
|
return fmt.Errorf("storage load package: %s", loadRet.Error)
|
|
}
|
|
|
|
file.FullPath = loadRet.FullPath
|
|
|
|
state.Step = jobmod.StepCompleted
|
|
return nil
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *PreSchedulingHandler) doImageScheduling(evt event.Event, job *preSchedulingJob, fileInfo schsdk.JobFileInfo, file *jobmod.ImageJobFile, scheme *jobmod.FileScheduleScheme, state *jobmod.FileSchedulingState) error {
|
|
// TODO 考虑拆分成多个函数
|
|
if state.Step == jobmod.StepBegin {
|
|
switch info := fileInfo.(type) {
|
|
case *schsdk.LocalJobFileInfo:
|
|
state.Step = jobmod.StepUploading
|
|
|
|
case *schsdk.ImageJobFileInfo:
|
|
imageInfo, err := h.mgr.db.Image().GetByID(h.mgr.db.SQLCtx(), info.ImageID)
|
|
if err != nil {
|
|
return fmt.Errorf("getting image info: %w", err)
|
|
}
|
|
|
|
file.ImageID = imageInfo.ImageID
|
|
file.PackageID = imageInfo.CDSPackageID
|
|
state.Step = jobmod.StepUploaded
|
|
|
|
default:
|
|
return fmt.Errorf("invalid schedule action %s for file info type %v", scheme.Action, reflect.TypeOf(info))
|
|
}
|
|
}
|
|
|
|
if state.Step == jobmod.StepUploading {
|
|
if evt == nil {
|
|
return nil
|
|
}
|
|
|
|
localFileCmd, ok := evt.(*event.LocalFileUploaded)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
if localFileCmd.LocalPath != fileInfo.(*schsdk.LocalJobFileInfo).LocalPath {
|
|
return nil
|
|
}
|
|
|
|
if localFileCmd.Error != "" {
|
|
return fmt.Errorf("local file uploading: %s", localFileCmd.Error)
|
|
}
|
|
|
|
// 上传完毕,则可以新建一个空的镜像的记录
|
|
// TODO 镜像名称
|
|
imgID, err := h.mgr.db.Image().Create(h.mgr.db.SQLCtx(), &localFileCmd.PackageID, fmt.Sprintf("UPLOAD@%s", time.Now().Unix()), time.Now())
|
|
if err != nil {
|
|
return fmt.Errorf("creating image info: %w", err)
|
|
}
|
|
|
|
// 填充ImageID和PackageID
|
|
file.ImageID = imgID
|
|
file.PackageID = &localFileCmd.PackageID
|
|
state.Step = jobmod.StepUploaded
|
|
}
|
|
|
|
if state.Step == jobmod.StepUploaded {
|
|
if scheme.Action == jobmod.ActionNo {
|
|
state.Step = jobmod.StepCompleted
|
|
return nil
|
|
}
|
|
|
|
// 要导入镜像,则需要先将镜像移动到指点节点的缓存中
|
|
if scheme.Action == jobmod.ActionImportImage {
|
|
if file.PackageID == nil {
|
|
return fmt.Errorf("image %v has no associated package, which cannot be uploaded to %v", file.ImageID, job.ccInfo.CCID)
|
|
}
|
|
|
|
fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewCacheMovePackage(0, *file.PackageID, job.ccInfo.CDSNodeID))
|
|
if err != nil {
|
|
return fmt.Errorf("starting cache move package: %w", err)
|
|
}
|
|
|
|
state.Step = jobmod.StepMoving
|
|
state.FullTaskID = fullTaskID
|
|
return nil
|
|
}
|
|
return fmt.Errorf("invalid schedule action %s for file info type %v", scheme.Action, reflect.TypeOf(fileInfo))
|
|
}
|
|
|
|
if state.Step == jobmod.StepMoving {
|
|
cacheMoveRet, err := event.AssertExecutorTaskStatus[*exectsk.CacheMovePackageStatus](evt, state.FullTaskID)
|
|
if err == event.ErrUnconcernedTask {
|
|
return nil
|
|
}
|
|
|
|
if err == event.ErrTaskTimeout {
|
|
return fmt.Errorf("cache move package timeout")
|
|
}
|
|
|
|
h.mgr.execMgr.ForgetTask(state.FullTaskID)
|
|
|
|
if cacheMoveRet.Error != "" {
|
|
return fmt.Errorf("cache move pacakge: %s", cacheMoveRet.Error)
|
|
}
|
|
|
|
if len(cacheMoveRet.CacheInfos) == 0 {
|
|
return fmt.Errorf("no object in the package which will be imported")
|
|
}
|
|
|
|
if len(cacheMoveRet.CacheInfos) > 1 {
|
|
return fmt.Errorf("there must be only 1 object in the package which will be imported")
|
|
}
|
|
|
|
fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewUploadImage(job.ccInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(cacheMoveRet.CacheInfos[0].FileHash)))
|
|
if err != nil {
|
|
return fmt.Errorf("starting import image: %w", err)
|
|
}
|
|
|
|
state.Step = jobmod.StepImageImporting
|
|
state.FullTaskID = fullTaskID
|
|
return nil
|
|
}
|
|
|
|
if state.Step == jobmod.StepImageImporting {
|
|
uploadImageRet, err := event.AssertExecutorTaskStatus[*exectsk.UploadImageStatus](evt, state.FullTaskID)
|
|
if err == event.ErrUnconcernedTask {
|
|
return nil
|
|
}
|
|
|
|
if err == event.ErrTaskTimeout {
|
|
return fmt.Errorf("import image timeout")
|
|
}
|
|
|
|
h.mgr.execMgr.ForgetTask(state.FullTaskID)
|
|
|
|
if uploadImageRet.Error != "" {
|
|
return fmt.Errorf("import image: %s", uploadImageRet.Error)
|
|
}
|
|
|
|
err = h.mgr.db.PCMImage().Create(h.mgr.db.SQLCtx(), file.ImageID, job.ccInfo.CCID, uploadImageRet.PCMImageID, uploadImageRet.Name, time.Now())
|
|
if err != nil {
|
|
return fmt.Errorf("adding image importing info: %w", err)
|
|
}
|
|
|
|
state.Step = jobmod.StepCompleted
|
|
return nil
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *PreSchedulingHandler) OnEvent(broadcast event.Broadcast, evt event.Event) {
|
|
h.cmdChan.Send(func() {
|
|
if broadcast.ToAll() {
|
|
for _, job := range h.jobs {
|
|
h.onJobEvent(evt, job)
|
|
}
|
|
|
|
} else if broadcast.ToJobSet() {
|
|
for _, job := range h.jobs {
|
|
if job.job.JobSetID != broadcast.JobSetID {
|
|
continue
|
|
}
|
|
|
|
h.onJobEvent(evt, job)
|
|
}
|
|
} else if broadcast.ToJob() {
|
|
if job, ok := h.jobs[broadcast.JobID]; ok {
|
|
h.onJobEvent(evt, job)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
func (h *PreSchedulingHandler) Serve() {
|
|
cmdChan := h.cmdChan.BeginChanReceive()
|
|
defer h.cmdChan.CloseChanReceive()
|
|
|
|
for {
|
|
select {
|
|
case cmd := <-cmdChan:
|
|
cmd()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *PreSchedulingHandler) Stop() {
|
|
// TODO 支持STOP
|
|
}
|