解决错误
This commit is contained in:
parent
3169870530
commit
02a03ebb73
|
@ -38,7 +38,7 @@ const (
|
|||
var ErrNoAvailableScheme = fmt.Errorf("no appropriate scheduling node found, please wait")
|
||||
|
||||
type Scheduler interface {
|
||||
Schedule(info *schsdk.NormalJobInfo, status jobmod.NormalJobStatus) (*jobmod.JobScheduleScheme, error)
|
||||
Schedule(status *jobmod.JobStatus) (*jobmod.JobScheduleScheme, error)
|
||||
}
|
||||
|
||||
type candidate struct {
|
||||
|
@ -129,7 +129,12 @@ func NewDefaultSchedule() *DefaultScheduler {
|
|||
return &DefaultScheduler{}
|
||||
}
|
||||
|
||||
func (s *DefaultScheduler) Schedule(info *schsdk.NormalJobInfo, status jobmod.NormalJobStatus) (*jobmod.JobScheduleScheme, error) {
|
||||
func (s *DefaultScheduler) Schedule(status *jobmod.JobStatus) (*jobmod.JobScheduleScheme, error) {
|
||||
norJob, ok := status.Body.(*jobmod.NormalJobStatus)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("only normal job can be scheduled, but got %T", status.Body)
|
||||
}
|
||||
|
||||
mgrCli, err := schglb.ManagerMQPool.Acquire()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("new collector client: %w", err)
|
||||
|
@ -151,17 +156,17 @@ func (s *DefaultScheduler) Schedule(info *schsdk.NormalJobInfo, status jobmod.No
|
|||
for _, cc := range allCC.ComputingCenters {
|
||||
allCCs[cc.CCID] = &candidate{
|
||||
CC: cc,
|
||||
IsPreScheduled: cc.CCID == status.TargetCCID,
|
||||
IsPreScheduled: cc.CCID == norJob.TargetCCID,
|
||||
}
|
||||
}
|
||||
|
||||
// 计算
|
||||
err = s.calcFileScore(status.Files, allCCs)
|
||||
err = s.calcFileScore(norJob.Files, allCCs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = s.calcResourceScore(info, allCCs)
|
||||
err = s.calcResourceScore(status.Info.(*schsdk.NormalJobInfo), allCCs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -8,12 +8,11 @@ import (
|
|||
"github.com/samber/lo"
|
||||
|
||||
"gitlink.org.cn/cloudream/common/pkgs/future"
|
||||
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
|
||||
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
|
||||
)
|
||||
|
||||
type schedulingJob struct {
|
||||
Job jobmod.NormalJob
|
||||
Status jobmod.JobStatus
|
||||
Callback *future.SetValueFuture[*jobmod.JobScheduleScheme]
|
||||
}
|
||||
|
||||
|
@ -31,11 +30,11 @@ func NewService(scheduler Scheduler) *Service {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Service) MakeScheme(job schsdk.NormalJobInfo) (*jobmod.JobScheduleScheme, error) {
|
||||
func (s *Service) MakeScheme(status jobmod.JobStatus) (*jobmod.JobScheduleScheme, error) {
|
||||
s.lock.Lock()
|
||||
callback := future.NewSetValue[*jobmod.JobScheduleScheme]()
|
||||
s.jobs = append(s.jobs, &schedulingJob{
|
||||
Job: job,
|
||||
Status: status,
|
||||
Callback: callback,
|
||||
})
|
||||
s.lock.Unlock()
|
||||
|
@ -67,7 +66,7 @@ func (s *Service) tryMakeScheme() {
|
|||
defer s.lock.Unlock()
|
||||
|
||||
for i, job := range s.jobs {
|
||||
scheme, err := s.scheduler.Schedule(&job.Job)
|
||||
scheme, err := s.scheduler.Schedule(&job.Status)
|
||||
if err == nil {
|
||||
job.Callback.SetValue(scheme)
|
||||
s.jobs[i] = nil
|
||||
|
|
|
@ -39,7 +39,7 @@ func (t *MakeScheduleScheme) Execute(task *task.Task[TaskContext], ctx TaskConte
|
|||
}
|
||||
|
||||
func (t *MakeScheduleScheme) do(taskID string, ctx TaskContext) (*jobmod.JobScheduleScheme, error) {
|
||||
scheme, err := ctx.scheduleSvc.MakeScheme(t.JobInfo)
|
||||
scheme, err := ctx.scheduleSvc.MakeScheme(t.JobStatus)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
package jobmod
|
||||
|
||||
import pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
|
||||
|
||||
type NormalJobExecutingStatus struct {
|
||||
TaskStatus pcmsdk.TaskStatus
|
||||
}
|
|
@ -1,19 +1,16 @@
|
|||
package task
|
||||
|
||||
import (
|
||||
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
|
||||
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
|
||||
)
|
||||
|
||||
type MakeAdjustScheme struct {
|
||||
TaskInfoBase
|
||||
JobInfo schsdk.NormalJobInfo `json:"jobInfo"`
|
||||
JobStatus jobmod.NormalJobStatus `json:"jobStatus"`
|
||||
JobStatus jobmod.JobStatus `json:"jobStatus"`
|
||||
}
|
||||
|
||||
func NewMakeAdjustScheme(jobInfo schsdk.NormalJobInfo, jobStatus jobmod.NormalJobStatus) *MakeAdjustScheme {
|
||||
func NewMakeAdjustScheme(jobStatus jobmod.JobStatus) *MakeAdjustScheme {
|
||||
return &MakeAdjustScheme{
|
||||
JobInfo: jobInfo,
|
||||
JobStatus: jobStatus,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,8 +33,9 @@ func (s *NormalJobExecuting) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job)
|
|||
}
|
||||
|
||||
func (s *NormalJobExecuting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateStatus {
|
||||
// TODO
|
||||
return nil
|
||||
return &jobmod.NormalJobExecutingStatus{
|
||||
TaskStatus: s.lastStatus,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
|
||||
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr"
|
||||
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event"
|
||||
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job"
|
||||
)
|
||||
|
||||
type MakingAdjustScheme struct {
|
||||
|
@ -19,7 +18,7 @@ func NewMakeingAdjustScheme() *MakingAdjustScheme {
|
|||
}
|
||||
|
||||
func (s *MakingAdjustScheme) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
|
||||
scheme, err := s.do(rtx, jo.Body.(*job.NormalJob))
|
||||
scheme, err := s.do(rtx, jo)
|
||||
if err != nil {
|
||||
rtx.Mgr.ChangeState(jo, FailureComplete(err))
|
||||
} else {
|
||||
|
@ -27,7 +26,7 @@ func (s *MakingAdjustScheme) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job)
|
|||
}
|
||||
}
|
||||
|
||||
func (s *MakingAdjustScheme) do(rtx jobmgr.JobStateRunContext, norJob *job.NormalJob) (*jobmod.JobScheduleScheme, error) {
|
||||
func (s *MakingAdjustScheme) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) (*jobmod.JobScheduleScheme, error) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
|
@ -36,10 +35,7 @@ func (s *MakingAdjustScheme) do(rtx jobmgr.JobStateRunContext, norJob *job.Norma
|
|||
cancel()
|
||||
}()
|
||||
|
||||
wt := rtx.Mgr.AdvMgr.StartTask(advtsk.NewMakeAdjustScheme(norJob.Info, jobmod.NormalJobStatus{
|
||||
TargetCCID: norJob.TargetCCID,
|
||||
Files: norJob.Files,
|
||||
}))
|
||||
wt := rtx.Mgr.AdvMgr.StartTask(advtsk.NewMakeAdjustScheme(jo.Dump(rtx, jo, s)))
|
||||
defer wt.Close()
|
||||
|
||||
status, err := wt.Receive(ctx)
|
||||
|
|
|
@ -104,10 +104,10 @@ func (m *Manager) BroadcastEvent(jobSetID schsdk.JobSetID, evt Event) {
|
|||
return
|
||||
}
|
||||
|
||||
for _, mgrJob := range jobSet.jobs {
|
||||
go func() {
|
||||
mgrJob.eventSet.Post(evt)
|
||||
}()
|
||||
for _, mjob := range jobSet.jobs {
|
||||
go func(j *mgrJob) {
|
||||
j.eventSet.Post(evt)
|
||||
}(mjob)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -167,42 +167,3 @@ func (m *Manager) DumpJobSet(jobSetID schsdk.JobSetID) []jobmod.JobStatus {
|
|||
|
||||
return jobStatuses
|
||||
}
|
||||
|
||||
// func (m *Manager) GetServiceList(jobSetID schsdk.JobSetID) ([]schsdk.JobSetServiceInfo, error) {
|
||||
// m.pubLock.Lock()
|
||||
// defer m.pubLock.Unlock()
|
||||
|
||||
// var jobSetServiceInfos []schsdk.JobSetServiceInfo
|
||||
|
||||
// jobSet, ok := m.jobSets[jobSetID]
|
||||
// if !ok {
|
||||
// return nil, fmt.Errorf("jobSet not found")
|
||||
// }
|
||||
|
||||
// for _, job := range jobSet.jobs {
|
||||
// var cdsNodeID *cdssdk.NodeID
|
||||
// _, ok = job.state.(*state.NormalJobExecuting)
|
||||
// if ok {
|
||||
// computingCenter, err := m.DB.ComputingCenter().GetByID(m.DB.SQLCtx(), norJob.TargetCCID)
|
||||
// if err != nil {
|
||||
// return nil, fmt.Errorf("get cdsNodeID failed by CCID: %s", err.Error())
|
||||
// }
|
||||
// cdsNodeID = &computingCenter.CDSNodeID
|
||||
// } else {
|
||||
// //返回空指针,表明查询任务不在执行状态,没有id
|
||||
// cdsNodeID = nil
|
||||
// }
|
||||
|
||||
// for _, servicePortInfo := range norJob.Info.Services.ServicePortInfos {
|
||||
// jobSetServiceInfo := schsdk.JobSetServiceInfo{
|
||||
// Name: servicePortInfo.Name,
|
||||
// Port: servicePortInfo.Port,
|
||||
// CDSNodeID: cdsNodeID,
|
||||
// LocalJobID: job.LocalJobID,
|
||||
// }
|
||||
// jobSetServiceInfos = append(jobSetServiceInfos, jobSetServiceInfo)
|
||||
// }
|
||||
// }
|
||||
|
||||
// return jobSetServiceInfos, nil
|
||||
// }
|
||||
|
|
|
@ -8,6 +8,8 @@ import (
|
|||
"gitlink.org.cn/cloudream/common/pkgs/logger"
|
||||
"gitlink.org.cn/cloudream/common/pkgs/mq"
|
||||
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
|
||||
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
|
||||
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
|
||||
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
|
||||
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr"
|
||||
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event"
|
||||
|
@ -67,11 +69,43 @@ func (svc *Service) GetJobSetStatus(msg *mgrmq.GetJobSetStatus) (*mgrmq.GetJobSe
|
|||
}
|
||||
|
||||
func (svc *Service) GetServiceList(msg *mgrmq.GetServiceList) (*mgrmq.GetServiceListResp, *mq.CodeMessage) {
|
||||
// serviceList, err := svc.jobMgr.GetServiceList(msg.JobSetID)
|
||||
// if err != nil {
|
||||
// logger.WithField("JobSetID", msg.JobSetID).Warnf("get service list: %s", err.Error())
|
||||
// return nil, mq.Failed(errorcode.OperationFailed, "get service list failed")
|
||||
// }
|
||||
jobs := svc.jobMgr.DumpJobSet(msg.JobSetID)
|
||||
|
||||
// return mq.ReplyOK(mgrmq.NewGetServiceListResp(serviceList))
|
||||
var jobSetServiceInfos []schsdk.JobSetServiceInfo
|
||||
|
||||
for _, jo := range jobs {
|
||||
var cdsNodeID *cdssdk.NodeID
|
||||
|
||||
norJob, ok := jo.Body.(*jobmod.NormalJobStatus)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
_, ok = jo.State.(*jobmod.NormalJobExecutingStatus)
|
||||
if ok {
|
||||
computingCenter, err := svc.db.ComputingCenter().GetByID(svc.db.SQLCtx(), norJob.TargetCCID)
|
||||
if err != nil {
|
||||
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("get cdsNodeID failed by CCID: %s", err.Error()))
|
||||
}
|
||||
|
||||
cdsNodeID = &computingCenter.CDSNodeID
|
||||
|
||||
} else {
|
||||
//返回空指针,表明查询任务不在执行状态,没有id
|
||||
cdsNodeID = nil
|
||||
}
|
||||
|
||||
norJobInfo := jo.Info.(*schsdk.NormalJobInfo)
|
||||
for _, servicePortInfo := range norJobInfo.Services.ServicePortInfos {
|
||||
jobSetServiceInfo := schsdk.JobSetServiceInfo{
|
||||
Name: servicePortInfo.Name,
|
||||
Port: servicePortInfo.Port,
|
||||
CDSNodeID: cdsNodeID,
|
||||
LocalJobID: norJobInfo.LocalJobID,
|
||||
}
|
||||
jobSetServiceInfos = append(jobSetServiceInfos, jobSetServiceInfo)
|
||||
}
|
||||
}
|
||||
|
||||
return mq.ReplyOK(mgrmq.NewGetServiceListResp(jobSetServiceInfos))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue