完善dump机制
This commit is contained in:
parent
02a03ebb73
commit
facf409ab8
|
@ -38,7 +38,7 @@ const (
|
|||
var ErrNoAvailableScheme = fmt.Errorf("no appropriate scheduling node found, please wait")
|
||||
|
||||
type Scheduler interface {
|
||||
Schedule(status *jobmod.JobStatus) (*jobmod.JobScheduleScheme, error)
|
||||
Schedule(dump *jobmod.JobDump) (*jobmod.JobScheduleScheme, error)
|
||||
}
|
||||
|
||||
type candidate struct {
|
||||
|
@ -129,10 +129,10 @@ func NewDefaultSchedule() *DefaultScheduler {
|
|||
return &DefaultScheduler{}
|
||||
}
|
||||
|
||||
func (s *DefaultScheduler) Schedule(status *jobmod.JobStatus) (*jobmod.JobScheduleScheme, error) {
|
||||
norJob, ok := status.Body.(*jobmod.NormalJobStatus)
|
||||
func (s *DefaultScheduler) Schedule(dump *jobmod.JobDump) (*jobmod.JobScheduleScheme, error) {
|
||||
norJob, ok := dump.Body.(*jobmod.NormalJobDump)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("only normal job can be scheduled, but got %T", status.Body)
|
||||
return nil, fmt.Errorf("only normal job can be scheduled, but got %T", dump.Body)
|
||||
}
|
||||
|
||||
mgrCli, err := schglb.ManagerMQPool.Acquire()
|
||||
|
@ -166,7 +166,7 @@ func (s *DefaultScheduler) Schedule(status *jobmod.JobStatus) (*jobmod.JobSchedu
|
|||
return nil, err
|
||||
}
|
||||
|
||||
err = s.calcResourceScore(status.Info.(*schsdk.NormalJobInfo), allCCs)
|
||||
err = s.calcResourceScore(dump.Info.(*schsdk.NormalJobInfo), allCCs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ import (
|
|||
)
|
||||
|
||||
type schedulingJob struct {
|
||||
Status jobmod.JobStatus
|
||||
Dump jobmod.JobDump
|
||||
Callback *future.SetValueFuture[*jobmod.JobScheduleScheme]
|
||||
}
|
||||
|
||||
|
@ -30,11 +30,11 @@ func NewService(scheduler Scheduler) *Service {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Service) MakeScheme(status jobmod.JobStatus) (*jobmod.JobScheduleScheme, error) {
|
||||
func (s *Service) MakeScheme(dump jobmod.JobDump) (*jobmod.JobScheduleScheme, error) {
|
||||
s.lock.Lock()
|
||||
callback := future.NewSetValue[*jobmod.JobScheduleScheme]()
|
||||
s.jobs = append(s.jobs, &schedulingJob{
|
||||
Status: status,
|
||||
Dump: dump,
|
||||
Callback: callback,
|
||||
})
|
||||
s.lock.Unlock()
|
||||
|
@ -66,7 +66,7 @@ func (s *Service) tryMakeScheme() {
|
|||
defer s.lock.Unlock()
|
||||
|
||||
for i, job := range s.jobs {
|
||||
scheme, err := s.scheduler.Schedule(&job.Status)
|
||||
scheme, err := s.scheduler.Schedule(&job.Dump)
|
||||
if err == nil {
|
||||
job.Callback.SetValue(scheme)
|
||||
s.jobs[i] = nil
|
||||
|
|
|
@ -39,7 +39,7 @@ func (t *MakeScheduleScheme) Execute(task *task.Task[TaskContext], ctx TaskConte
|
|||
}
|
||||
|
||||
func (t *MakeScheduleScheme) do(taskID string, ctx TaskContext) (*jobmod.JobScheduleScheme, error) {
|
||||
scheme, err := ctx.scheduleSvc.MakeScheme(t.JobStatus)
|
||||
scheme, err := ctx.scheduleSvc.MakeScheme(t.JobDump)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
package jobmod
|
||||
|
||||
import (
|
||||
"gitlink.org.cn/cloudream/common/pkgs/types"
|
||||
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
|
||||
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
|
||||
"gitlink.org.cn/cloudream/common/utils/serder"
|
||||
)
|
||||
|
||||
type JobBodyDumpType string
|
||||
|
||||
// 导出的任务体的状态,需要能被序列化
|
||||
type JobBodyDump interface {
|
||||
// 仅为了区分整个仓库中拥有同名函数的接口,不要调用这个函数
|
||||
getType() JobBodyDumpType
|
||||
}
|
||||
|
||||
var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[JobBodyDump](
|
||||
(*NormalJobDump)(nil),
|
||||
(*DataReturnJobDump)(nil),
|
||||
)))
|
||||
|
||||
type NormalJobDump struct {
|
||||
serder.Metadata `union:"NormalJob"`
|
||||
Type JobBodyDumpType `json:"type"`
|
||||
TargetCCID schsdk.CCID `json:"targetCCID"`
|
||||
Files JobFiles `json:"files"`
|
||||
}
|
||||
|
||||
func (d *NormalJobDump) getType() JobBodyDumpType {
|
||||
return d.Type
|
||||
}
|
||||
|
||||
type DataReturnJobDump struct {
|
||||
serder.Metadata `union:"DataReturnJob"`
|
||||
Type JobBodyDumpType `json:"type"`
|
||||
DataReturnPackageID cdssdk.PackageID `json:"dataReturnPackageID"`
|
||||
}
|
||||
|
||||
func (d *DataReturnJobDump) getType() JobBodyDumpType {
|
||||
return d.Type
|
||||
}
|
|
@ -48,25 +48,11 @@ type ImageJobFile struct {
|
|||
ImageID schsdk.ImageID `json:"imageID"`
|
||||
}
|
||||
|
||||
type JobStatus struct {
|
||||
// 导出的任务的状态,需要能被序列化
|
||||
type JobDump struct {
|
||||
JobID schsdk.JobID `json:"jobID"`
|
||||
JobSetID schsdk.JobSetID `json:"jobSetID"`
|
||||
Info schsdk.JobInfo `json:"info"`
|
||||
Body JobBodyStatus `json:"body"`
|
||||
State JobStateStatus `json:"state"`
|
||||
}
|
||||
|
||||
type JobBodyStatus interface {
|
||||
}
|
||||
|
||||
type NormalJobStatus struct {
|
||||
TargetCCID schsdk.CCID `json:"targetCCID"`
|
||||
Files JobFiles `json:"files"`
|
||||
}
|
||||
|
||||
type DataReturnJobStatus struct {
|
||||
DataReturnPackageID cdssdk.PackageID `json:"dataReturnPackageID"`
|
||||
}
|
||||
|
||||
type JobStateStatus interface {
|
||||
Body JobBodyDump `json:"body"`
|
||||
State JobStateDump `json:"state"`
|
||||
}
|
||||
|
|
|
@ -1,7 +1,132 @@
|
|||
package jobmod
|
||||
|
||||
import pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
|
||||
import (
|
||||
"gitlink.org.cn/cloudream/common/pkgs/types"
|
||||
pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
|
||||
"gitlink.org.cn/cloudream/common/utils/serder"
|
||||
)
|
||||
|
||||
type NormalJobExecutingStatus struct {
|
||||
TaskStatus pcmsdk.TaskStatus
|
||||
type JobStateDumpType string
|
||||
|
||||
// 导出的任务状态的数据,需要能被序列化
|
||||
type JobStateDump interface {
|
||||
// 仅为了区分整个仓库中拥有同名函数的接口,不要调用这个函数
|
||||
getType() JobStateDumpType
|
||||
}
|
||||
|
||||
var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[JobStateDump](
|
||||
(*AdjustingDump)(nil),
|
||||
(*CompletedDump)(nil),
|
||||
(*NormalJobExecutingDump)(nil),
|
||||
(*DataReturnExecutingDump)(nil),
|
||||
(*MakeingAdjustSchemeDump)(nil),
|
||||
(*PreSchedulingDump)(nil),
|
||||
(*ReadyToAdjustDump)(nil),
|
||||
(*NormalJobReadyToExecuteDump)(nil),
|
||||
(*DataReturnReadyToExecuteDump)(nil),
|
||||
(*WaitTargetCompleteDump)(nil),
|
||||
)))
|
||||
|
||||
// 调整中
|
||||
type AdjustingDump struct {
|
||||
serder.Metadata `union:"Adjusting"`
|
||||
Type JobStateDumpType `json:"type"`
|
||||
Scheme JobScheduleScheme `json:"scheme"`
|
||||
}
|
||||
|
||||
func (dump *AdjustingDump) getType() JobStateDumpType {
|
||||
return dump.Type
|
||||
}
|
||||
|
||||
// 任务结束
|
||||
type CompletedDump struct {
|
||||
serder.Metadata `union:"Completed"`
|
||||
Type JobStateDumpType `json:"type"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
func (dump *CompletedDump) getType() JobStateDumpType {
|
||||
return dump.Type
|
||||
}
|
||||
|
||||
// 普通任务执行中
|
||||
type NormalJobExecutingDump struct {
|
||||
serder.Metadata `union:"NormalJobExecuting"`
|
||||
Type JobStateDumpType `json:"type"`
|
||||
TaskStatus pcmsdk.TaskStatus `json:"taskStatus"`
|
||||
}
|
||||
|
||||
func (dump *NormalJobExecutingDump) getType() JobStateDumpType {
|
||||
return dump.Type
|
||||
}
|
||||
|
||||
// 回源任务执行中
|
||||
type DataReturnExecutingDump struct {
|
||||
serder.Metadata `union:"DataReturnExecuting"`
|
||||
Type JobStateDumpType `json:"type"`
|
||||
}
|
||||
|
||||
func (dump *DataReturnExecutingDump) getType() JobStateDumpType {
|
||||
return dump.Type
|
||||
}
|
||||
|
||||
// 制作调整方案中
|
||||
type MakeingAdjustSchemeDump struct {
|
||||
serder.Metadata `union:"MakeingAdjustScheme"`
|
||||
Type JobStateDumpType `json:"type"`
|
||||
}
|
||||
|
||||
func (dump *MakeingAdjustSchemeDump) getType() JobStateDumpType {
|
||||
return dump.Type
|
||||
}
|
||||
|
||||
// 预调度中
|
||||
type PreSchedulingDump struct {
|
||||
serder.Metadata `union:"PreScheduling"`
|
||||
Type JobStateDumpType `json:"type"`
|
||||
Scheme JobScheduleScheme `json:"scheme"`
|
||||
}
|
||||
|
||||
func (dump *PreSchedulingDump) getType() JobStateDumpType {
|
||||
return dump.Type
|
||||
}
|
||||
|
||||
// 准备调整中
|
||||
type ReadyToAdjustDump struct {
|
||||
serder.Metadata `union:"ReadyToAdjust"`
|
||||
Type JobStateDumpType `json:"type"`
|
||||
}
|
||||
|
||||
func (dump *ReadyToAdjustDump) getType() JobStateDumpType {
|
||||
return dump.Type
|
||||
}
|
||||
|
||||
// 普通任务准备执行中
|
||||
type NormalJobReadyToExecuteDump struct {
|
||||
serder.Metadata `union:"NormalJobReadyToExecute"`
|
||||
Type JobStateDumpType `json:"type"`
|
||||
}
|
||||
|
||||
func (dump *NormalJobReadyToExecuteDump) getType() JobStateDumpType {
|
||||
return dump.Type
|
||||
}
|
||||
|
||||
// 回源任务准备执行中
|
||||
type DataReturnReadyToExecuteDump struct {
|
||||
serder.Metadata `union:"DataReturnReadyToExecute"`
|
||||
Type JobStateDumpType `json:"type"`
|
||||
}
|
||||
|
||||
func (dump *DataReturnReadyToExecuteDump) getType() JobStateDumpType {
|
||||
return dump.Type
|
||||
}
|
||||
|
||||
// 等待回源目标完成中
|
||||
type WaitTargetCompleteDump struct {
|
||||
serder.Metadata `union:"WaitTargetComplete"`
|
||||
Type JobStateDumpType `json:"type"`
|
||||
}
|
||||
|
||||
func (dump *WaitTargetCompleteDump) getType() JobStateDumpType {
|
||||
return dump.Type
|
||||
}
|
||||
|
|
|
@ -6,12 +6,12 @@ import (
|
|||
|
||||
type MakeAdjustScheme struct {
|
||||
TaskInfoBase
|
||||
JobStatus jobmod.JobStatus `json:"jobStatus"`
|
||||
JobDump jobmod.JobDump `json:"jobDump"`
|
||||
}
|
||||
|
||||
func NewMakeAdjustScheme(jobStatus jobmod.JobStatus) *MakeAdjustScheme {
|
||||
func NewMakeAdjustScheme(jobDump jobmod.JobDump) *MakeAdjustScheme {
|
||||
return &MakeAdjustScheme{
|
||||
JobStatus: jobStatus,
|
||||
JobDump: jobDump,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -14,7 +14,7 @@ type JobService interface {
|
|||
|
||||
GetServiceList(msg *GetServiceList) (*GetServiceListResp, *mq.CodeMessage)
|
||||
|
||||
GetJobSetStatus(msg *GetJobSetStatus) (*GetJobSetStatusResp, *mq.CodeMessage)
|
||||
GetJobSetDump(msg *GetJobSetDump) (*GetJobSetDumpResp, *mq.CodeMessage)
|
||||
}
|
||||
|
||||
// 提交任务集
|
||||
|
@ -74,32 +74,32 @@ func (c *Client) JobSetLocalFileUploaded(msg *JobSetLocalFileUploaded, opts ...m
|
|||
return mq.Request(Service.JobSetLocalFileUploaded, c.roundTripper, msg, opts...)
|
||||
}
|
||||
|
||||
var _ = Register(Service.GetJobSetStatus)
|
||||
var _ = Register(Service.GetJobSetDump)
|
||||
|
||||
// 获取任务集的状态
|
||||
type GetJobSetStatus struct {
|
||||
type GetJobSetDump struct {
|
||||
mq.MessageBodyBase
|
||||
JobSetID schsdk.JobSetID `json:"jobSetID"`
|
||||
}
|
||||
type GetJobSetStatusResp struct {
|
||||
type GetJobSetDumpResp struct {
|
||||
mq.MessageBodyBase
|
||||
Jobs []jobmod.JobStatus `json:"jobs"`
|
||||
Jobs []jobmod.JobDump `json:"jobs"`
|
||||
}
|
||||
|
||||
func ReqGetJobSetStatus(jobSetID schsdk.JobSetID) *GetJobSetStatus {
|
||||
return &GetJobSetStatus{
|
||||
func ReqGetJobSetDump(jobSetID schsdk.JobSetID) *GetJobSetDump {
|
||||
return &GetJobSetDump{
|
||||
JobSetID: jobSetID,
|
||||
}
|
||||
}
|
||||
|
||||
func RespGetJobSetStatus(jobs []jobmod.JobStatus) *GetJobSetStatusResp {
|
||||
return &GetJobSetStatusResp{
|
||||
func RespGetJobSetDump(jobs []jobmod.JobDump) *GetJobSetDumpResp {
|
||||
return &GetJobSetDumpResp{
|
||||
Jobs: jobs,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) GetJobSetStatus(msg *GetJobSetStatus, opts ...mq.RequestOption) (*GetJobSetStatusResp, error) {
|
||||
return mq.Request(Service.GetJobSetStatus, c.roundTripper, msg, opts...)
|
||||
func (c *Client) GetJobSetDump(msg *GetJobSetDump, opts ...mq.RequestOption) (*GetJobSetDumpResp, error) {
|
||||
return mq.Request(Service.GetJobSetDump, c.roundTripper, msg, opts...)
|
||||
}
|
||||
|
||||
type GetServiceList struct {
|
||||
|
|
|
@ -1,12 +0,0 @@
|
|||
package event
|
||||
|
||||
// 终止处理
|
||||
type Abort struct {
|
||||
JobID string
|
||||
}
|
||||
|
||||
func NewAbort(jobID string) *Abort {
|
||||
return &Abort{
|
||||
JobID: jobID,
|
||||
}
|
||||
}
|
|
@ -72,8 +72,8 @@ func (j *Job) GetInfo() schsdk.JobInfo {
|
|||
return j.Body.GetInfo()
|
||||
}
|
||||
|
||||
func (j *Job) Dump(ctx JobStateRunContext, job *Job, curState JobState) jobmod.JobStatus {
|
||||
return jobmod.JobStatus{
|
||||
func (j *Job) Dump(ctx JobStateRunContext, job *Job, curState JobState) jobmod.JobDump {
|
||||
return jobmod.JobDump{
|
||||
JobID: j.JobID,
|
||||
JobSetID: j.JobSetID,
|
||||
Info: j.GetInfo(),
|
||||
|
@ -84,5 +84,5 @@ func (j *Job) Dump(ctx JobStateRunContext, job *Job, curState JobState) jobmod.J
|
|||
|
||||
type JobBody interface {
|
||||
GetInfo() schsdk.JobInfo
|
||||
Dump() jobmod.JobBodyStatus
|
||||
Dump() jobmod.JobBodyDump
|
||||
}
|
||||
|
|
|
@ -23,8 +23,8 @@ func (j *DataReturnJob) GetInfo() schsdk.JobInfo {
|
|||
return &j.Info
|
||||
}
|
||||
|
||||
func (j *DataReturnJob) Dump() jobmod.JobBodyStatus {
|
||||
return jobmod.DataReturnJobStatus{
|
||||
func (j *DataReturnJob) Dump() jobmod.JobBodyDump {
|
||||
return &jobmod.DataReturnJobDump{
|
||||
DataReturnPackageID: j.DataReturnPackageID,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,8 +22,8 @@ func (j *NormalJob) GetInfo() schsdk.JobInfo {
|
|||
return &j.Info
|
||||
}
|
||||
|
||||
func (j *NormalJob) Dump() jobmod.JobBodyStatus {
|
||||
return &jobmod.NormalJobStatus{
|
||||
func (j *NormalJob) Dump() jobmod.JobBodyDump {
|
||||
return &jobmod.NormalJobDump{
|
||||
Files: j.Files,
|
||||
TargetCCID: j.TargetCCID,
|
||||
}
|
||||
|
|
|
@ -39,9 +39,10 @@ func (s *Adjusting) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Adjusting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateStatus {
|
||||
// TODO
|
||||
return nil
|
||||
func (s *Adjusting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateDump {
|
||||
return &jobmod.AdjustingDump{
|
||||
Scheme: s.scheme,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
|
||||
|
|
|
@ -29,9 +29,14 @@ func (c *Completed) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Completed) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateStatus {
|
||||
// TODO
|
||||
return nil
|
||||
func (s *Completed) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateDump {
|
||||
err := ""
|
||||
if s.err != nil {
|
||||
err = s.err.Error()
|
||||
}
|
||||
return &jobmod.CompletedDump{
|
||||
Error: err,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Completed) handleSuccess(rtx jobmgr.JobStateRunContext, job *jobmgr.Job) {
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
exetsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
|
||||
"gitlink.org.cn/cloudream/scheduler/common/utils"
|
||||
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr"
|
||||
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event"
|
||||
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job"
|
||||
)
|
||||
|
||||
|
@ -32,8 +33,8 @@ func (s *NormalJobExecuting) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job)
|
|||
}
|
||||
}
|
||||
|
||||
func (s *NormalJobExecuting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateStatus {
|
||||
return &jobmod.NormalJobExecutingStatus{
|
||||
func (s *NormalJobExecuting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateDump {
|
||||
return &jobmod.NormalJobExecutingDump{
|
||||
TaskStatus: s.lastStatus,
|
||||
}
|
||||
}
|
||||
|
@ -116,9 +117,8 @@ func (s *DataReturnJobExecuting) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.J
|
|||
}
|
||||
}
|
||||
|
||||
func (s *DataReturnJobExecuting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateStatus {
|
||||
// TODO
|
||||
return nil
|
||||
func (s *DataReturnJobExecuting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateDump {
|
||||
return &jobmod.DataReturnExecutingDump{}
|
||||
}
|
||||
|
||||
func (s *DataReturnJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
|
||||
|
@ -127,6 +127,12 @@ func (s *DataReturnJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Jo
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// 监听取消事件
|
||||
go func() {
|
||||
event.WaitType[event.Cancel](ctx, rtx.EventSet)
|
||||
cancel()
|
||||
}()
|
||||
|
||||
ccInfo, err := rtx.Mgr.DB.ComputingCenter().GetByID(rtx.Mgr.DB.SQLCtx(), reJob.TargetJobCCID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting computing center info: %w", err)
|
||||
|
|
|
@ -30,6 +30,7 @@ func (s *MakingAdjustScheme) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) (
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// 监听取消事件
|
||||
go func() {
|
||||
event.WaitType[event.Cancel](ctx, rtx.EventSet)
|
||||
cancel()
|
||||
|
@ -51,7 +52,6 @@ func (s *MakingAdjustScheme) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) (
|
|||
return &mkStatus.Scheme, nil
|
||||
}
|
||||
|
||||
func (s *MakingAdjustScheme) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateStatus {
|
||||
// TODO
|
||||
return nil
|
||||
func (s *MakingAdjustScheme) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateDump {
|
||||
return &jobmod.MakeingAdjustSchemeDump{}
|
||||
}
|
||||
|
|
|
@ -85,9 +85,10 @@ func (s *PreScheduling) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *PreScheduling) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateStatus {
|
||||
// TODO
|
||||
return nil
|
||||
func (s *PreScheduling) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateDump {
|
||||
return &jobmod.PreSchedulingDump{
|
||||
Scheme: s.scheme,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *PreScheduling) doPackageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, norJob *job.NormalJob, fileInfo schsdk.JobFileInfo, file *jobmod.PackageJobFile, scheme *jobmod.FileScheduleScheme) error {
|
||||
|
|
|
@ -33,6 +33,7 @@ func (s *ReadyToAdjust) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// 监听取消事件
|
||||
go func() {
|
||||
event.WaitType[event.Cancel](ctx, rtx.EventSet)
|
||||
cancel()
|
||||
|
@ -59,7 +60,6 @@ func (s *ReadyToAdjust) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *ReadyToAdjust) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateStatus {
|
||||
// TODO
|
||||
return nil
|
||||
func (s *ReadyToAdjust) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateDump {
|
||||
return &jobmod.ReadyToAdjustDump{}
|
||||
}
|
||||
|
|
|
@ -17,9 +17,8 @@ func (s *NormalJobReadyToExecute) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.
|
|||
rtx.Mgr.ChangeState(jo, NewNormalJobExecuting())
|
||||
}
|
||||
|
||||
func (s *NormalJobReadyToExecute) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateStatus {
|
||||
// TODO
|
||||
return nil
|
||||
func (s *NormalJobReadyToExecute) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateDump {
|
||||
return &jobmod.NormalJobReadyToExecuteDump{}
|
||||
}
|
||||
|
||||
type DataReturnJobReadyToExecute struct {
|
||||
|
@ -34,7 +33,6 @@ func (s *DataReturnJobReadyToExecute) Run(rtx jobmgr.JobStateRunContext, jo *job
|
|||
rtx.Mgr.ChangeState(jo, NewDataReturnJobExecuting())
|
||||
}
|
||||
|
||||
func (s *DataReturnJobReadyToExecute) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateStatus {
|
||||
// TODO
|
||||
return nil
|
||||
func (s *DataReturnJobReadyToExecute) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateDump {
|
||||
return &jobmod.DataReturnReadyToExecuteDump{}
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ func (s *WaitTargetComplete) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// 监听取消事件
|
||||
go func() {
|
||||
event.WaitType[event.Cancel](ctx, rtx.EventSet)
|
||||
cancel()
|
||||
|
@ -56,7 +57,6 @@ func (s *WaitTargetComplete) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *WaitTargetComplete) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateStatus {
|
||||
// TODO
|
||||
return nil
|
||||
func (s *WaitTargetComplete) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateDump {
|
||||
return &jobmod.WaitTargetCompleteDump{}
|
||||
}
|
||||
|
|
|
@ -10,5 +10,5 @@ type JobStateRunContext struct {
|
|||
|
||||
type JobState interface {
|
||||
Run(ctx JobStateRunContext, job *Job)
|
||||
Dump(ctx JobStateRunContext, job *Job) jobmod.JobStateStatus
|
||||
Dump(ctx JobStateRunContext, job *Job) jobmod.JobStateDump
|
||||
}
|
||||
|
|
|
@ -147,7 +147,7 @@ func (m *Manager) SubmitJobSet(jobs []SubmittingJob) schsdk.JobSetID {
|
|||
return jobSetID
|
||||
}
|
||||
|
||||
func (m *Manager) DumpJobSet(jobSetID schsdk.JobSetID) []jobmod.JobStatus {
|
||||
func (m *Manager) DumpJobSet(jobSetID schsdk.JobSetID) []jobmod.JobDump {
|
||||
m.pubLock.Lock()
|
||||
defer m.pubLock.Unlock()
|
||||
|
||||
|
@ -156,14 +156,14 @@ func (m *Manager) DumpJobSet(jobSetID schsdk.JobSetID) []jobmod.JobStatus {
|
|||
return nil
|
||||
}
|
||||
|
||||
var jobStatuses []jobmod.JobStatus
|
||||
var jobDumps []jobmod.JobDump
|
||||
for _, mgrJob := range jobSet.jobs {
|
||||
jobStatuses = append(jobStatuses, mgrJob.job.Dump(JobStateRunContext{
|
||||
jobDumps = append(jobDumps, mgrJob.job.Dump(JobStateRunContext{
|
||||
Mgr: m,
|
||||
EventSet: &mgrJob.eventSet,
|
||||
LastState: mgrJob.state,
|
||||
}, &mgrJob.job, mgrJob.state))
|
||||
}
|
||||
|
||||
return jobStatuses
|
||||
return jobDumps
|
||||
}
|
||||
|
|
|
@ -59,13 +59,13 @@ func (svc *Service) JobSetLocalFileUploaded(msg *mgrmq.JobSetLocalFileUploaded)
|
|||
return mq.ReplyOK(mgrmq.NewJobSetLocalFileUploadedResp())
|
||||
}
|
||||
|
||||
func (svc *Service) GetJobSetStatus(msg *mgrmq.GetJobSetStatus) (*mgrmq.GetJobSetStatusResp, *mq.CodeMessage) {
|
||||
func (svc *Service) GetJobSetDump(msg *mgrmq.GetJobSetDump) (*mgrmq.GetJobSetDumpResp, *mq.CodeMessage) {
|
||||
jobs := svc.jobMgr.DumpJobSet(msg.JobSetID)
|
||||
if len(jobs) == 0 {
|
||||
return nil, mq.Failed(errorcode.OperationFailed, "job set not found")
|
||||
}
|
||||
|
||||
return mq.ReplyOK(mgrmq.RespGetJobSetStatus(jobs))
|
||||
return mq.ReplyOK(mgrmq.RespGetJobSetDump(jobs))
|
||||
}
|
||||
|
||||
func (svc *Service) GetServiceList(msg *mgrmq.GetServiceList) (*mgrmq.GetServiceListResp, *mq.CodeMessage) {
|
||||
|
@ -76,12 +76,12 @@ func (svc *Service) GetServiceList(msg *mgrmq.GetServiceList) (*mgrmq.GetService
|
|||
for _, jo := range jobs {
|
||||
var cdsNodeID *cdssdk.NodeID
|
||||
|
||||
norJob, ok := jo.Body.(*jobmod.NormalJobStatus)
|
||||
norJob, ok := jo.Body.(*jobmod.NormalJobDump)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
_, ok = jo.State.(*jobmod.NormalJobExecutingStatus)
|
||||
_, ok = jo.State.(*jobmod.NormalJobExecutingDump)
|
||||
if ok {
|
||||
computingCenter, err := svc.db.ComputingCenter().GetByID(svc.db.SQLCtx(), norJob.TargetCCID)
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue