From cce21aedea4b1b3c6f51d1c742f4b5837e74d029 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Sat, 7 Oct 2023 11:13:48 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E8=B0=83=E8=AF=95=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- advisor/internal/reporter/reporter.go | 8 +- advisor/internal/scheduler/scheduler.go | 10 +- advisor/internal/task/task.go | 8 +- advisor/main.go | 6 +- .../prescheduler/default_prescheduler.go | 48 ++++--- .../prescheduler/default_prescheduler_test.go | 117 ++++++++++++++++++ client/internal/services/jobset.go | 2 +- collector/main.go | 4 +- common/assets/confs/advisor.config.json | 21 ++++ common/assets/confs/collector.config.json | 11 +- common/assets/confs/executor.config.json | 3 + common/assets/confs/manager.config.json | 4 +- common/models/job/job.go | 9 +- common/models/job/state.go | 21 ++-- common/pkgs/mq/advisor/server.go | 6 +- common/pkgs/mq/advisor/task.go | 8 +- .../mq/advisor/task/make_adjust_scheme.go | 6 +- common/pkgs/mq/advisor/task/task.go | 24 +++- common/pkgs/mq/collector/resource.go | 4 - common/pkgs/mq/collector/server.go | 6 +- common/pkgs/mq/executor/server.go | 6 +- common/pkgs/mq/executor/task.go | 8 +- .../mq/executor/task/cache_move_package.go | 6 +- common/pkgs/mq/executor/task/schedule_task.go | 6 +- .../executor/task/storage_create_package.go | 6 +- .../mq/executor/task/storage_load_package.go | 6 +- common/pkgs/mq/executor/task/task.go | 22 +++- common/pkgs/mq/executor/task/upload_image.go | 6 +- common/pkgs/mq/manager/advisor.go | 4 +- common/pkgs/mq/manager/executor.go | 8 +- common/pkgs/mq/manager/job.go | 4 - common/pkgs/mq/manager/server.go | 6 +- executor/internal/reporter/reporter.go | 8 +- executor/internal/task/task.go | 8 +- executor/main.go | 5 +- go.mod | 7 ++ go.sum | 4 + magefiles/main.go | 42 ++++++- manager/internal/advisormgr/advisormgr.go | 6 +- manager/internal/executormgr/executormgr.go | 6 +- manager/internal/jobmgr/complete_handler.go | 8 ++ manager/internal/jobmgr/default_handler.go | 50 ++++++++ .../jobmgr/event/advisor_task_updated.go | 6 +- .../jobmgr/event/executor_task_updated.go | 6 +- manager/internal/jobmgr/executing_handler.go | 2 +- manager/internal/jobmgr/jobmgr.go | 47 +++++-- .../internal/jobmgr/prescheduling_handler.go | 18 ++- .../jobmgr/ready_to_adjust_handler.go | 5 +- manager/internal/mq/job.go | 6 + manager/main.go | 17 ++- 50 files changed, 503 insertions(+), 162 deletions(-) create mode 100644 client/internal/prescheduler/default_prescheduler_test.go create mode 100644 common/assets/confs/advisor.config.json create mode 100644 manager/internal/jobmgr/default_handler.go diff --git a/advisor/internal/reporter/reporter.go b/advisor/internal/reporter/reporter.go index 1d71fe7..bcd6ca0 100644 --- a/advisor/internal/reporter/reporter.go +++ b/advisor/internal/reporter/reporter.go @@ -15,7 +15,7 @@ import ( type Reporter struct { advisorID schmod.AdvisorID reportInterval time.Duration - taskStatus map[string]advtsk.TaskStatus + taskStatus map[string]advtsk.AdvTaskStatus taskStatusLock sync.Mutex reportNow chan bool } @@ -24,12 +24,12 @@ func NewReporter(advisorID schmod.AdvisorID, reportInterval time.Duration) *Repo return &Reporter{ advisorID: advisorID, reportInterval: reportInterval, - taskStatus: make(map[string]advtsk.TaskStatus), + taskStatus: make(map[string]advtsk.AdvTaskStatus), reportNow: make(chan bool), } } -func (r *Reporter) Report(taskID string, taskStatus advtsk.TaskStatus) { +func (r *Reporter) Report(taskID string, taskStatus advtsk.AdvTaskStatus) { r.taskStatusLock.Lock() defer r.taskStatusLock.Unlock() @@ -65,7 +65,7 @@ func (r *Reporter) Serve() error { for taskID, status := range r.taskStatus { taskStatus = append(taskStatus, mgrmq.NewAdvisorTaskStatus(taskID, status)) } - r.taskStatus = make(map[string]advtsk.TaskStatus) + r.taskStatus = make(map[string]advtsk.AdvTaskStatus) r.taskStatusLock.Unlock() _, err := magCli.ReportAdvisorTaskStatus(mgrmq.NewReportAdvisorTaskStatus(r.advisorID, taskStatus)) diff --git a/advisor/internal/scheduler/scheduler.go b/advisor/internal/scheduler/scheduler.go index 656728b..0b0893a 100644 --- a/advisor/internal/scheduler/scheduler.go +++ b/advisor/internal/scheduler/scheduler.go @@ -183,14 +183,20 @@ func (s *DefaultScheduler) makeSchemeForNode(targetSlwNode *candidateSlwNode) jo if !targetSlwNode.Files.Dataset.IsLoaded { scheme.Dataset.Action = jobmod.ActionLoad + } else { + scheme.Dataset.Action = jobmod.ActionNo } if !targetSlwNode.Files.Code.IsLoaded { - scheme.Dataset.Action = jobmod.ActionLoad + scheme.Code.Action = jobmod.ActionLoad + } else { + scheme.Code.Action = jobmod.ActionNo } if !targetSlwNode.Files.Image.IsLoaded { - scheme.Dataset.Action = jobmod.ActionImportImage + scheme.Image.Action = jobmod.ActionImportImage + } else { + scheme.Image.Action = jobmod.ActionNo } return scheme diff --git a/advisor/internal/task/task.go b/advisor/internal/task/task.go index 27895fc..00e99df 100644 --- a/advisor/internal/task/task.go +++ b/advisor/internal/task/task.go @@ -39,7 +39,7 @@ func NewManager(reporter *reporter.Reporter, scheduleSvc *scheduler.Service) Man } } -func (m *Manager) StartByInfo(info advtsk.TaskInfo) (*Task, error) { +func (m *Manager) StartByInfo(info advtsk.AdvTaskInfo) (*Task, error) { infoType := myreflect.TypeOfValue(info) ctor, ok := taskFromInfoCtors[infoType] @@ -50,10 +50,10 @@ func (m *Manager) StartByInfo(info advtsk.TaskInfo) (*Task, error) { return m.StartNew(ctor(info)), nil } -var taskFromInfoCtors map[reflect.Type]func(advtsk.TaskInfo) TaskBody +var taskFromInfoCtors map[reflect.Type]func(advtsk.AdvTaskInfo) TaskBody = make(map[reflect.Type]func(advtsk.AdvTaskInfo) task.TaskBody[TaskContext]) -func Register[TInfo advtsk.TaskInfo, TTaskBody TaskBody](ctor func(info TInfo) TTaskBody) { - taskFromInfoCtors[myreflect.TypeOf[TInfo]()] = func(info advtsk.TaskInfo) TaskBody { +func Register[TInfo advtsk.AdvTaskInfo, TTaskBody TaskBody](ctor func(info TInfo) TTaskBody) { + taskFromInfoCtors[myreflect.TypeOf[TInfo]()] = func(info advtsk.AdvTaskInfo) TaskBody { return ctor(info.(TInfo)) } } diff --git a/advisor/main.go b/advisor/main.go index 915ddb1..cb62f17 100644 --- a/advisor/main.go +++ b/advisor/main.go @@ -12,6 +12,7 @@ import ( "gitlink.org.cn/cloudream/scheduler/advisor/internal/scheduler" "gitlink.org.cn/cloudream/scheduler/advisor/internal/services" "gitlink.org.cn/cloudream/scheduler/advisor/internal/task" + schglb "gitlink.org.cn/cloudream/scheduler/common/globals" advmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor" ) @@ -28,6 +29,7 @@ func main() { os.Exit(1) } + schglb.InitMQPool(&config.Cfg().RabbitMQ) myglbs.Init() rpter := reporter.NewReporter(myglbs.AdvisorID, time.Second*time.Duration(config.Cfg().ReportIntervalSec)) @@ -41,9 +43,9 @@ func main() { logger.Fatalf("new advisor server failed, err: %s", err.Error()) } - mqSvr.OnError = func(err error) { + mqSvr.OnError(func(err error) { logger.Warnf("advisor server err: %s", err.Error()) - } + }) // 启动服务 go serveMQServer(mqSvr) diff --git a/client/internal/prescheduler/default_prescheduler.go b/client/internal/prescheduler/default_prescheduler.go index da0f4b9..b5c7038 100644 --- a/client/internal/prescheduler/default_prescheduler.go +++ b/client/internal/prescheduler/default_prescheduler.go @@ -149,10 +149,14 @@ func (s *DefaultPreScheduler) Schedule(info *schsdk.JobSetInfo) (*jobmod.JobSetP slwNodes[node.ID] = node } + if len(slwNodes) == 0 { + return nil, nil, ErrNoAvailableScheme + } + // 先根据任务配置,收集它们依赖的任务的LocalID - var schJobs []schedulingJob + var schJobs []*schedulingJob for _, job := range info.Jobs { - j := schedulingJob{ + j := &schedulingJob{ Job: job, } @@ -171,8 +175,8 @@ func (s *DefaultPreScheduler) Schedule(info *schsdk.JobSetInfo) (*jobmod.JobSetP schJobs = append(schJobs, j) } - // 然后根据引用进行排序 - schJobs, ok := s.orderByReference(schJobs) + // 然后根据依赖进行排序 + schJobs, ok := s.orderByAfters(schJobs) if !ok { return nil, nil, fmt.Errorf("circular reference detected between jobs in the job set") } @@ -180,7 +184,7 @@ func (s *DefaultPreScheduler) Schedule(info *schsdk.JobSetInfo) (*jobmod.JobSetP // 经过排序后,按顺序生成调度方案 for _, job := range schJobs { if norJob, ok := job.Job.(*schsdk.NormalJobInfo); ok { - scheme, err := s.scheduleForNormalJob(info, &job, slwNodes, jobSetScheme.JobSchemes) + scheme, err := s.scheduleForNormalJob(info, job, slwNodes, jobSetScheme.JobSchemes) if err != nil { return nil, nil, err } @@ -199,16 +203,17 @@ func (s *DefaultPreScheduler) Schedule(info *schsdk.JobSetInfo) (*jobmod.JobSetP }, nil } -func (s *DefaultPreScheduler) orderByReference(jobs []schedulingJob) ([]schedulingJob, bool) { +func (s *DefaultPreScheduler) orderByAfters(jobs []*schedulingJob) ([]*schedulingJob, bool) { type jobOrder struct { - Job schedulingJob + Job *schedulingJob Afters []string } - var jobOrders []jobOrder + var jobOrders []*jobOrder for _, job := range jobs { - od := jobOrder{ - Job: job, + od := &jobOrder{ + Job: job, + Afters: make([]string, len(job.Afters)), } copy(od.Afters, job.Afters) @@ -217,7 +222,7 @@ func (s *DefaultPreScheduler) orderByReference(jobs []schedulingJob) ([]scheduli } // 然后排序 - var orderedJob []schedulingJob + var orderedJob []*schedulingJob for { rm := 0 for i, jo := range jobOrders { @@ -231,6 +236,7 @@ func (s *DefaultPreScheduler) orderByReference(jobs []schedulingJob) ([]scheduli } rm++ + continue } jobOrders[i-rm] = jobOrders[i] @@ -344,16 +350,18 @@ func (s *DefaultPreScheduler) makeSchemeForNode(targetSlwNode *candidateSlwNode) TargetSlwNodeID: targetSlwNode.SlwNode.ID, } + // TODO 根据实际情况选择Move或者Load + if !targetSlwNode.Files.Dataset.IsLoaded { scheme.Dataset.Action = jobmod.ActionLoad } if !targetSlwNode.Files.Code.IsLoaded { - scheme.Dataset.Action = jobmod.ActionLoad + scheme.Code.Action = jobmod.ActionLoad } if !targetSlwNode.Files.Image.IsLoaded { - scheme.Dataset.Action = jobmod.ActionImportImage + scheme.Image.Action = jobmod.ActionImportImage } return scheme @@ -517,7 +525,7 @@ func (s *DefaultPreScheduler) calcResourceLevel(avai float64, need float64) int // 计算节点得分情况 func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allSlwNodes map[uopsdk.SlwNodeID]*candidateSlwNode) error { - // 只计算运控返回的计算中心上的存储服务的数据权重 + // 只计算运控返回的可用计算中心上的存储服务的数据权重 stgNodeToSlwNode := make(map[int64]*candidateSlwNode) for _, slwNode := range allSlwNodes { stgNodeToSlwNode[slwNode.SlwNode.StgNodeID] = slwNode @@ -579,6 +587,7 @@ func (s *DefaultPreScheduler) calcPackageFileScore(packageID int64, stgNodeToSlw slwNodeFileScores := make(map[uopsdk.SlwNodeID]*fileDetail) + // TODO UserID cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(0, packageID)) if err != nil { return nil, err @@ -596,6 +605,7 @@ func (s *DefaultPreScheduler) calcPackageFileScore(packageID int64, stgNodeToSlw } } + // TODO UserID loadedResp, err := colCli.PackageGetLoadedStgNodes(collector.NewPackageGetLoadedStgNodes(0, packageID)) if err != nil { return nil, err @@ -607,14 +617,14 @@ func (s *DefaultPreScheduler) calcPackageFileScore(packageID int64, stgNodeToSlw continue } - fsc, ok := slwNodeFileScores[slwNode.SlwNode.ID] + sfc, ok := slwNodeFileScores[slwNode.SlwNode.ID] if !ok { - fsc = &fileDetail{} - slwNodeFileScores[slwNode.SlwNode.ID] = fsc + sfc = &fileDetail{} + slwNodeFileScores[slwNode.SlwNode.ID] = sfc } - fsc.LoadingScore = 1 * LoadedWeight - fsc.IsLoaded = true + sfc.LoadingScore = 1 * LoadedWeight + sfc.IsLoaded = true } return slwNodeFileScores, nil diff --git a/client/internal/prescheduler/default_prescheduler_test.go b/client/internal/prescheduler/default_prescheduler_test.go new file mode 100644 index 0000000..e10ae82 --- /dev/null +++ b/client/internal/prescheduler/default_prescheduler_test.go @@ -0,0 +1,117 @@ +package prescheduler + +import ( + "testing" + + "github.com/samber/lo" + . "github.com/smartystreets/goconvey/convey" + + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" +) + +func TestOrderByAfters(t *testing.T) { + cases := []struct { + title string + jobs []*schedulingJob + wants []string + }{ + { + title: "所有Job都有直接或间接的依赖关系", + jobs: []*schedulingJob{ + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "1"}}, + Afters: []string{"2"}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "2"}}, + Afters: []string{}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "3"}}, + Afters: []string{"1"}, + }, + }, + wants: []string{"2", "1", "3"}, + }, + + { + title: "部分Job之间无依赖关系", + jobs: []*schedulingJob{ + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "1"}}, + Afters: []string{"2"}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "2"}}, + Afters: []string{}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "3"}}, + Afters: []string{"1"}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "4"}}, + Afters: []string{"5"}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "5"}}, + Afters: []string{}, + }, + }, + wants: []string{"2", "5", "1", "3", "4"}, + }, + + { + title: "存在循环依赖", + jobs: []*schedulingJob{ + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "1"}}, + Afters: []string{"2"}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "2"}}, + Afters: []string{"1"}, + }, + }, + wants: nil, + }, + + { + title: "完全不依赖", + jobs: []*schedulingJob{ + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "1"}}, + Afters: []string{}, + }, + + { + Job: &schsdk.NormalJobInfo{JobInfoBase: schsdk.JobInfoBase{LocalJobID: "2"}}, + Afters: []string{}, + }, + }, + wants: []string{"1", "2"}, + }, + } + + sch := NewDefaultPreScheduler() + for _, c := range cases { + Convey(c.title, t, func() { + ordered, ok := sch.orderByAfters(c.jobs) + if c.wants == nil { + So(ok, ShouldBeFalse) + } else { + So(ok, ShouldBeTrue) + + ids := lo.Map(ordered, func(item *schedulingJob, idx int) string { return item.Job.GetLocalJobID() }) + So(ids, ShouldResemble, c.wants) + } + }) + } +} diff --git a/client/internal/services/jobset.go b/client/internal/services/jobset.go index 3d316eb..dc1a532 100644 --- a/client/internal/services/jobset.go +++ b/client/internal/services/jobset.go @@ -26,7 +26,7 @@ func (svc *JobSetService) Submit(info schsdk.JobSetInfo) (schsdk.JobSetID, *schs schScheme, uploadScheme, err := svc.preScheduler.Schedule(&info) if err != nil { - return "", nil, fmt.Errorf("") + return "", nil, fmt.Errorf("pre scheduling: %w", err) } resp, err := mgrCli.SubmitJobSet(mgrmq.NewSubmitJobSet(info, *schScheme)) diff --git a/collector/main.go b/collector/main.go index e45e12c..445295f 100644 --- a/collector/main.go +++ b/collector/main.go @@ -32,9 +32,9 @@ func main() { logger.Fatalf("new collector server failed, err: %s", err.Error()) } - mqSvr.OnError = func(err error) { + mqSvr.OnError(func(err error) { logger.Warnf("collector server err: %s", err.Error()) - } + }) // 启动服务 go serveColServer(mqSvr) diff --git a/common/assets/confs/advisor.config.json b/common/assets/confs/advisor.config.json new file mode 100644 index 0000000..e699e44 --- /dev/null +++ b/common/assets/confs/advisor.config.json @@ -0,0 +1,21 @@ +{ + "logger": { + "output": "file", + "outputFileName": "advisor", + "outputDirectory": "log", + "level": "debug" + }, + "rabbitMQ": { + "address": "127.0.0.1:5672", + "account": "cloudream", + "password": "123456", + "vhost": "/" + }, + "cloudreamStorage": { + "url": "http://localhost:7890" + }, + "pcm": { + "url": "http://localhost:7892" + }, + "reportIntervalSec": 10 +} \ No newline at end of file diff --git a/common/assets/confs/collector.config.json b/common/assets/confs/collector.config.json index 4afb736..3725c1b 100644 --- a/common/assets/confs/collector.config.json +++ b/common/assets/confs/collector.config.json @@ -15,6 +15,13 @@ "url": "http://localhost:7890" }, "unifyOps": { - "url": "http://localhost:7890" - } + "url": "http://localhost:7892" + }, + "slwNodes": [ + { + "slwNodeID": 1, + "stgNodeID": 1, + "storageID": 1 + } + ] } \ No newline at end of file diff --git a/common/assets/confs/executor.config.json b/common/assets/confs/executor.config.json index de154a6..1439ccc 100644 --- a/common/assets/confs/executor.config.json +++ b/common/assets/confs/executor.config.json @@ -14,5 +14,8 @@ "cloudreamStorage": { "url": "http://localhost:7890" }, + "pcm": { + "url": "http://localhost:7892" + }, "reportIntervalSec": 10 } \ No newline at end of file diff --git a/common/assets/confs/manager.config.json b/common/assets/confs/manager.config.json index d95c4a8..5c64794 100644 --- a/common/assets/confs/manager.config.json +++ b/common/assets/confs/manager.config.json @@ -1,6 +1,8 @@ { "logger": { - "output": "stdout", + "output": "file", + "outputFileName": "manager", + "outputDirectory": "log", "level": "debug" }, "rabbitMQ": { diff --git a/common/models/job/job.go b/common/models/job/job.go index cd9c8fb..a48a4da 100644 --- a/common/models/job/job.go +++ b/common/models/job/job.go @@ -2,10 +2,10 @@ package jobmod import ( "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/types" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" - "gitlink.org.cn/cloudream/common/utils/reflect" ) type FileScheduleAction string @@ -73,9 +73,12 @@ type Job interface { } var JobTypeUnion = types.NewTypeUnion[Job]( - reflect.TypeOf[NormalJob](), - reflect.TypeOf[ResourceJob](), + (*NormalJob)(nil), + (*ResourceJob)(nil), ) +var _ = mq.RegisterUnionType(JobTypeUnion) + +// TODO var _ = serder.RegisterNewTaggedTypeUnion(JobTypeUnion, "Type", "type") type JobBase struct { JobSetID schsdk.JobSetID `json:"jobSetID"` // 任务集ID diff --git a/common/models/job/state.go b/common/models/job/state.go index 15c377c..14d1761 100644 --- a/common/models/job/state.go +++ b/common/models/job/state.go @@ -1,8 +1,8 @@ package jobmod import ( + "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/types" - myreflect "gitlink.org.cn/cloudream/common/utils/reflect" ) type JobState interface { @@ -11,15 +11,18 @@ type JobState interface { type JobStateBase struct{} var JobStateTypeUnion = types.NewTypeUnion[JobState]( - myreflect.TypeOf[StatePreScheduling](), - myreflect.TypeOf[StateReadyToAdjust](), - myreflect.TypeOf[StateMakingAdjustScheme](), - myreflect.TypeOf[StateAdjusting](), - myreflect.TypeOf[StateReadyToExecute](), - myreflect.TypeOf[StateExecuting](), - myreflect.TypeOf[StateFailed](), - myreflect.TypeOf[StateSuccess](), + (*StatePreScheduling)(nil), + (*StateReadyToAdjust)(nil), + (*StateMakingAdjustScheme)(nil), + (*StateAdjusting)(nil), + (*StateReadyToExecute)(nil), + (*StateExecuting)(nil), + (*StateFailed)(nil), + (*StateSuccess)(nil), ) +var _ = mq.RegisterUnionType(JobStateTypeUnion) + +// TODO var _ = serder.RegisterNewTaggedTypeUnion(JobStateTypeUnion, "Type", "type") type FileSchedulingStep string diff --git a/common/pkgs/mq/advisor/server.go b/common/pkgs/mq/advisor/server.go index 16c6966..c898db5 100644 --- a/common/pkgs/mq/advisor/server.go +++ b/common/pkgs/mq/advisor/server.go @@ -16,8 +16,6 @@ type Service interface { type Server struct { service Service rabbitSvr mq.RabbitMQServer - - OnError func(err error) } func NewServer(svc Service, cfg *mymq.Config) (*Server, error) { @@ -48,6 +46,10 @@ func (s *Server) Serve() error { return s.rabbitSvr.Serve() } +func (s *Server) OnError(callback func(error)) { + s.rabbitSvr.OnError = callback +} + var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher() // Register 将Service中的一个接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型 diff --git a/common/pkgs/mq/advisor/task.go b/common/pkgs/mq/advisor/task.go index 5f86288..54bea85 100644 --- a/common/pkgs/mq/advisor/task.go +++ b/common/pkgs/mq/advisor/task.go @@ -15,7 +15,7 @@ var _ = Register(Service.StartTask) type StartTask struct { mq.MessageBodyBase - Info advtsk.TaskInfo `json:"info"` + Info advtsk.AdvTaskInfo `json:"info"` } type StartTaskResp struct { mq.MessageBodyBase @@ -23,7 +23,7 @@ type StartTaskResp struct { TaskID string `json:"taskID"` } -func NewStartTask(info advtsk.TaskInfo) *StartTask { +func NewStartTask(info advtsk.AdvTaskInfo) *StartTask { return &StartTask{ Info: info, } @@ -37,7 +37,3 @@ func NewStartTaskResp(advID schmod.AdvisorID, taskID string) *StartTaskResp { func (c *Client) StartTask(msg *StartTask, opts ...mq.RequestOption) (*StartTaskResp, error) { return mq.Request(Service.StartTask, c.rabbitCli, msg, opts...) } - -func init() { - mq.RegisterUnionType(advtsk.TaskInfoTypeUnion) -} diff --git a/common/pkgs/mq/advisor/task/make_adjust_scheme.go b/common/pkgs/mq/advisor/task/make_adjust_scheme.go index d32599d..aecea9d 100644 --- a/common/pkgs/mq/advisor/task/make_adjust_scheme.go +++ b/common/pkgs/mq/advisor/task/make_adjust_scheme.go @@ -4,6 +4,8 @@ import ( jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" ) +var _ = Register[*MakeAdjustScheme, *MakeAdjustSchemeStatus]() + type MakeAdjustScheme struct { TaskInfoBase Job jobmod.NormalJob `json:"job"` @@ -27,7 +29,3 @@ func NewMakeAdjustSchemeStatus(err string, scheme jobmod.JobScheduleScheme) *Mak Scheme: scheme, } } - -func init() { - Register[MakeAdjustScheme, MakeAdjustSchemeStatus]() -} diff --git a/common/pkgs/mq/advisor/task/task.go b/common/pkgs/mq/advisor/task/task.go index 3b9058e..6e28f92 100644 --- a/common/pkgs/mq/advisor/task/task.go +++ b/common/pkgs/mq/advisor/task/task.go @@ -1,36 +1,48 @@ package task import ( + "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/types" myreflect "gitlink.org.cn/cloudream/common/utils/reflect" ) -// 任务 -type TaskInfo interface { +// 任务。 +// 由于json-iter库的缺陷,这个类型名必须加一点前缀,否则会和executor中的重名导致代码异常 +type AdvTaskInfo interface { Noop() } // 增加了新类型后需要在这里也同步添加 -var TaskInfoTypeUnion = types.NewTypeUnion[TaskInfo]() +var TaskInfoTypeUnion = types.NewTypeUnion[AdvTaskInfo]() type TaskInfoBase struct{} func (s *TaskInfoBase) Noop() {} // 任务上报的状态 -type TaskStatus interface { +// 由于json-iter库的缺陷,这个类型名必须加一点前缀,否则会和executor中的重名导致代码异常 +type AdvTaskStatus interface { Noop() } // 增加了新类型后需要在这里也同步添加 -var TaskStatusTypeUnion = types.NewTypeUnion[TaskStatus]() +var TaskStatusTypeUnion = types.NewTypeUnion[AdvTaskStatus]() type TaskStatusBase struct{} func (s *TaskStatusBase) Noop() {} -func Register[TTaskInfo any, TTaskStatus any]() { +// 注:此函数必须以var _ = Register[xxx, xxx]()的形式被调用,这样才能保证init中RegisterUnionType时 +// TypeUnion不是空的。(因为包级变量初始化比init函数调用先进行) +func Register[TTaskInfo AdvTaskInfo, TTaskStatus AdvTaskStatus]() any { TaskInfoTypeUnion.Add(myreflect.TypeOf[TTaskInfo]()) TaskStatusTypeUnion.Add(myreflect.TypeOf[TTaskStatus]()) + + return nil +} + +func init() { + mq.RegisterUnionType(TaskInfoTypeUnion) + mq.RegisterUnionType(TaskStatusTypeUnion) } diff --git a/common/pkgs/mq/collector/resource.go b/common/pkgs/mq/collector/resource.go index 63277a6..2ac1694 100644 --- a/common/pkgs/mq/collector/resource.go +++ b/common/pkgs/mq/collector/resource.go @@ -64,7 +64,3 @@ func NewGetAllResourceDataResp(datas []uopsdk.ResourceData) *GetAllResourceDataR func (c *Client) GetAllResourceData(msg *GetAllResourceData, opts ...mq.RequestOption) (*GetAllResourceDataResp, error) { return mq.Request(Service.GetAllResourceData, c.rabbitCli, msg, opts...) } - -func init() { - mq.RegisterUnionType(uopsdk.ResourceDataTypeUnion) -} diff --git a/common/pkgs/mq/collector/server.go b/common/pkgs/mq/collector/server.go index b67958f..f3c1c01 100644 --- a/common/pkgs/mq/collector/server.go +++ b/common/pkgs/mq/collector/server.go @@ -22,8 +22,6 @@ type Service interface { type Server struct { service Service rabbitSvr mq.RabbitMQServer - - OnError func(err error) } func NewServer(svc Service, cfg *mymq.Config) (*Server, error) { @@ -54,6 +52,10 @@ func (s *Server) Serve() error { return s.rabbitSvr.Serve() } +func (s *Server) OnError(callback func(error)) { + s.rabbitSvr.OnError = callback +} + var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher() // Register 将Service中的一个接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型 diff --git a/common/pkgs/mq/executor/server.go b/common/pkgs/mq/executor/server.go index 52d1ec2..201e61f 100644 --- a/common/pkgs/mq/executor/server.go +++ b/common/pkgs/mq/executor/server.go @@ -19,8 +19,6 @@ const ( type Server struct { service Service rabbitSvr mq.RabbitMQServer - - OnError func(err error) } func NewServer(svc Service, cfg *mymq.Config) (*Server, error) { @@ -51,6 +49,10 @@ func (s *Server) Serve() error { return s.rabbitSvr.Serve() } +func (s *Server) OnError(callback func(error)) { + s.rabbitSvr.OnError = callback +} + var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher() // Register 将Service中的一个接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型 diff --git a/common/pkgs/mq/executor/task.go b/common/pkgs/mq/executor/task.go index c0d425e..6ad0d72 100644 --- a/common/pkgs/mq/executor/task.go +++ b/common/pkgs/mq/executor/task.go @@ -16,7 +16,7 @@ var _ = Register(Service.StartTask) type StartTask struct { mq.MessageBodyBase - Info exectsk.TaskInfo `json:"info"` + Info exectsk.ExeTaskInfo `json:"info"` } type StartTaskResp struct { mq.MessageBodyBase @@ -24,7 +24,7 @@ type StartTaskResp struct { TaskID string `json:"taskID"` } -func NewStartTask(info exectsk.TaskInfo) *StartTask { +func NewStartTask(info exectsk.ExeTaskInfo) *StartTask { return &StartTask{ Info: info, } @@ -38,7 +38,3 @@ func NewStartTaskResp(execID schmod.ExecutorID, taskID string) *StartTaskResp { func (c *Client) StartTask(msg *StartTask, opts ...mq.RequestOption) (*StartTaskResp, error) { return mq.Request(Service.StartTask, c.rabbitCli, msg, opts...) } - -func init() { - mq.RegisterUnionType(exectsk.TaskInfoTypeUnion) -} diff --git a/common/pkgs/mq/executor/task/cache_move_package.go b/common/pkgs/mq/executor/task/cache_move_package.go index 291f094..2f5c521 100644 --- a/common/pkgs/mq/executor/task/cache_move_package.go +++ b/common/pkgs/mq/executor/task/cache_move_package.go @@ -2,6 +2,8 @@ package task import stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" +var _ = Register[*CacheMovePackage, *CacheMovePackageStatus]() + type CacheMovePackage struct { TaskInfoBase UserID int64 `json:"userID"` @@ -27,7 +29,3 @@ func NewCacheMovePackageStatus(err string, cacheInfos []stgsdk.ObjectCacheInfo) CacheInfos: cacheInfos, } } - -func init() { - Register[CacheMovePackage, CacheMovePackageStatus]() -} diff --git a/common/pkgs/mq/executor/task/schedule_task.go b/common/pkgs/mq/executor/task/schedule_task.go index 9965457..9d10b43 100644 --- a/common/pkgs/mq/executor/task/schedule_task.go +++ b/common/pkgs/mq/executor/task/schedule_task.go @@ -5,6 +5,8 @@ import ( uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" ) +var _ = Register[*ScheduleTask, *ScheduleTaskStatus]() + type ScheduleTask struct { TaskInfoBase SlwNodeID uopsdk.SlwNodeID `json:"slwNodeID"` @@ -35,7 +37,3 @@ func NewScheduleTaskStatus(status string, err string, pcmJobID int64) *ScheduleT PCMJobID: pcmJobID, } } - -func init() { - Register[ScheduleTask, ScheduleTaskStatus]() -} diff --git a/common/pkgs/mq/executor/task/storage_create_package.go b/common/pkgs/mq/executor/task/storage_create_package.go index 3462ac9..1f07959 100644 --- a/common/pkgs/mq/executor/task/storage_create_package.go +++ b/common/pkgs/mq/executor/task/storage_create_package.go @@ -2,6 +2,8 @@ package task import stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" +var _ = Register[*StorageCreatePackage, *StorageCreatePackageStatus]() + type StorageCreatePackage struct { TaskInfoBase UserID int64 `json:"userID"` @@ -35,7 +37,3 @@ func NewStorageCreatePackageStatus(status string, err string, packageID int64) * PackageID: packageID, } } - -func init() { - Register[StorageCreatePackage, StorageCreatePackageStatus]() -} diff --git a/common/pkgs/mq/executor/task/storage_load_package.go b/common/pkgs/mq/executor/task/storage_load_package.go index 24203d2..c753555 100644 --- a/common/pkgs/mq/executor/task/storage_load_package.go +++ b/common/pkgs/mq/executor/task/storage_load_package.go @@ -1,5 +1,7 @@ package task +var _ = Register[*StorageLoadPackage, *StorageLoadPackageStatus]() + type StorageLoadPackage struct { TaskInfoBase UserID int64 `json:"userID"` @@ -25,7 +27,3 @@ func NewStorageLoadPackageStatus(status string, err string) *StorageLoadPackageS Error: err, } } - -func init() { - Register[StorageLoadPackage, StorageCreatePackageStatus]() -} diff --git a/common/pkgs/mq/executor/task/task.go b/common/pkgs/mq/executor/task/task.go index 3b9058e..00e6f35 100644 --- a/common/pkgs/mq/executor/task/task.go +++ b/common/pkgs/mq/executor/task/task.go @@ -1,36 +1,48 @@ package task import ( + "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/types" myreflect "gitlink.org.cn/cloudream/common/utils/reflect" ) // 任务 -type TaskInfo interface { +// 由于json-iter库的缺陷,这个类型名必须加一点前缀,否则会和advisor中的重名导致代码异常 +type ExeTaskInfo interface { Noop() } // 增加了新类型后需要在这里也同步添加 -var TaskInfoTypeUnion = types.NewTypeUnion[TaskInfo]() +var TaskInfoTypeUnion = types.NewTypeUnion[ExeTaskInfo]() type TaskInfoBase struct{} func (s *TaskInfoBase) Noop() {} // 任务上报的状态 -type TaskStatus interface { +// 由于json-iter库的缺陷,这个类型名必须加一点前缀,否则会和advisor中的重名导致代码异常 +type ExeTaskStatus interface { Noop() } // 增加了新类型后需要在这里也同步添加 -var TaskStatusTypeUnion = types.NewTypeUnion[TaskStatus]() +var TaskStatusTypeUnion = types.NewTypeUnion[ExeTaskStatus]() type TaskStatusBase struct{} func (s *TaskStatusBase) Noop() {} -func Register[TTaskInfo any, TTaskStatus any]() { +// 注:此函数必须以var _ = Register[xxx, xxx]()的形式被调用,这样才能保证init中RegisterUnionType时 +// TypeUnion不是空的。(因为包级变量初始化比init函数调用先进行) +func Register[TTaskInfo ExeTaskInfo, TTaskStatus ExeTaskStatus]() any { TaskInfoTypeUnion.Add(myreflect.TypeOf[TTaskInfo]()) TaskStatusTypeUnion.Add(myreflect.TypeOf[TTaskStatus]()) + + return nil +} + +func init() { + mq.RegisterUnionType(TaskInfoTypeUnion) + mq.RegisterUnionType(TaskStatusTypeUnion) } diff --git a/common/pkgs/mq/executor/task/upload_image.go b/common/pkgs/mq/executor/task/upload_image.go index 9ed7c3c..f505a91 100644 --- a/common/pkgs/mq/executor/task/upload_image.go +++ b/common/pkgs/mq/executor/task/upload_image.go @@ -2,6 +2,8 @@ package task import uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" +var _ = Register[*UploadImage, *UploadImageStatus]() + type UploadImage struct { TaskInfoBase SlwNodeID uopsdk.SlwNodeID `json:"slwNodeID"` @@ -27,7 +29,3 @@ func NewUploadImageStatus(status string, err string, imageID uopsdk.SlwNodeImage ImageID: imageID, } } - -func init() { - Register[UploadImage, UploadImageStatus]() -} diff --git a/common/pkgs/mq/manager/advisor.go b/common/pkgs/mq/manager/advisor.go index b9860dd..17843fb 100644 --- a/common/pkgs/mq/manager/advisor.go +++ b/common/pkgs/mq/manager/advisor.go @@ -25,7 +25,7 @@ type ReportAdvisorTaskStatusResp struct { } type AdvisorTaskStatus struct { TaskID string - Status advtsk.TaskStatus + Status advtsk.AdvTaskStatus } func NewReportAdvisorTaskStatus(advisorID schmod.AdvisorID, taskStatus []AdvisorTaskStatus) *ReportAdvisorTaskStatus { @@ -37,7 +37,7 @@ func NewReportAdvisorTaskStatus(advisorID schmod.AdvisorID, taskStatus []Advisor func NewReportAdvisorTaskStatusResp() *ReportAdvisorTaskStatusResp { return &ReportAdvisorTaskStatusResp{} } -func NewAdvisorTaskStatus(taskID string, status exectsk.TaskStatus) AdvisorTaskStatus { +func NewAdvisorTaskStatus(taskID string, status exectsk.ExeTaskStatus) AdvisorTaskStatus { return AdvisorTaskStatus{ TaskID: taskID, Status: status, diff --git a/common/pkgs/mq/manager/executor.go b/common/pkgs/mq/manager/executor.go index a18f51a..a8052d8 100644 --- a/common/pkgs/mq/manager/executor.go +++ b/common/pkgs/mq/manager/executor.go @@ -24,7 +24,7 @@ type ReportExecutorTaskStatusResp struct { } type ExecutorTaskStatus struct { TaskID string - Status exectsk.TaskStatus + Status exectsk.ExeTaskStatus } func NewReportExecutorTaskStatus(executorID schmod.ExecutorID, taskStatus []ExecutorTaskStatus) *ReportExecutorTaskStatus { @@ -36,7 +36,7 @@ func NewReportExecutorTaskStatus(executorID schmod.ExecutorID, taskStatus []Exec func NewReportExecutorTaskStatusResp() *ReportExecutorTaskStatusResp { return &ReportExecutorTaskStatusResp{} } -func NewExecutorTaskStatus(taskID string, status exectsk.TaskStatus) ExecutorTaskStatus { +func NewExecutorTaskStatus(taskID string, status exectsk.ExeTaskStatus) ExecutorTaskStatus { return ExecutorTaskStatus{ TaskID: taskID, Status: status, @@ -45,7 +45,3 @@ func NewExecutorTaskStatus(taskID string, status exectsk.TaskStatus) ExecutorTas func (c *Client) ReportExecutorTaskStatus(msg *ReportExecutorTaskStatus, opts ...mq.RequestOption) (*ReportExecutorTaskStatusResp, error) { return mq.Request(Service.ReportExecutorTaskStatus, c.roundTripper, msg, opts...) } - -func init() { - mq.RegisterUnionType(exectsk.TaskStatusTypeUnion) -} diff --git a/common/pkgs/mq/manager/job.go b/common/pkgs/mq/manager/job.go index ddc32ea..ca933df 100644 --- a/common/pkgs/mq/manager/job.go +++ b/common/pkgs/mq/manager/job.go @@ -122,7 +122,3 @@ func (c *Client) GetJobSetJobs(msg *GetJobSetJobs, opts ...mq.RequestOption) (*G return mq.Request(Service.GetJobSetJobs, c.rabbitCli, msg, opts...) } */ - -func init() { - mq.RegisterUnionType(jobmod.JobTypeUnion) -} diff --git a/common/pkgs/mq/manager/server.go b/common/pkgs/mq/manager/server.go index f929708..1d84085 100644 --- a/common/pkgs/mq/manager/server.go +++ b/common/pkgs/mq/manager/server.go @@ -22,8 +22,6 @@ type Service interface { type Server struct { service Service rabbitSvr mq.RabbitMQServer - - OnError func(err error) } func NewServer(svc Service, cfg *mymq.Config) (*Server, error) { @@ -54,6 +52,10 @@ func (s *Server) Serve() error { return s.rabbitSvr.Serve() } +func (s *Server) OnError(callback func(error)) { + s.rabbitSvr.OnError = callback +} + var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher() // Register 将Service中的一个接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型 diff --git a/executor/internal/reporter/reporter.go b/executor/internal/reporter/reporter.go index e0da653..12bf6b1 100644 --- a/executor/internal/reporter/reporter.go +++ b/executor/internal/reporter/reporter.go @@ -15,7 +15,7 @@ import ( type Reporter struct { executorID schmod.ExecutorID reportInterval time.Duration - taskStatus map[string]exectsk.TaskStatus + taskStatus map[string]exectsk.ExeTaskStatus taskStatusLock sync.Mutex reportNow chan bool } @@ -24,12 +24,12 @@ func NewReporter(executorID schmod.ExecutorID, reportInterval time.Duration) Rep return Reporter{ executorID: executorID, reportInterval: reportInterval, - taskStatus: make(map[string]exectsk.TaskStatus), + taskStatus: make(map[string]exectsk.ExeTaskStatus), reportNow: make(chan bool), } } -func (r *Reporter) Report(taskID string, taskStatus exectsk.TaskStatus) { +func (r *Reporter) Report(taskID string, taskStatus exectsk.ExeTaskStatus) { r.taskStatusLock.Lock() defer r.taskStatusLock.Unlock() @@ -65,7 +65,7 @@ func (r *Reporter) Serve() error { for taskID, status := range r.taskStatus { taskStatus = append(taskStatus, mgrmq.NewExecutorTaskStatus(taskID, status)) } - r.taskStatus = make(map[string]exectsk.TaskStatus) + r.taskStatus = make(map[string]exectsk.ExeTaskStatus) r.taskStatusLock.Unlock() _, err := magCli.ReportExecutorTaskStatus(mgrmq.NewReportExecutorTaskStatus(r.executorID, taskStatus)) diff --git a/executor/internal/task/task.go b/executor/internal/task/task.go index 6f37186..324f03e 100644 --- a/executor/internal/task/task.go +++ b/executor/internal/task/task.go @@ -36,7 +36,7 @@ func NewManager(reporter *reporter.Reporter) Manager { } } -func (m *Manager) StartByInfo(info exectsk.TaskInfo) (*Task, error) { +func (m *Manager) StartByInfo(info exectsk.ExeTaskInfo) (*Task, error) { infoType := myreflect.TypeOfValue(info) ctor, ok := taskFromInfoCtors[infoType] @@ -47,10 +47,10 @@ func (m *Manager) StartByInfo(info exectsk.TaskInfo) (*Task, error) { return m.StartNew(ctor(info)), nil } -var taskFromInfoCtors map[reflect.Type]func(exectsk.TaskInfo) TaskBody +var taskFromInfoCtors map[reflect.Type]func(exectsk.ExeTaskInfo) TaskBody = make(map[reflect.Type]func(exectsk.ExeTaskInfo) task.TaskBody[TaskContext]) -func Register[TInfo exectsk.TaskInfo, TTaskBody TaskBody](ctor func(info TInfo) TTaskBody) { - taskFromInfoCtors[myreflect.TypeOf[TInfo]()] = func(info exectsk.TaskInfo) TaskBody { +func Register[TInfo exectsk.ExeTaskInfo, TTaskBody TaskBody](ctor func(info TInfo) TTaskBody) { + taskFromInfoCtors[myreflect.TypeOf[TInfo]()] = func(info exectsk.ExeTaskInfo) TaskBody { return ctor(info.(TInfo)) } } diff --git a/executor/main.go b/executor/main.go index f5f6fbd..43786c2 100644 --- a/executor/main.go +++ b/executor/main.go @@ -28,6 +28,7 @@ func main() { os.Exit(1) } + schglb.InitMQPool(&config.Cfg().RabbitMQ) schglb.InitCloudreamStoragePool(&config.Cfg().CloudreamStorage) schglb.InitPCMPool(&config.Cfg().PCM) @@ -42,9 +43,9 @@ func main() { logger.Fatalf("new executor server failed, err: %s", err.Error()) } - mqSvr.OnError = func(err error) { + mqSvr.OnError(func(err error) { logger.Warnf("executor server err: %s", err.Error()) - } + }) // 启动服务 go serveMQServer(mqSvr) diff --git a/go.mod b/go.mod index 10190f2..cf27eb2 100644 --- a/go.mod +++ b/go.mod @@ -8,10 +8,17 @@ require ( github.com/gin-gonic/gin v1.9.1 github.com/google/uuid v1.3.0 github.com/samber/lo v1.38.1 + github.com/smartystreets/goconvey v1.8.0 gitlink.org.cn/cloudream/common v0.0.0 google.golang.org/grpc v1.54.0 ) +require ( + github.com/gopherjs/gopherjs v1.17.2 // indirect + github.com/jtolds/gls v4.20.0+incompatible // indirect + github.com/smartystreets/assertions v1.13.1 // indirect +) + require ( github.com/antonfisher/nested-logrus-formatter v1.3.1 // indirect github.com/bytedance/sonic v1.9.1 // indirect diff --git a/go.sum b/go.sum index 99497d7..477afef 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,7 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= +github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -46,6 +47,7 @@ github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf/go.mod h1:yrqSX github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= +github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk= github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= @@ -69,7 +71,9 @@ github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXn github.com/sirupsen/logrus v1.9.2 h1:oxx1eChJGI6Uks2ZC4W1zpLlVgqB8ner4EuQwV4Ik1Y= github.com/sirupsen/logrus v1.9.2/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/smartystreets/assertions v1.13.1 h1:Ef7KhSmjZcK6AVf9YbJdvPYG9avaF0ZxudX+ThRdWfU= +github.com/smartystreets/assertions v1.13.1/go.mod h1:cXr/IwVfSo/RbCSPhoAPv73p3hlSdrBH/b3SdnW/LMY= github.com/smartystreets/goconvey v1.8.0 h1:Oi49ha/2MURE0WexF052Z0m+BNSGirfjg5RL+JXWq3w= +github.com/smartystreets/goconvey v1.8.0/go.mod h1:EdX8jtrTIj26jmjCOVNMVSIYAtgexqXKHOXW2Dx9JLg= github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM= github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/magefiles/main.go b/magefiles/main.go index 5dc86e2..4f1c136 100644 --- a/magefiles/main.go +++ b/magefiles/main.go @@ -55,9 +55,15 @@ func Bin() error { if err := Collector(); err != nil { return err } + if err := Advisor(); err != nil { + return err + } if err := Executor(); err != nil { return err } + if err := Manager(); err != nil { + return err + } return nil } @@ -66,8 +72,13 @@ func Scripts() error { scriptsDir := "./common/assets/scripts" info, err := os.Stat(scriptsDir) - if errors.Is(err, os.ErrNotExist) || !info.IsDir() { - return fmt.Errorf("script directory not exists or is not a directory") + if errors.Is(err, os.ErrNotExist) { + fmt.Printf("no scripts.\n") + return nil + } + + if !info.IsDir() { + return fmt.Errorf("scripts is not a directory") } fullDirPath, err := filepath.Abs(filepath.Join(BuildDir, "scripts")) @@ -84,8 +95,13 @@ func Confs() error { confDir := "./common/assets/confs" info, err := os.Stat(confDir) - if errors.Is(err, os.ErrNotExist) || !info.IsDir() { - return fmt.Errorf("conf directory not exists or is not a directory") + if errors.Is(err, os.ErrNotExist) { + fmt.Printf("no confs.\n") + return nil + } + + if !info.IsDir() { + return fmt.Errorf("confs is not a directory") } fullDirPath, err := filepath.Abs(filepath.Join(BuildDir, "confs")) @@ -116,6 +132,15 @@ func Collector() error { }) } +func Advisor() error { + return magefiles.Build(magefiles.BuildArgs{ + OutputName: "advisor", + OutputDir: "advisor", + AssetsDir: "assets", + EntryFile: "advisor/main.go", + }) +} + func Executor() error { return magefiles.Build(magefiles.BuildArgs{ OutputName: "executor", @@ -124,3 +149,12 @@ func Executor() error { EntryFile: "executor/main.go", }) } + +func Manager() error { + return magefiles.Build(magefiles.BuildArgs{ + OutputName: "manager", + OutputDir: "manager", + AssetsDir: "assets", + EntryFile: "manager/main.go", + }) +} diff --git a/manager/internal/advisormgr/advisormgr.go b/manager/internal/advisormgr/advisormgr.go index 58fcc63..3ea541e 100644 --- a/manager/internal/advisormgr/advisormgr.go +++ b/manager/internal/advisormgr/advisormgr.go @@ -25,7 +25,7 @@ type AdvisorInfo struct { lastReportTime time.Time } -type OnTaskUpdatedCallbackFn func(jobID schsdk.JobID, fullTaskID string, taskStatus advtsk.TaskStatus) +type OnTaskUpdatedCallbackFn func(jobID schsdk.JobID, fullTaskID string, taskStatus advtsk.AdvTaskStatus) type OnTimeoutCallbackFn func(jobID schsdk.JobID, fullTaskID string) type Manager struct { @@ -66,7 +66,7 @@ func (m *Manager) Report(advID schmod.AdvisorID, taskStatus []mgrmq.AdvisorTaskS info, ok := m.advisors[advID] if !ok { - info := &AdvisorInfo{ + info = &AdvisorInfo{ advisorID: advID, jobTasks: make(map[string]jobTask), } @@ -86,7 +86,7 @@ func (m *Manager) Report(advID schmod.AdvisorID, taskStatus []mgrmq.AdvisorTaskS } // 启动一个Task,并将其关联到指定的Job。返回一个在各Executor之间唯一的TaskID -func (m *Manager) StartTask(jobID schsdk.JobID, info advtsk.TaskInfo) (string, error) { +func (m *Manager) StartTask(jobID schsdk.JobID, info advtsk.AdvTaskInfo) (string, error) { m.lock.Lock() defer m.lock.Unlock() diff --git a/manager/internal/executormgr/executormgr.go b/manager/internal/executormgr/executormgr.go index 789c45e..e9cefde 100644 --- a/manager/internal/executormgr/executormgr.go +++ b/manager/internal/executormgr/executormgr.go @@ -26,7 +26,7 @@ type ExecutorInfo struct { lastReportTime time.Time } -type OnTaskUpdatedCallbackFn func(jobID schsdk.JobID, fullTaskID string, taskStatus exetsk.TaskStatus) +type OnTaskUpdatedCallbackFn func(jobID schsdk.JobID, fullTaskID string, taskStatus exetsk.ExeTaskStatus) type OnTimeoutCallbackFn func(jobID schsdk.JobID, fullTaskID string) type Manager struct { @@ -67,7 +67,7 @@ func (m *Manager) Report(execID schmod.ExecutorID, taskStatus []mgrmq.ExecutorTa info, ok := m.executors[execID] if !ok { - info := &ExecutorInfo{ + info = &ExecutorInfo{ executorID: execID, jobTasks: make(map[string]jobTask), } @@ -87,7 +87,7 @@ func (m *Manager) Report(execID schmod.ExecutorID, taskStatus []mgrmq.ExecutorTa } // 启动一个Task,并将其关联到指定的Job。返回一个在各Executor之间唯一的TaskID -func (m *Manager) StartTask(jobID schsdk.JobID, info exetsk.TaskInfo) (string, error) { +func (m *Manager) StartTask(jobID schsdk.JobID, info exetsk.ExeTaskInfo) (string, error) { m.lock.Lock() defer m.lock.Unlock() diff --git a/manager/internal/jobmgr/complete_handler.go b/manager/internal/jobmgr/complete_handler.go index ebc8a94..dd56798 100644 --- a/manager/internal/jobmgr/complete_handler.go +++ b/manager/internal/jobmgr/complete_handler.go @@ -4,6 +4,7 @@ import ( "fmt" "reflect" + "gitlink.org.cn/cloudream/common/pkgs/logger" jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event" ) @@ -32,10 +33,17 @@ func (h *CompleteHandler) Handle(job jobmod.Job) { } func (h *CompleteHandler) handleSuccess(job jobmod.Job, state *jobmod.StateSuccess) { + logger.WithField("JobID", job.GetJobID()).Infof("job completed successfuly") + h.mgr.onEvent(event.ToJobSet(job.GetJobSetID()), event.NewJobCompleted(job)) } func (h *CompleteHandler) handleFailed(job jobmod.Job, state *jobmod.StateFailed) { + logger. + WithField("JobID", job.GetJobID()). + WithField("LastState", reflect.TypeOf(state.LastState).String()). + Infof("job failed with: %v", state.Error) + h.mgr.onEvent(event.ToJobSet(job.GetJobSetID()), event.NewJobCompleted(job)) } diff --git a/manager/internal/jobmgr/default_handler.go b/manager/internal/jobmgr/default_handler.go new file mode 100644 index 0000000..5b6bdf0 --- /dev/null +++ b/manager/internal/jobmgr/default_handler.go @@ -0,0 +1,50 @@ +package jobmgr + +import ( + "gitlink.org.cn/cloudream/common/pkgs/logger" + jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" + "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event" +) + +type DefaultHandler struct { + mgr *Manager +} + +func NewDefaultHandler(mgr *Manager) *DefaultHandler { + return &DefaultHandler{ + mgr: mgr, + } +} + +// 处理Job。在此期间全局锁已锁定 +func (h *DefaultHandler) Handle(job jobmod.Job) { + state := job.GetState() + if state == nil { + job.SetState(jobmod.NewStateFailed("unexpected nil state", nil)) + h.mgr.handleState(job) + return + } + + if _, ok := state.(*jobmod.StateFailed); ok { + logger.Warnf("state failed should not be handled by default handler") + return + } + + job.SetState(jobmod.NewStateFailed("no handler for this state", state)) + h.mgr.handleState(job) +} + +// 外部发生了一个事件 +func (h *DefaultHandler) OnEvent(broadcast event.Broadcast, evt event.Event) { + +} + +// 运行Handler +func (h *DefaultHandler) Serve() { + +} + +// 停止此Handler +func (h *DefaultHandler) Stop() { + +} diff --git a/manager/internal/jobmgr/event/advisor_task_updated.go b/manager/internal/jobmgr/event/advisor_task_updated.go index 1e3931c..7a0bbff 100644 --- a/manager/internal/jobmgr/event/advisor_task_updated.go +++ b/manager/internal/jobmgr/event/advisor_task_updated.go @@ -5,17 +5,17 @@ import advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task" // advisor上报任务进度 type AdvisorTaskUpdated struct { FullTaskID string - TaskStatus advtsk.TaskStatus + TaskStatus advtsk.AdvTaskStatus } -func NewAdvisorTaskUpdated(fullTaskID string, taskStatus advtsk.TaskStatus) *AdvisorTaskUpdated { +func NewAdvisorTaskUpdated(fullTaskID string, taskStatus advtsk.AdvTaskStatus) *AdvisorTaskUpdated { return &AdvisorTaskUpdated{ FullTaskID: fullTaskID, TaskStatus: taskStatus, } } -func AssertAdvisorTaskStatus[T advtsk.TaskStatus](evt Event, fullTaskID string) (T, error) { +func AssertAdvisorTaskStatus[T advtsk.AdvTaskStatus](evt Event, fullTaskID string) (T, error) { var ret T if evt == nil { return ret, ErrUnconcernedTask diff --git a/manager/internal/jobmgr/event/executor_task_updated.go b/manager/internal/jobmgr/event/executor_task_updated.go index fe299d6..4ae50a3 100644 --- a/manager/internal/jobmgr/event/executor_task_updated.go +++ b/manager/internal/jobmgr/event/executor_task_updated.go @@ -7,17 +7,17 @@ import ( // executor上报任务进度 type ExecutorTaskUpdated struct { FullTaskID string - TaskStatus exectsk.TaskStatus + TaskStatus exectsk.ExeTaskStatus } -func NewExecutorTaskUpdated(fullTaskID string, taskStatus exectsk.TaskStatus) *ExecutorTaskUpdated { +func NewExecutorTaskUpdated(fullTaskID string, taskStatus exectsk.ExeTaskStatus) *ExecutorTaskUpdated { return &ExecutorTaskUpdated{ FullTaskID: fullTaskID, TaskStatus: taskStatus, } } -func AssertExecutorTaskStatus[T exectsk.TaskStatus](evt Event, fullTaskID string) (T, error) { +func AssertExecutorTaskStatus[T exectsk.ExeTaskStatus](evt Event, fullTaskID string) (T, error) { var ret T if evt == nil { return ret, ErrUnconcernedTask diff --git a/manager/internal/jobmgr/executing_handler.go b/manager/internal/jobmgr/executing_handler.go index 24a7030..a3c0814 100644 --- a/manager/internal/jobmgr/executing_handler.go +++ b/manager/internal/jobmgr/executing_handler.go @@ -74,7 +74,7 @@ func (h *ExecutingHandler) onNormalJobEvent(evt event.Event, job *executingJob, return } - fullTaskID, err := h.mgr.advMgr.StartTask(job.job.GetJobID(), + fullTaskID, err := h.mgr.execMgr.StartTask(job.job.GetJobID(), exetsk.NewScheduleTask( norJob.TargetSlwNodeID, norJob.Info.Runtime.Envs, diff --git a/manager/internal/jobmgr/jobmgr.go b/manager/internal/jobmgr/jobmgr.go index c3c1b35..009c47b 100644 --- a/manager/internal/jobmgr/jobmgr.go +++ b/manager/internal/jobmgr/jobmgr.go @@ -5,7 +5,9 @@ import ( "fmt" "reflect" "sync" + "time" + "gitlink.org.cn/cloudream/common/pkgs/logger" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" myreflect "gitlink.org.cn/cloudream/common/utils/reflect" jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" @@ -44,6 +46,10 @@ func NewManager(execMgr *executormgr.Manager, advMgr *advisormgr.Manager, imageM execMgr: execMgr, advMgr: advMgr, imageMgr: imageMgr, + + handlers: make(map[reflect.Type]StateHandler), + jobSets: make(map[schsdk.JobSetID]*jobmod.JobSet), + jobs: make(map[schsdk.JobID]*mgrJob), } execMgr.OnTaskUpdated(mgr.executorTaskUpdated) @@ -52,16 +58,20 @@ func NewManager(execMgr *executormgr.Manager, advMgr *advisormgr.Manager, imageM advMgr.OnTaskUpdated(mgr.advisorTaskUpdated) advMgr.OnTaskTimeout(mgr.advisorTaskTimeout) - mgr.handlers[myreflect.TypeOf[jobmod.StatePreScheduling]()] = NewPreSchedulingHandler(mgr) - mgr.handlers[myreflect.TypeOf[jobmod.StateReadyToAdjust]()] = NewReadyToAdjustHandler(mgr) - mgr.handlers[myreflect.TypeOf[jobmod.StateMakingAdjustScheme]()] = NewMakingAdjustSchemeHandler(mgr) - mgr.handlers[myreflect.TypeOf[jobmod.StateAdjusting]()] = NewAdjustingHandler(mgr) - mgr.handlers[myreflect.TypeOf[jobmod.StateReadyToExecute]()] = NewReadyToExecuteHandler(mgr) - mgr.handlers[myreflect.TypeOf[jobmod.StateExecuting]()] = NewExecutingHandler(mgr) + // TODO 考虑优化这部分逻辑 + + mgr.handlers[myreflect.TypeOf[*jobmod.StatePreScheduling]()] = NewPreSchedulingHandler(mgr) + mgr.handlers[myreflect.TypeOf[*jobmod.StateReadyToAdjust]()] = NewReadyToAdjustHandler(mgr) + mgr.handlers[myreflect.TypeOf[*jobmod.StateMakingAdjustScheme]()] = NewMakingAdjustSchemeHandler(mgr) + mgr.handlers[myreflect.TypeOf[*jobmod.StateAdjusting]()] = NewAdjustingHandler(mgr) + mgr.handlers[myreflect.TypeOf[*jobmod.StateReadyToExecute]()] = NewReadyToExecuteHandler(mgr) + mgr.handlers[myreflect.TypeOf[*jobmod.StateExecuting]()] = NewExecutingHandler(mgr) compHder := NewCompleteHandler(mgr) - mgr.handlers[myreflect.TypeOf[jobmod.StateFailed]()] = compHder - mgr.handlers[myreflect.TypeOf[jobmod.StateSuccess]()] = compHder + mgr.handlers[myreflect.TypeOf[*jobmod.StateFailed]()] = compHder + mgr.handlers[myreflect.TypeOf[*jobmod.StateSuccess]()] = compHder + + mgr.defaultHandler = NewDefaultHandler(mgr) return mgr, nil } @@ -73,6 +83,19 @@ func (m *Manager) Serve() error { go m.defaultHandler.Serve() + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + // 每一分钟产生一个空事件,防止无限等待 + m.pubLock.Lock() + m.onEvent(event.ToAll(), nil) + m.pubLock.Unlock() + } + } + return nil } @@ -158,7 +181,7 @@ func (m *Manager) LocalFileUploaded(jobSetID schsdk.JobSetID, localPath string, return nil } -func (m *Manager) executorTaskUpdated(jobID schsdk.JobID, fullTaskID string, taskStatus exectsk.TaskStatus) { +func (m *Manager) executorTaskUpdated(jobID schsdk.JobID, fullTaskID string, taskStatus exectsk.ExeTaskStatus) { m.pubLock.Lock() defer m.pubLock.Unlock() @@ -182,7 +205,7 @@ func (m *Manager) executorTaskTimeout(jobID schsdk.JobID, fullTaskID string) { job.Handler.OnEvent(event.ToJob(jobID), event.NewExecutorTaskTimeout(fullTaskID)) } -func (m *Manager) advisorTaskUpdated(jobID schsdk.JobID, fullTaskID string, taskStatus advtsk.TaskStatus) { +func (m *Manager) advisorTaskUpdated(jobID schsdk.JobID, fullTaskID string, taskStatus advtsk.AdvTaskStatus) { m.pubLock.Lock() defer m.pubLock.Unlock() @@ -224,6 +247,10 @@ func (m *Manager) CloneJob(jobID schsdk.JobID) (jobmod.Job, error) { // 根据job状态选择handler进行处理。需要加锁 func (m *Manager) handleState(job jobmod.Job) { + logger.WithField("JobID", job.GetJobID()). + WithField("State", reflect.TypeOf(job.GetState()).String()). + Debugf("job state changed") + runtime, ok := m.jobs[job.GetJobID()] if !ok { return diff --git a/manager/internal/jobmgr/prescheduling_handler.go b/manager/internal/jobmgr/prescheduling_handler.go index edac298..39d1993 100644 --- a/manager/internal/jobmgr/prescheduling_handler.go +++ b/manager/internal/jobmgr/prescheduling_handler.go @@ -135,6 +135,7 @@ func (h *PreSchedulingHandler) changeJobState(job jobmod.Job, state jobmod.JobSt } func (h *PreSchedulingHandler) doPackageScheduling(evt event.Event, job *preSchedulingJob, fileInfo schsdk.JobFileInfo, file *jobmod.PackageJobFile, scheme *jobmod.FileScheduleScheme, state *jobmod.FileSchedulingState) error { + // TODO 考虑拆分成多个函数 if state.Step == jobmod.StepBegin { switch info := fileInfo.(type) { case *schsdk.LocalJobFileInfo: @@ -163,6 +164,10 @@ func (h *PreSchedulingHandler) doPackageScheduling(evt event.Event, job *preSche return nil } + if localFileCmd.Error != "" { + return fmt.Errorf("local file uploading: %s", localFileCmd.Error) + } + file.PackageID = localFileCmd.PackageID state.Step = jobmod.StepUploaded } @@ -243,6 +248,7 @@ func (h *PreSchedulingHandler) doPackageScheduling(evt event.Event, job *preSche } func (h *PreSchedulingHandler) doImageScheduling(evt event.Event, job *preSchedulingJob, fileInfo schsdk.JobFileInfo, file *jobmod.ImageJobFile, scheme *jobmod.FileScheduleScheme, state *jobmod.FileSchedulingState) error { + // TODO 考虑拆分成多个函数 if state.Step == jobmod.StepBegin { switch info := fileInfo.(type) { case *schsdk.LocalJobFileInfo: @@ -277,6 +283,10 @@ func (h *PreSchedulingHandler) doImageScheduling(evt event.Event, job *preSchedu return nil } + if localFileCmd.Error != "" { + return fmt.Errorf("local file uploading: %s", localFileCmd.Error) + } + // 上传完毕,则可以新建一个空的镜像的记录 info, err := h.mgr.imageMgr.CreateImage(localFileCmd.PackageID) if err != nil { @@ -325,8 +335,12 @@ func (h *PreSchedulingHandler) doImageScheduling(evt event.Event, job *preSchedu return fmt.Errorf("cache move pacakge: %s", cacheMoveRet.Error) } - if len(cacheMoveRet.CacheInfos) != 1 { - return fmt.Errorf("there must be only 1 object in the package that will be imported") + if len(cacheMoveRet.CacheInfos) == 0 { + return fmt.Errorf("no object in the package which will be imported") + } + + if len(cacheMoveRet.CacheInfos) > 1 { + return fmt.Errorf("there must be only 1 object in the package which will be imported") } fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewUploadImage(job.slwNodeInfo.ID, stgsdk.MakeIPFSFilePath(cacheMoveRet.CacheInfos[0].FileHash))) diff --git a/manager/internal/jobmgr/ready_to_adjust_handler.go b/manager/internal/jobmgr/ready_to_adjust_handler.go index 7324ef6..1fd2637 100644 --- a/manager/internal/jobmgr/ready_to_adjust_handler.go +++ b/manager/internal/jobmgr/ready_to_adjust_handler.go @@ -71,7 +71,7 @@ func (h *ReadyToAdjustHandler) onNormalJobEvent(evt event.Event, job *readyToAdj return } - needWait := true + needWait := false // 无论发生什么事件,都检查一下前置任务的状态 if resFile, ok := norJob.Info.Files.Dataset.(*schsdk.ResourceJobFileInfo); ok { @@ -110,6 +110,9 @@ func (h *ReadyToAdjustHandler) onNormalJobEvent(evt event.Event, job *readyToAdj job.state, )) return + } else { + // 等待的Job不是失败或者成功状态,则需要继续等待 + needWait = true } } diff --git a/manager/internal/mq/job.go b/manager/internal/mq/job.go index 839d52f..334d495 100644 --- a/manager/internal/mq/job.go +++ b/manager/internal/mq/job.go @@ -9,6 +9,8 @@ import ( // 提交任务集 func (svc *Service) SubmitJobSet(msg *mgrmq.SubmitJobSet) (*mgrmq.SubmitJobSetResp, *mq.CodeMessage) { + logger.Debugf("submitting job") + jobSet, err := svc.jobMgr.SubmitJobSet(msg.JobSet, msg.PreScheduleScheme) if err != nil { logger.Warnf("submitting job set: %s", err.Error()) @@ -20,6 +22,10 @@ func (svc *Service) SubmitJobSet(msg *mgrmq.SubmitJobSet) (*mgrmq.SubmitJobSetRe // 任务集中某个文件上传完成 func (svc *Service) JobSetLocalFileUploaded(msg *mgrmq.JobSetLocalFileUploaded) (*mgrmq.JobSetLocalFileUploadedResp, *mq.CodeMessage) { + logger.WithField("LocalPath", msg.LocalPath). + WithField("PackageID", msg.PackageID). + Debugf("local file uploaded") + svc.jobMgr.LocalFileUploaded(msg.JobSetID, msg.LocalPath, msg.Error, msg.PackageID) return mq.ReplyOK(mgrmq.NewJobSetLocalFileUploadedResp()) } diff --git a/manager/main.go b/manager/main.go index 0b8c1a3..86e4ad3 100644 --- a/manager/main.go +++ b/manager/main.go @@ -68,11 +68,13 @@ func main() { logger.Fatalf("new manager mq server: %s", err.Error()) } - mqSvr.OnError = func(err error) { + mqSvr.OnError(func(err error) { logger.Warnf("manager server err: %s", err.Error()) - } + }) // 启动服务 + go serveJobManager(jobMgr) + go serveExecutorManager(exeMgr) go serveAdvisorManager(advMgr) @@ -83,6 +85,17 @@ func main() { <-forever } +func serveJobManager(mgr *jobmgr.Manager) { + logger.Info("start serving job manager") + + err := mgr.Serve() + if err != nil { + logger.Errorf("job manager stopped with error: %s", err.Error()) + } + + logger.Info("job manager stopped") +} + func serveMQServer(server *mgrmq.Server) { logger.Info("start serving mq server")