239 lines
6.3 KiB
Go
239 lines
6.3 KiB
Go
package jobmgr
|
|
|
|
import (
|
|
"fmt"
|
|
"reflect"
|
|
|
|
"gitlink.org.cn/cloudream/common/pkgs/actor"
|
|
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
|
|
|
|
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
|
|
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
|
|
colmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
|
|
exetsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
|
|
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event"
|
|
)
|
|
|
|
type executingJob struct {
|
|
job jobmod.Job
|
|
state *jobmod.StateExecuting
|
|
}
|
|
|
|
type ExecutingHandler struct {
|
|
mgr *Manager
|
|
|
|
jobs map[schsdk.JobID]*executingJob
|
|
|
|
cmdChan actor.CommandChannel
|
|
}
|
|
|
|
func NewExecutingHandler(mgr *Manager) *ExecutingHandler {
|
|
return &ExecutingHandler{
|
|
mgr: mgr,
|
|
jobs: make(map[schsdk.JobID]*executingJob),
|
|
cmdChan: *actor.NewCommandChannel(),
|
|
}
|
|
}
|
|
|
|
func (h *ExecutingHandler) Handle(job jobmod.Job) {
|
|
h.cmdChan.Send(func() {
|
|
state, ok := job.GetState().(*jobmod.StateExecuting)
|
|
if !ok {
|
|
h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("unknow state: %v", reflect.TypeOf(job.GetState())), job.GetState()))
|
|
return
|
|
}
|
|
|
|
rjob := &executingJob{
|
|
job: job,
|
|
state: state,
|
|
}
|
|
h.jobs[job.GetJobID()] = rjob
|
|
|
|
h.onJobEvent(nil, rjob)
|
|
})
|
|
}
|
|
|
|
func (h *ExecutingHandler) onJobEvent(evt event.Event, job *executingJob) {
|
|
if cloneEvt, ok := evt.(*event.CloneJob); ok {
|
|
cloneEvt.Callback.SetValue(job.job.Clone())
|
|
return
|
|
}
|
|
|
|
if norJob, ok := job.job.(*jobmod.NormalJob); ok {
|
|
h.onNormalJobEvent(evt, job, norJob)
|
|
} else if resJob, ok := job.job.(*jobmod.ResourceJob); ok {
|
|
h.onResourceJobEvent(evt, job, resJob)
|
|
}
|
|
}
|
|
|
|
func (h *ExecutingHandler) onNormalJobEvent(evt event.Event, job *executingJob, norJob *jobmod.NormalJob) {
|
|
if job.state.FullTaskID == "" {
|
|
info, err := h.mgr.imageMgr.GetImageImportingInfo(norJob.Files.Image.ImageID, norJob.TargetSlwNodeID)
|
|
if err != nil {
|
|
h.changeJobState(job.job, jobmod.NewStateFailed("getting image importing info: "+err.Error(), job.state))
|
|
return
|
|
}
|
|
|
|
fullTaskID, err := h.mgr.advMgr.StartTask(job.job.GetJobID(),
|
|
exetsk.NewScheduleTask(
|
|
norJob.TargetSlwNodeID,
|
|
norJob.Info.Runtime.Envs,
|
|
info.SlwNodeImageID,
|
|
norJob.Info.Runtime.Command,
|
|
))
|
|
if err != nil {
|
|
h.changeJobState(job.job, jobmod.NewStateFailed(err.Error(), job.state))
|
|
return
|
|
}
|
|
|
|
job.state.FullTaskID = fullTaskID
|
|
}
|
|
|
|
if execRet, err := event.AssertExecutorTaskStatus[*exetsk.ScheduleTaskStatus](evt, job.state.FullTaskID); err != event.ErrUnconcernedTask {
|
|
if err == event.ErrTaskTimeout {
|
|
h.changeJobState(job.job, jobmod.NewStateFailed("schedule task timeout", job.state))
|
|
return
|
|
}
|
|
|
|
h.mgr.execMgr.ForgetTask(job.state.FullTaskID)
|
|
|
|
if execRet.Error != "" {
|
|
h.changeJobState(job.job, jobmod.NewStateFailed(execRet.Error, job.state))
|
|
return
|
|
}
|
|
|
|
h.changeJobState(job.job, jobmod.NewStateSuccess())
|
|
}
|
|
}
|
|
|
|
func (h *ExecutingHandler) onResourceJobEvent(evt event.Event, job *executingJob, resJob *jobmod.ResourceJob) {
|
|
if job.state.FullTaskID == "" {
|
|
h.mgr.pubLock.Lock()
|
|
jobSet, ok := h.mgr.jobSets[resJob.GetJobSetID()]
|
|
if !ok {
|
|
h.mgr.pubLock.Unlock()
|
|
h.changeJobState(job.job, jobmod.NewStateFailed(fmt.Sprintf("job set %s not found", resJob.GetJobSetID()), job.state))
|
|
return
|
|
}
|
|
|
|
ref := jobSet.FindRefByLocalJobID(resJob.Info.TargetLocalJobID)
|
|
if ref == nil {
|
|
h.mgr.pubLock.Unlock()
|
|
h.changeJobState(job.job, jobmod.NewStateFailed(
|
|
fmt.Sprintf("job %s not found in job set %s",
|
|
resJob.Info.TargetLocalJobID,
|
|
resJob.GetJobSetID()),
|
|
job.state,
|
|
))
|
|
return
|
|
}
|
|
|
|
targetJob, ok := h.mgr.jobs[ref.JobID]
|
|
h.mgr.pubLock.Unlock()
|
|
|
|
if !ok {
|
|
h.changeJobState(job.job, jobmod.NewStateFailed(fmt.Sprintf("job %s not found", ref.JobID), job.state))
|
|
return
|
|
}
|
|
|
|
tarNorJob, ok := targetJob.Job.(*jobmod.NormalJob)
|
|
if !ok {
|
|
h.changeJobState(job.job, jobmod.NewStateFailed(fmt.Sprintf("job(%v) %s is not a Normal job", reflect.TypeOf(targetJob), ref.JobID), job.state))
|
|
return
|
|
}
|
|
|
|
colCli, err := schglb.CollectorMQPool.Acquire()
|
|
if err != nil {
|
|
h.changeJobState(job.job, jobmod.NewStateFailed(fmt.Sprintf("new collector client: %s", err.Error()), job.state))
|
|
return
|
|
}
|
|
defer schglb.CollectorMQPool.Release(colCli)
|
|
|
|
getNodeResp, err := colCli.GetSlwNodeInfo(colmq.NewGetSlwNodeInfo(tarNorJob.TargetSlwNodeID))
|
|
if err != nil {
|
|
h.changeJobState(job.job, jobmod.NewStateFailed(fmt.Sprintf("getting slw node info: %s", err.Error()), job.state))
|
|
return
|
|
}
|
|
|
|
fullTaskID, err := h.mgr.execMgr.StartTask(job.job.GetJobID(), exetsk.NewStorageCreatePackage(
|
|
0, // TOOD 用户ID
|
|
getNodeResp.StorageID,
|
|
"", // TODO
|
|
resJob.Info.BucketID,
|
|
"", // TODO
|
|
resJob.Info.Redundancy,
|
|
))
|
|
if err != nil {
|
|
h.changeJobState(job.job, jobmod.NewStateFailed(err.Error(), job.state))
|
|
return
|
|
}
|
|
|
|
job.state.FullTaskID = fullTaskID
|
|
}
|
|
|
|
if createRet, err := event.AssertExecutorTaskStatus[*exetsk.StorageCreatePackageStatus](evt, job.state.FullTaskID); err != event.ErrUnconcernedTask {
|
|
if err == event.ErrTaskTimeout {
|
|
h.changeJobState(job.job, jobmod.NewStateFailed("storage create package timeout", job.state))
|
|
return
|
|
}
|
|
|
|
h.mgr.execMgr.ForgetTask(job.state.FullTaskID)
|
|
|
|
if createRet.Error != "" {
|
|
h.changeJobState(job.job, jobmod.NewStateFailed(createRet.Error, job.state))
|
|
return
|
|
}
|
|
|
|
h.changeJobState(job.job, jobmod.NewStateSuccess())
|
|
}
|
|
}
|
|
|
|
func (h *ExecutingHandler) changeJobState(job jobmod.Job, state jobmod.JobState) {
|
|
job.SetState(state)
|
|
|
|
delete(h.jobs, job.GetJobID())
|
|
|
|
h.mgr.pubLock.Lock()
|
|
h.mgr.handleState(job)
|
|
h.mgr.pubLock.Unlock()
|
|
}
|
|
|
|
func (h *ExecutingHandler) OnEvent(broadcast event.Broadcast, evt event.Event) {
|
|
h.cmdChan.Send(func() {
|
|
if broadcast.ToAll() {
|
|
for _, job := range h.jobs {
|
|
h.onJobEvent(evt, job)
|
|
}
|
|
|
|
} else if broadcast.ToJobSet() {
|
|
for _, job := range h.jobs {
|
|
if job.job.GetJobSetID() != broadcast.JobSetID {
|
|
continue
|
|
}
|
|
|
|
h.onJobEvent(evt, job)
|
|
}
|
|
} else if broadcast.ToJob() {
|
|
if job, ok := h.jobs[broadcast.JobID]; ok {
|
|
h.onJobEvent(evt, job)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
func (h *ExecutingHandler) Serve() {
|
|
cmdChan := h.cmdChan.BeginChanReceive()
|
|
defer h.cmdChan.CloseChanReceive()
|
|
|
|
for {
|
|
select {
|
|
case cmd := <-cmdChan:
|
|
cmd()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *ExecutingHandler) Stop() {
|
|
// TODO 支持STOP
|
|
}
|