JCC-CSScheduler/manager/internal/jobmgr/ready_to_adjust_handler.go

215 lines
5.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package jobmgr
import (
"fmt"
"reflect"
"gitlink.org.cn/cloudream/common/pkgs/actor"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event"
)
// readyToAdjustJob is the handler's bookkeeping record for one tracked job:
// the job itself together with its already type-asserted StateReadyToAdjust.
type readyToAdjustJob struct {
// job is the tracked job.
job jobmod.Job
// state is the job's state as it was when Handle accepted the job.
state *jobmod.StateReadyToAdjust
}
// ReadyToAdjustHandler tracks jobs that are in the StateReadyToAdjust state and
// moves them to their next state once their prerequisite job has finished.
//
// Handle and OnEvent do not touch the jobs map directly; they post closures to
// cmdChan, and Serve runs those closures one at a time.
type ReadyToAdjustHandler struct {
// mgr is the owning manager; its jobSets/jobs maps are read under mgr.pubLock.
mgr *Manager
// jobs holds the jobs currently tracked by this handler, keyed by job ID.
jobs map[schsdk.JobID]*readyToAdjustJob
// cmdChan carries the closures that Serve executes.
cmdChan actor.CommandChannel
}
// NewReadyToAdjustHandler creates a handler bound to mgr with an empty job
// table and a fresh command channel. Serve must be running for queued work to
// be processed.
func NewReadyToAdjustHandler(mgr *Manager) *ReadyToAdjustHandler {
	handler := new(ReadyToAdjustHandler)
	handler.mgr = mgr
	handler.jobs = map[schsdk.JobID]*readyToAdjustJob{}
	handler.cmdChan = *actor.NewCommandChannel()
	return handler
}
// Handle queues job for this handler. When the queued command runs, a job whose
// state is *jobmod.StateReadyToAdjust is registered and evaluated immediately
// (with a nil event); a job in any other state is moved to a failed state.
func (h *ReadyToAdjustHandler) Handle(job jobmod.Job) {
	h.cmdChan.Send(func() {
		switch adjustState := job.GetState().(type) {
		case *jobmod.StateReadyToAdjust:
			entry := &readyToAdjustJob{job: job, state: adjustState}
			h.jobs[job.GetJobID()] = entry
			// No event triggered this evaluation, hence the nil.
			h.onJobEvent(nil, entry)

		default:
			reason := fmt.Sprintf("unknow state: %v", reflect.TypeOf(job.GetState()))
			h.changeJobState(job, jobmod.NewStateFailed(reason, job.GetState()))
		}
	})
}
// onJobEvent dispatches evt for a tracked job. A CloneJob event is answered
// with a clone of the job and goes no further; every other event (including
// nil) is routed by the concrete job type. Job types other than NormalJob and
// ResourceJob are ignored.
func (h *ReadyToAdjustHandler) onJobEvent(evt event.Event, job *readyToAdjustJob) {
	if clone, isClone := evt.(*event.CloneJob); isClone {
		clone.Callback.SetValue(job.job.Clone())
		return
	}

	switch typed := job.job.(type) {
	case *jobmod.NormalJob:
		h.onNormalJobEvent(evt, job, typed)
	case *jobmod.ResourceJob:
		h.onResourceJobEvent(evt, job, typed)
	}
}
// onNormalJobEvent re-evaluates a normal job that is waiting to be adjusted.
//
// evt is not inspected: whatever the event was, the prerequisite job is
// checked again. If the job's dataset is a *schsdk.ResourceJobFileInfo, the
// referenced job is looked up through the job set:
//   - prerequisite succeeded: its ResourcePackageID is copied into
//     norJob.Files.Dataset.PackageID and the job moves on;
//   - prerequisite failed, is missing, or is not a resource job: this job is
//     moved to a failed state;
//   - otherwise: the job stays tracked and waits for the next event.
//
// A job with no resource-job dataset moves on immediately. "Moving on" means
// switching to the state returned by jobmod.NewStateMakingAdjustScheme.
func (h *ReadyToAdjustHandler) onNormalJobEvent(evt event.Event, job *readyToAdjustJob, norJob *jobmod.NormalJob) {
	h.mgr.pubLock.Lock()
	jobSet, ok := h.mgr.jobSets[job.job.GetJobSetID()]
	h.mgr.pubLock.Unlock()
	if !ok {
		h.changeJobState(job.job, jobmod.NewStateFailed(fmt.Sprintf("job set %s not found", job.job.GetJobSetID()), job.state))
		return
	}

	// Whatever event arrived, re-check the state of the prerequisite job.
	if resFile, ok := norJob.Info.Files.Dataset.(*schsdk.ResourceJobFileInfo); ok {
		ref := jobSet.FindRefByLocalJobID(resFile.ResourceLocalJobID)
		if ref == nil {
			h.changeJobState(job.job, jobmod.NewStateFailed(
				fmt.Sprintf("job %s not found in job set %s", resFile.ResourceLocalJobID, jobSet.JobSetID),
				job.state,
			))
			return
		}

		h.mgr.pubLock.Lock()
		waitJob := h.mgr.jobs[ref.JobID]
		h.mgr.pubLock.Unlock()
		if waitJob == nil {
			h.changeJobState(job.job, jobmod.NewStateFailed(fmt.Sprintf("job %s not found", ref.JobID), job.state))
			return
		}

		switch waitJob.Job.GetState().(type) {
		case *jobmod.StateSuccess:
			waitResJob, ok := waitJob.Job.(*jobmod.ResourceJob)
			if !ok {
				// waitResJob is nil on this path, so the message must be built
				// from waitJob.Job; reading waitResJob.JobID here would panic.
				h.changeJobState(job.job, jobmod.NewStateFailed(
					fmt.Sprintf("job(%v) %s is not a resource job", reflect.TypeOf(waitJob.Job), waitJob.Job.GetJobID()),
					job.state,
				))
				return
			}
			norJob.Files.Dataset.PackageID = waitResJob.ResourcePackageID

		case *jobmod.StateFailed:
			h.changeJobState(job.job, jobmod.NewStateFailed(
				fmt.Sprintf("job %s is failed", waitJob.Job.GetJobID()),
				job.state,
			))
			return

		default:
			// The prerequisite has neither succeeded nor failed yet: keep waiting.
			return
		}
	}

	h.changeJobState(job.job, jobmod.NewStateMakingAdjustScheme())
}
// onResourceJobEvent re-evaluates a resource job that is waiting to be
// adjusted. evt is not inspected. The job named by resJob.Info.TargetLocalJobID
// is resolved through the job set; if it failed (or cannot be found) this job
// is failed too, if it succeeded this job switches to the state returned by
// jobmod.NewStateReadyToExecute, and otherwise the job keeps waiting.
func (h *ReadyToAdjustHandler) onResourceJobEvent(evt event.Event, job *readyToAdjustJob, resJob *jobmod.ResourceJob) {
	h.mgr.pubLock.Lock()
	set, found := h.mgr.jobSets[job.job.GetJobSetID()]
	h.mgr.pubLock.Unlock()
	if !found {
		reason := fmt.Sprintf("job set %s not found", job.job.GetJobSetID())
		h.changeJobState(job.job, jobmod.NewStateFailed(reason, job.state))
		return
	}

	targetRef := set.FindRefByLocalJobID(resJob.Info.TargetLocalJobID)
	if targetRef == nil {
		reason := fmt.Sprintf("job %s not found in job set %s", resJob.Info.TargetLocalJobID, set.JobSetID)
		h.changeJobState(job.job, jobmod.NewStateFailed(reason, job.state))
		return
	}

	h.mgr.pubLock.Lock()
	target := h.mgr.jobs[targetRef.JobID]
	h.mgr.pubLock.Unlock()
	if target == nil {
		reason := fmt.Sprintf("job %s not found", targetRef.JobID)
		h.changeJobState(job.job, jobmod.NewStateFailed(reason, job.state))
		return
	}

	// Whatever event arrived, re-check the state of the prerequisite job.
	if _, failed := target.Job.GetState().(*jobmod.StateFailed); failed {
		reason := fmt.Sprintf("job %s is failed", target.Job.GetJobID())
		h.changeJobState(job.job, jobmod.NewStateFailed(reason, job.state))
		return
	}
	if _, succeeded := target.Job.GetState().(*jobmod.StateSuccess); !succeeded {
		// Still running (or not started): stay tracked until the next event.
		return
	}

	h.changeJobState(job.job, jobmod.NewStateReadyToExecute())
}
// changeJobState sets state on job, stops tracking the job in this handler,
// and then passes the job to the manager's handleState while holding
// mgr.pubLock.
func (h *ReadyToAdjustHandler) changeJobState(job jobmod.Job, state jobmod.JobState) {
	job.SetState(state)

	jobID := job.GetJobID()
	delete(h.jobs, jobID)

	mgr := h.mgr
	mgr.pubLock.Lock()
	mgr.handleState(job)
	mgr.pubLock.Unlock()
}
// OnEvent queues evt for delivery to the tracked jobs selected by broadcast:
// every job, the jobs of one job set, or a single job. Delivery happens when
// the queued command is run.
func (h *ReadyToAdjustHandler) OnEvent(broadcast event.Broadcast, evt event.Event) {
	h.cmdChan.Send(func() {
		switch {
		case broadcast.ToAll():
			for _, tracked := range h.jobs {
				h.onJobEvent(evt, tracked)
			}

		case broadcast.ToJobSet():
			for _, tracked := range h.jobs {
				if tracked.job.GetJobSetID() == broadcast.JobSetID {
					h.onJobEvent(evt, tracked)
				}
			}

		case broadcast.ToJob():
			if tracked, found := h.jobs[broadcast.JobID]; found {
				h.onJobEvent(evt, tracked)
			}
		}
	})
}
// Serve receives queued commands from the command channel and runs them one
// after another on the calling goroutine. The loop has no exit condition, so
// this call does not return normally.
func (h *ReadyToAdjustHandler) Serve() {
	commands := h.cmdChan.BeginChanReceive()
	defer h.cmdChan.CloseChanReceive()

	for {
		run := <-commands
		run()
	}
}
// Stop is currently a no-op; it does not stop the loop started by Serve.
func (h *ReadyToAdjustHandler) Stop() {
// TODO: support stopping the handler.
}