JCC-CSScheduler/manager/internal/jobmgr/prescheduling_handler.go

422 lines
11 KiB
Go

package jobmgr
import (
"fmt"
"reflect"
"gitlink.org.cn/cloudream/common/pkgs/actor"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
colmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event"
)
// ErrPreScheduleFailed is a package-level sentinel error carrying the message
// "pre schedule failed".
var ErrPreScheduleFailed = fmt.Errorf("pre schedule failed")
// preSchedulingJob groups everything the handler tracks for a single job
// while that job is being pre-scheduled.
type preSchedulingJob struct {
// job is the normal job being pre-scheduled.
job *jobmod.NormalJob
// state is the job's pre-scheduling state; it holds the schedule scheme and
// the per-file (dataset, code, image) scheduling progress.
state *jobmod.StatePreScheduling
// slwNodeInfo is the info of the target node, as returned by the collector
// when the job was handed to the handler.
slwNodeInfo *uopsdk.SlwNode
}
// PreSchedulingHandler drives jobs through the pre-scheduling stage.
// Handle and OnEvent do not act directly: they post closures to cmdChan,
// and Serve receives and runs those closures one at a time.
type PreSchedulingHandler struct {
// mgr is the owning manager; the handler uses its execMgr, imageMgr,
// pubLock and handleState.
mgr *Manager
// jobs holds the jobs currently being pre-scheduled, keyed by job ID.
// Within this file it is only accessed from inside commands sent through cmdChan.
jobs map[schsdk.JobID]*preSchedulingJob
// cmdChan carries the closures to be executed by Serve.
cmdChan actor.CommandChannel
}
// NewPreSchedulingHandler creates a handler bound to mgr, with an empty job
// table and a freshly created command channel.
func NewPreSchedulingHandler(mgr *Manager) *PreSchedulingHandler {
	handler := &PreSchedulingHandler{mgr: mgr}
	handler.jobs = map[schsdk.JobID]*preSchedulingJob{}
	handler.cmdChan = *actor.NewCommandChannel()
	return handler
}
// Handle posts a command that takes over job and starts its pre-scheduling.
//
// The command moves the job to the failed state when job is not a
// *jobmod.NormalJob, when its state is not *jobmod.StatePreScheduling, when no
// collector client can be acquired, or when the target node info cannot be
// fetched. Otherwise it records the target node ID on the job, registers the
// job in h.jobs and runs onJobEvent once with a nil event.
func (h *PreSchedulingHandler) Handle(job jobmod.Job) {
	h.cmdChan.Send(func() {
		// fail switches the job to the failed state, remembering the state it failed from.
		fail := func(reason string) {
			h.changeJobState(job, jobmod.NewStateFailed(reason, job.GetState()))
		}

		normalJob, isNormal := job.(*jobmod.NormalJob)
		if !isNormal {
			fail(fmt.Sprintf("unknow job: %v", reflect.TypeOf(job)))
			return
		}

		schedState, isPreScheduling := normalJob.GetState().(*jobmod.StatePreScheduling)
		if !isPreScheduling {
			fail(fmt.Sprintf("unknow state: %v", reflect.TypeOf(job.GetState())))
			return
		}

		collector, err := schglb.CollectorMQPool.Acquire()
		if err != nil {
			fail(fmt.Sprintf("new collector client: %s", err))
			return
		}
		// The client is handed back to the pool when this command finishes.
		defer schglb.CollectorMQPool.Release(collector)

		nodeResp, err := collector.GetSlwNodeInfo(colmq.NewGetSlwNodeInfo(schedState.Scheme.TargetSlwNodeID))
		if err != nil {
			fail(fmt.Sprintf("getting slw node info: %s", err.Error()))
			return
		}

		normalJob.TargetSlwNodeID = schedState.Scheme.TargetSlwNodeID

		tracked := &preSchedulingJob{
			job:         normalJob,
			state:       schedState,
			slwNodeInfo: &nodeResp.SlwNode,
		}
		h.jobs[job.GetJobID()] = tracked

		// A nil event performs the first scheduling pass for the new job.
		h.onJobEvent(nil, tracked)
	})
}
// onJobEvent feeds evt (which may be nil) to one tracked job.
//
// A *event.CloneJob event is answered with a clone of the job and nothing
// else happens. Any other event is offered in turn to the dataset, code and
// image scheduling steps; the first step that returns an error records the
// message on its file state and moves the job to the failed state. When all
// three files have reached StepCompleted the job moves to the ready-to-adjust state.
func (h *PreSchedulingHandler) onJobEvent(evt event.Event, job *preSchedulingJob) {
	if clone, isClone := evt.(*event.CloneJob); isClone {
		clone.Callback.SetValue(job.job.Clone())
		return
	}

	normalJob, st := job.job, job.state

	// abort stores the failure on the file's scheduling state and fails the job.
	abort := func(fileState *jobmod.FileSchedulingState, cause error) {
		fileState.Error = cause.Error()
		h.changeJobState(normalJob, jobmod.NewStateFailed(cause.Error(), st))
	}

	if err := h.doPackageScheduling(evt, job,
		normalJob.Info.Files.Dataset, &normalJob.Files.Dataset,
		&st.Scheme.Dataset, &st.Dataset,
	); err != nil {
		abort(&st.Dataset, err)
		return
	}

	if err := h.doPackageScheduling(evt, job,
		normalJob.Info.Files.Code, &normalJob.Files.Code,
		&st.Scheme.Code, &st.Code,
	); err != nil {
		abort(&st.Code, err)
		return
	}

	if err := h.doImageScheduling(evt, job,
		normalJob.Info.Files.Image, &normalJob.Files.Image,
		&st.Scheme.Image, &st.Image,
	); err != nil {
		abort(&st.Image, err)
		return
	}

	// Once all three kinds of files have finished scheduling, the job can
	// enter the next stage.
	allCompleted := st.Dataset.Step == jobmod.StepCompleted &&
		st.Code.Step == jobmod.StepCompleted &&
		st.Image.Step == jobmod.StepCompleted
	if !allCompleted {
		return
	}
	h.changeJobState(normalJob, jobmod.NewStateReadyToAdjust())
}
// changeJobState sets the new state on job, stops tracking the job in this
// handler, and then passes the job to the manager's handleState while holding
// the manager's pubLock.
func (h *PreSchedulingHandler) changeJobState(job jobmod.Job, state jobmod.JobState) {
	job.SetState(state)
	delete(h.jobs, job.GetJobID())

	h.mgr.pubLock.Lock()
	// Unlock via defer so the lock is released even if handleState panics.
	defer h.mgr.pubLock.Unlock()
	h.mgr.handleState(job)
}
// doPackageScheduling advances the scheduling of one package-type file
// (used for the dataset and the code) by at most one asynchronous step.
//
// Parameters:
//   - evt: the event being processed; nil on the first pass.
//   - job: the tracked job the file belongs to.
//   - fileInfo: how the file was specified in the job description.
//   - file: the job's record for this file; PackageID is filled in here.
//   - scheme: the scheduling action chosen for this file.
//   - state: this file's scheduling progress; Step and FullTaskID are updated here.
//
// Steps run in order: begin -> (uploading ->) uploaded -> moving or loading ->
// completed. A nil return means either progress was made or evt does not
// concern this file. A non-nil error means scheduling of this file failed.
func (h *PreSchedulingHandler) doPackageScheduling(evt event.Event, job *preSchedulingJob, fileInfo schsdk.JobFileInfo, file *jobmod.PackageJobFile, scheme *jobmod.FileScheduleScheme, state *jobmod.FileSchedulingState) error {
	// TODO: consider splitting this into multiple functions.
	if state.Step == jobmod.StepBegin {
		switch info := fileInfo.(type) {
		case *schsdk.LocalJobFileInfo:
			// A local file must be uploaded first; wait for the upload event.
			state.Step = jobmod.StepUploading

		case *schsdk.PackageJobFileInfo:
			// The package already exists, so the upload step is skipped.
			file.PackageID = info.PackageID
			state.Step = jobmod.StepUploaded

		default:
			return fmt.Errorf("invalid schedule action %s for file info type %v", scheme.Action, reflect.TypeOf(fileInfo))
		}
	}

	if state.Step == jobmod.StepUploading {
		if evt == nil {
			return nil
		}

		localFileCmd, ok := evt.(*event.LocalFileUploaded)
		if !ok {
			return nil
		}

		// Checked assertion: a state that is in the uploading step with a
		// non-local file info must fail the job instead of panicking.
		localInfo, ok := fileInfo.(*schsdk.LocalJobFileInfo)
		if !ok {
			return fmt.Errorf("file info type %v is not a local file", reflect.TypeOf(fileInfo))
		}

		// The upload event is for another file.
		if localFileCmd.LocalPath != localInfo.LocalPath {
			return nil
		}

		if localFileCmd.Error != "" {
			return fmt.Errorf("local file uploading: %s", localFileCmd.Error)
		}

		file.PackageID = localFileCmd.PackageID
		state.Step = jobmod.StepUploaded
	}

	if state.Step == jobmod.StepUploaded {
		switch scheme.Action {
		case jobmod.ActionNo:
			state.Step = jobmod.StepCompleted
			return nil

		case jobmod.ActionMove:
			fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewCacheMovePackage(0, file.PackageID, job.slwNodeInfo.StgNodeID))
			if err != nil {
				return fmt.Errorf("starting cache move package: %w", err)
			}

			state.Step = jobmod.StepMoving
			state.FullTaskID = fullTaskID
			return nil

		case jobmod.ActionLoad:
			fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewStorageLoadPackage(0, file.PackageID, job.slwNodeInfo.StorageID))
			if err != nil {
				return fmt.Errorf("starting stroage load package: %w", err)
			}

			state.Step = jobmod.StepLoading
			state.FullTaskID = fullTaskID
			return nil
		}

		return fmt.Errorf("invalid schedule action %s for file info type %v", scheme.Action, reflect.TypeOf(fileInfo))
	}

	if state.Step == jobmod.StepMoving {
		moveRet, err := event.AssertExecutorTaskStatus[*exectsk.CacheMovePackageStatus](evt, state.FullTaskID)
		if err == event.ErrUnconcernedTask {
			return nil
		}
		if err == event.ErrTaskTimeout {
			return fmt.Errorf("cache move package timeout")
		}
		// Any other error means the status cannot be trusted; do not dereference it.
		if err != nil {
			return fmt.Errorf("checking cache move package status: %w", err)
		}

		h.mgr.execMgr.ForgetTask(state.FullTaskID)

		if moveRet.Error != "" {
			return fmt.Errorf("cache move pacakge: %s", moveRet.Error)
		}

		state.Step = jobmod.StepCompleted
		return nil
	}

	if state.Step == jobmod.StepLoading {
		loadRet, err := event.AssertExecutorTaskStatus[*exectsk.StorageLoadPackageStatus](evt, state.FullTaskID)
		if err == event.ErrUnconcernedTask {
			return nil
		}
		if err == event.ErrTaskTimeout {
			return fmt.Errorf("storage load package timeout")
		}
		// Any other error means the status cannot be trusted; do not dereference it.
		if err != nil {
			return fmt.Errorf("checking storage load package status: %w", err)
		}

		h.mgr.execMgr.ForgetTask(state.FullTaskID)

		if loadRet.Error != "" {
			return fmt.Errorf("storage load package: %s", loadRet.Error)
		}

		state.Step = jobmod.StepCompleted
		return nil
	}

	return nil
}
// doImageScheduling advances the scheduling of the job's image file by at
// most one asynchronous step.
//
// Parameters:
//   - evt: the event being processed; nil on the first pass.
//   - job: the tracked job the image belongs to.
//   - fileInfo: how the image was specified in the job description.
//   - file: the job's record for the image; ImageID and PackageID are filled in here.
//   - scheme: the scheduling action chosen for the image.
//   - state: the image's scheduling progress; Step and FullTaskID are updated here.
//
// Steps run in order: begin -> (uploading ->) uploaded -> moving ->
// image importing -> completed. A nil return means either progress was made
// or evt does not concern the image. A non-nil error means scheduling of the
// image failed.
func (h *PreSchedulingHandler) doImageScheduling(evt event.Event, job *preSchedulingJob, fileInfo schsdk.JobFileInfo, file *jobmod.ImageJobFile, scheme *jobmod.FileScheduleScheme, state *jobmod.FileSchedulingState) error {
	// TODO: consider splitting this into multiple functions.
	if state.Step == jobmod.StepBegin {
		switch info := fileInfo.(type) {
		case *schsdk.LocalJobFileInfo:
			// A local file must be uploaded first; wait for the upload event.
			state.Step = jobmod.StepUploading

		case *schsdk.ImageJobFileInfo:
			// Look the image up by the ID given in the job's file info;
			// file.ImageID is only assigned below, from the lookup result.
			imageInfo, err := h.mgr.imageMgr.GetImageInfo(info.ImageID)
			if err != nil {
				return fmt.Errorf("getting image info: %w", err)
			}

			file.ImageID = imageInfo.ImageID
			file.PackageID = imageInfo.PackageID
			state.Step = jobmod.StepUploaded

		default:
			return fmt.Errorf("invalid schedule action %s for file info type %v", scheme.Action, reflect.TypeOf(info))
		}
	}

	if state.Step == jobmod.StepUploading {
		if evt == nil {
			return nil
		}

		localFileCmd, ok := evt.(*event.LocalFileUploaded)
		if !ok {
			return nil
		}

		// Checked assertion: a state that is in the uploading step with a
		// non-local file info must fail the job instead of panicking.
		localInfo, ok := fileInfo.(*schsdk.LocalJobFileInfo)
		if !ok {
			return fmt.Errorf("file info type %v is not a local file", reflect.TypeOf(fileInfo))
		}

		// The upload event is for another file.
		if localFileCmd.LocalPath != localInfo.LocalPath {
			return nil
		}

		if localFileCmd.Error != "" {
			return fmt.Errorf("local file uploading: %s", localFileCmd.Error)
		}

		// The upload is finished, so a new, empty image record can be created.
		info, err := h.mgr.imageMgr.CreateImage(localFileCmd.PackageID)
		if err != nil {
			return fmt.Errorf("creating image info: %w", err)
		}

		// Fill in ImageID and PackageID.
		file.ImageID = info.ImageID
		file.PackageID = localFileCmd.PackageID
		state.Step = jobmod.StepUploaded
	}

	if state.Step == jobmod.StepUploaded {
		if scheme.Action == jobmod.ActionNo {
			state.Step = jobmod.StepCompleted
			return nil
		}

		// To import the image, it must first be moved into the cache of the
		// designated node.
		if scheme.Action == jobmod.ActionImportImage {
			fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewCacheMovePackage(0, file.PackageID, job.slwNodeInfo.StgNodeID))
			if err != nil {
				return fmt.Errorf("starting cache move package: %w", err)
			}

			state.Step = jobmod.StepMoving
			state.FullTaskID = fullTaskID
			return nil
		}

		return fmt.Errorf("invalid schedule action %s for file info type %v", scheme.Action, reflect.TypeOf(fileInfo))
	}

	if state.Step == jobmod.StepMoving {
		cacheMoveRet, err := event.AssertExecutorTaskStatus[*exectsk.CacheMovePackageStatus](evt, state.FullTaskID)
		if err == event.ErrUnconcernedTask {
			return nil
		}
		if err == event.ErrTaskTimeout {
			return fmt.Errorf("cache move package timeout")
		}
		// Any other error means the status cannot be trusted; do not dereference it.
		if err != nil {
			return fmt.Errorf("checking cache move package status: %w", err)
		}

		h.mgr.execMgr.ForgetTask(state.FullTaskID)

		if cacheMoveRet.Error != "" {
			return fmt.Errorf("cache move pacakge: %s", cacheMoveRet.Error)
		}

		// The package to import must contain exactly one object.
		if len(cacheMoveRet.CacheInfos) == 0 {
			return fmt.Errorf("no object in the package which will be imported")
		}
		if len(cacheMoveRet.CacheInfos) > 1 {
			return fmt.Errorf("there must be only 1 object in the package which will be imported")
		}

		fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewUploadImage(job.slwNodeInfo.ID, stgsdk.MakeIPFSFilePath(cacheMoveRet.CacheInfos[0].FileHash)))
		if err != nil {
			return fmt.Errorf("starting import image: %w", err)
		}

		state.Step = jobmod.StepImageImporting
		state.FullTaskID = fullTaskID
		return nil
	}

	if state.Step == jobmod.StepImageImporting {
		uploadImageRet, err := event.AssertExecutorTaskStatus[*exectsk.UploadImageStatus](evt, state.FullTaskID)
		if err == event.ErrUnconcernedTask {
			return nil
		}
		if err == event.ErrTaskTimeout {
			return fmt.Errorf("import image timeout")
		}
		// Any other error means the status cannot be trusted; do not dereference it.
		if err != nil {
			return fmt.Errorf("checking import image status: %w", err)
		}

		h.mgr.execMgr.ForgetTask(state.FullTaskID)

		if uploadImageRet.Error != "" {
			return fmt.Errorf("import image: %s", uploadImageRet.Error)
		}

		// Record which node-side image ID this image was imported as on the target node.
		err = h.mgr.imageMgr.AddImageImportingInfo(file.ImageID, job.slwNodeInfo.ID, uploadImageRet.ImageID)
		if err != nil {
			return fmt.Errorf("adding image importing info: %w", err)
		}

		state.Step = jobmod.StepCompleted
		return nil
	}

	return nil
}
// OnEvent posts a command that delivers evt to the tracked jobs selected by
// broadcast: every job, the jobs of one job set, or a single job. A broadcast
// that matches none of these targets is ignored.
func (h *PreSchedulingHandler) OnEvent(broadcast event.Broadcast, evt event.Event) {
	h.cmdChan.Send(func() {
		switch {
		case broadcast.ToAll():
			for _, tracked := range h.jobs {
				h.onJobEvent(evt, tracked)
			}

		case broadcast.ToJobSet():
			for _, tracked := range h.jobs {
				if tracked.job.JobSetID == broadcast.JobSetID {
					h.onJobEvent(evt, tracked)
				}
			}

		case broadcast.ToJob():
			tracked, found := h.jobs[broadcast.JobID]
			if found {
				h.onJobEvent(evt, tracked)
			}
		}
	})
}
// Serve receives the commands posted through cmdChan and runs them one after
// another. The loop has no exit condition.
func (h *PreSchedulingHandler) Serve() {
	commands := h.cmdChan.BeginChanReceive()
	defer h.cmdChan.CloseChanReceive()

	for {
		run := <-commands
		run()
	}
}
// Stop is a placeholder: it currently does nothing.
func (h *PreSchedulingHandler) Stop() {
// TODO: support STOP
}