任务集中所有任务结束后,删除任务集

This commit is contained in:
Sydonian 2024-04-26 16:07:40 +08:00
parent facf409ab8
commit a45403b6c6
2 changed files with 40 additions and 3 deletions

View File

@ -42,6 +42,7 @@ func (s *Completed) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.J
func (c *Completed) handleSuccess(rtx jobmgr.JobStateRunContext, job *jobmgr.Job) {
logger.WithField("JobID", job.JobID).Infof("job completed successfuly")
rtx.Mgr.BroadcastEvent(job.JobSetID, event.NewJobCompleted(job, c.err))
rtx.Mgr.JobCompleted(job)
}
func (c *Completed) handleFailed(rtx jobmgr.JobStateRunContext, job *jobmgr.Job) {
@ -50,4 +51,5 @@ func (c *Completed) handleFailed(rtx jobmgr.JobStateRunContext, job *jobmgr.Job)
WithField("LastState", reflect.TypeOf(rtx.LastState).String()).
Infof("job failed with: %v", c.err)
rtx.Mgr.BroadcastEvent(job.JobSetID, event.NewJobCompleted(job, c.err))
rtx.Mgr.JobCompleted(job)
}

View File

@ -5,6 +5,7 @@ import (
"sync"
"time"
"gitlink.org.cn/cloudream/common/pkgs/logger"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/db"
@ -13,9 +14,10 @@ import (
)
type mgrJob struct {
job Job
eventSet EventSet
state JobState
job Job
eventSet EventSet
state JobState
isCompleted bool // 任务是否结束。注任务状态为Completed时此字段不一定为true因为在Completed状态下也有工作要做。
}
type mgrJobSet struct {
@ -60,6 +62,7 @@ func (m *Manager) Stop() {
}
// 改变任务状态。注将任务改变为Completed状态不会设置mgrJob.isCompleted为true
func (m *Manager) ChangeState(job *Job, state JobState) {
m.pubLock.Lock()
defer m.pubLock.Unlock()
@ -81,6 +84,35 @@ func (m *Manager) ChangeState(job *Job, state JobState) {
}()
}
// 将任务标记为结束
func (m *Manager) JobCompleted(job *Job) {
m.pubLock.Lock()
defer m.pubLock.Unlock()
mgrJob, ok := m.jobs[job.JobID]
if !ok {
return
}
mgrJob.isCompleted = true
// 如果任务集中的所有任务都完成了,则删除任务集
jobSet := m.jobSets[job.JobSetID]
for _, mjob := range jobSet.jobs {
if !mjob.isCompleted {
return
}
}
// TODO 可以考虑加个回调
delete(m.jobSets, job.JobSetID)
go func() {
logger.Infof("job set %s completed", job.JobSetID)
}()
}
// 向某个任务投递事件
func (m *Manager) PostEvent(jobID schsdk.JobID, evt Event) {
m.pubLock.Lock()
defer m.pubLock.Unlock()
@ -95,6 +127,7 @@ func (m *Manager) PostEvent(jobID schsdk.JobID, evt Event) {
}()
}
// 向某个任务集中的所有任务投递事件
func (m *Manager) BroadcastEvent(jobSetID schsdk.JobSetID, evt Event) {
m.pubLock.Lock()
defer m.pubLock.Unlock()
@ -116,6 +149,7 @@ type SubmittingJob struct {
InitState JobState
}
// 提交一个任务集
func (m *Manager) SubmitJobSet(jobs []SubmittingJob) schsdk.JobSetID {
m.pubLock.Lock()
defer m.pubLock.Unlock()
@ -147,6 +181,7 @@ func (m *Manager) SubmitJobSet(jobs []SubmittingJob) schsdk.JobSetID {
return jobSetID
}
// 导出任务集中所有任务的状态
func (m *Manager) DumpJobSet(jobSetID schsdk.JobSetID) []jobmod.JobDump {
m.pubLock.Lock()
defer m.pubLock.Unlock()