完成Manager中的剩余内容

This commit is contained in:
Sydonian 2023-09-20 16:53:25 +08:00
parent b05a23916a
commit 36e4c8993d
59 changed files with 1157 additions and 335 deletions

View File

@ -1,9 +1,12 @@
package globals
import "github.com/google/uuid"
import (
"github.com/google/uuid"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
)
var AdvisorID string
var AdvisorID schmod.AdvisorID
func Init() {
AdvisorID = uuid.NewString()
AdvisorID = schmod.AdvisorID(uuid.NewString())
}

View File

@ -7,19 +7,20 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/scheduler/common/globals"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
)
type Reporter struct {
advisorID string
advisorID schmod.AdvisorID
reportInterval time.Duration
taskStatus map[string]advtsk.TaskStatus
taskStatusLock sync.Mutex
reportNow chan bool
}
func NewReporter(advisorID string, reportInterval time.Duration) Reporter {
func NewReporter(advisorID schmod.AdvisorID, reportInterval time.Duration) Reporter {
return Reporter{
advisorID: advisorID,
reportInterval: reportInterval,

View File

@ -38,7 +38,7 @@ type DefaultSchedule struct {
}
type CandidateSlwNodeInfo struct {
slwNodeID int64
slwNodeID models.SlwNodeID
resourceLevel int
resourceScore float64
nodeScore float64
@ -169,7 +169,7 @@ type ResourceInfo struct {
}
// 划分节点资源等级,并计算资源得分
func (s *DefaultSchedule) computeResourceScore(resources models.JobResourcesInfo, slwNodeID int64) (int, float64, error) {
func (s *DefaultSchedule) computeResourceScore(resources models.JobResourcesInfo, slwNodeID models.SlwNodeID) (int, float64, error) {
colCli, err := globals.CollectorMQPool.Acquire()
if err != nil {
return 0, 0, fmt.Errorf("new collector client: %w", err)
@ -306,14 +306,14 @@ type stgScore struct {
}
// 计算节点得分情况
func (s *DefaultSchedule) ComputeAllSlwNodeScore(files jobmod.JobFiles) (map[int64]SlwNodeScore, error) {
func (s *DefaultSchedule) ComputeAllSlwNodeScore(files jobmod.JobFiles) (map[models.SlwNodeID]SlwNodeScore, error) {
colCli, err := globals.CollectorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new collector client: %w", err)
}
defer colCli.Close()
slwNodeScores := make(map[int64]SlwNodeScore)
slwNodeScores := make(map[models.SlwNodeID]SlwNodeScore)
//计算code相关得分
codeStgScores, err := s.computeAllStgScore(files.Code.PackageID)

View File

@ -1,12 +0,0 @@
package services
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
schtsk "gitlink.org.cn/cloudream/scheduler/advisor/internal/task"
advmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor"
)
func (svc *Service) StartMakeScheduleScheme(msg *advmq.StartMakeScheduleScheme) (*advmq.StartMakeScheduleSchemeResp, *mq.CodeMessage) {
tsk := svc.taskManager.StartNew(schtsk.NewMakeScheduleScheme())
return mq.ReplyOK(advmq.NewStartMakeScheduleSchemeResp(tsk.ID()))
}

View File

@ -0,0 +1,21 @@
package services
import (
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/common/utils/reflect"
myglbs "gitlink.org.cn/cloudream/scheduler/advisor/internal/globals"
advmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor"
)
func (svc *Service) StartTask(msg *advmq.StartTask) (*advmq.StartTaskResp, *mq.CodeMessage) {
tsk, err := svc.taskManager.StartByInfo(msg.Info)
if err != nil {
logger.WithField("Info", reflect.TypeOfValue(msg.Info).Name()).
Warnf("starting task by info: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "start task by info failed")
}
return mq.ReplyOK(advmq.NewStartTaskResp(myglbs.AdvisorID, tsk.ID()))
}

View File

@ -17,12 +17,13 @@ import (
)
type MakeScheduleScheme struct {
Job jobmod.NormalJob
Scheme jobmod.JobScheduleScheme
*advtsk.MakeAdjustScheme
}
func NewMakeScheduleScheme() *MakeScheduleScheme {
return &MakeScheduleScheme{}
func NewMakeScheduleScheme(info *advtsk.MakeAdjustScheme) *MakeScheduleScheme {
return &MakeScheduleScheme{
MakeAdjustScheme: info,
}
}
func (t *MakeScheduleScheme) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
@ -30,13 +31,13 @@ func (t *MakeScheduleScheme) Execute(task *task.Task[TaskContext], ctx TaskConte
log.Debugf("begin")
defer log.Debugf("end")
err := t.do(task.ID(), ctx)
scheme, err := t.do(task.ID(), ctx)
if err != nil {
//TODO 若任务失败上报的状态failed字段根据情况修改
ctx.reporter.Report(task.ID(), advtsk.NewScheduleSchemeTaskStatus("failed", err.Error(), jobmod.JobScheduleScheme{}))
ctx.reporter.Report(task.ID(), advtsk.NewMakeAdjustSchemeStatus(err.Error(), jobmod.JobScheduleScheme{}))
} else {
// 将调度方案上报给manager
ctx.reporter.Report(task.ID(), advtsk.NewScheduleSchemeTaskStatus("failed", "", t.Scheme))
ctx.reporter.Report(task.ID(), advtsk.NewMakeAdjustSchemeStatus("", scheme))
}
ctx.reporter.ReportNow()
@ -45,13 +46,15 @@ func (t *MakeScheduleScheme) Execute(task *task.Task[TaskContext], ctx TaskConte
})
}
func (t *MakeScheduleScheme) do(taskID string, ctx TaskContext) error {
func (t *MakeScheduleScheme) do(taskID string, ctx TaskContext) (jobmod.JobScheduleScheme, error) {
var scheme jobmod.JobScheduleScheme
isAvailable, err := t.CheckResourceAvailability()
if err != nil {
return err
return scheme, err
}
var defaultSchedule scheduler.DefaultSchedule
var scheme jobmod.JobScheduleScheme
if isAvailable {
// 确认code、dataset、image是否已经调度到该中心并生成调度方案
@ -73,19 +76,18 @@ func (t *MakeScheduleScheme) do(taskID string, ctx TaskContext) error {
}
}
if err != nil {
return err
return scheme, err
}
// 重新执行预调度方案,寻找最优节点
} else {
s, err := defaultSchedule.Schedule(t.Job)
scheme = *s
if err != nil {
return err
return scheme, err
}
}
t.Scheme = scheme
return nil
return scheme, nil
}
// 检查预调度节点资源是否足够
@ -221,3 +223,7 @@ func (t *MakeScheduleScheme) CheckResourceAvailability() (bool, error) {
}
return true, nil
}
func init() {
Register(NewMakeScheduleScheme)
}

View File

@ -1,8 +1,13 @@
package task
import (
"fmt"
"reflect"
"gitlink.org.cn/cloudream/common/pkgs/task"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
reporter "gitlink.org.cn/cloudream/scheduler/advisor/internal/reporter"
advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
)
type TaskContext struct {
@ -13,7 +18,9 @@ type TaskContext struct {
// 因此适合进行执行结果的设置
type CompleteFn = task.CompleteFn
type Manager = task.Manager[TaskContext]
type Manager struct {
task.Manager[TaskContext]
}
type TaskBody = task.TaskBody[TaskContext]
@ -22,7 +29,28 @@ type Task = task.Task[TaskContext]
type CompleteOption = task.CompleteOption
func NewManager(reporter *reporter.Reporter) Manager {
return task.NewManager(TaskContext{
reporter: reporter,
})
return Manager{
Manager: task.NewManager(TaskContext{
reporter: reporter,
}),
}
}
func (m *Manager) StartByInfo(info advtsk.TaskInfo) (*Task, error) {
infoType := myreflect.TypeOfValue(info)
ctor, ok := taskFromInfoCtors[infoType]
if !ok {
return nil, fmt.Errorf("unknow info type")
}
return m.StartNew(ctor(info)), nil
}
var taskFromInfoCtors map[reflect.Type]func(advtsk.TaskInfo) TaskBody
func Register[TInfo advtsk.TaskInfo, TTaskBody TaskBody](ctor func(info TInfo) TTaskBody) {
taskFromInfoCtors[myreflect.TypeOf[TInfo]()] = func(info advtsk.TaskInfo) TaskBody {
return ctor(info.(TInfo))
}
}

View File

@ -21,7 +21,7 @@ func (s *Server) JobSetSvc() *JobSetService {
}
type JobSetSubmitResp struct {
JobSetID string `json:"jobSetID"`
JobSetID models.JobSetID `json:"jobSetID"`
FilesUploadScheme models.JobSetFilesUploadScheme `json:"filesUploadScheme"`
}
@ -56,10 +56,10 @@ func (s *JobSetService) Submit(ctx *gin.Context) {
}
type JobSetLocalFileUploadedReq struct {
JobSetID string `json:"jobSetID" binding:"required"`
LocalPath string `json:"localPath" binding:"required"`
Error string `json:"error"`
PackageID int64 `json:"packageID"`
JobSetID models.JobSetID `json:"jobSetID" binding:"required"`
LocalPath string `json:"localPath" binding:"required"`
Error string `json:"error"`
PackageID int64 `json:"packageID"`
}
func (s *JobSetService) LocalFileUploaded(ctx *gin.Context) {

View File

@ -17,7 +17,7 @@ func (svc *Service) JobSetSvc() *JobSetService {
}
// 提交任务集
func (svc *JobSetService) Submit(info models.JobSetInfo) (string, *models.JobSetFilesUploadScheme, error) {
func (svc *JobSetService) Submit(info models.JobSetInfo) (models.JobSetID, *models.JobSetFilesUploadScheme, error) {
mgrCli, err := globals.ManagerMQPool.Acquire()
if err != nil {
return "", nil, fmt.Errorf("new manager client: %w", err)
@ -38,7 +38,7 @@ func (svc *JobSetService) Submit(info models.JobSetInfo) (string, *models.JobSet
}
// 任务集中某个文件上传完成
func (svc *JobSetService) LocalFileUploaded(jobSetID string, localPath string, errMsg string, packageID int64) error {
func (svc *JobSetService) LocalFileUploaded(jobSetID models.JobSetID, localPath string, errMsg string, packageID int64) error {
mgrCli, err := globals.ManagerMQPool.Acquire()
if err != nil {
return fmt.Errorf("new manager client: %w", err)

View File

@ -3,6 +3,7 @@ package config
import (
cldstg "gitlink.org.cn/cloudream/common/api/storage"
uniops "gitlink.org.cn/cloudream/common/api/unifyops"
"gitlink.org.cn/cloudream/common/models"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
c "gitlink.org.cn/cloudream/common/utils/config"
mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
@ -28,7 +29,7 @@ func Cfg() *Config {
}
type SlwNodeConfig struct {
SlwNodeID int64 `json:"slwNodeID"`
StgNodeID int64 `json:"stgNodeID"`
StorageID int64 `json:"StorageID"`
SlwNodeID models.SlwNodeID `json:"slwNodeID"`
StgNodeID int64 `json:"stgNodeID"`
StorageID int64 `json:"StorageID"`
}

View File

@ -0,0 +1,13 @@
{
"logger": {
"output": "stdout",
"level": "debug"
},
"rabbitMQ": {
"address": "127.0.0.1:5672",
"account": "cloudream",
"password": "123456",
"vhost": "/"
},
"reportTimeoutSecs": 20
}

View File

@ -5,19 +5,23 @@ import (
cldstg "gitlink.org.cn/cloudream/common/api/storage"
"gitlink.org.cn/cloudream/common/api/unifyops"
scmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
advmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor"
cltmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
exemq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
)
var ExecutorMQPool *execmq.Pool
var ExecutorMQPool *exemq.Pool
var AdvisorMQPool *advmq.Pool
var CollectorMQPool *cltmq.Pool
var ManagerMQPool *mgrmq.Pool
func InitMQPool(cfg *scmq.Config) {
ExecutorMQPool = execmq.NewPool(cfg)
ExecutorMQPool = exemq.NewPool(cfg)
AdvisorMQPool = advmq.NewPool(cfg)
CollectorMQPool = cltmq.NewPool(cfg)
ManagerMQPool = mgrmq.NewPool(cfg)
}

View File

@ -2,6 +2,7 @@ package job
import (
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/types"
"gitlink.org.cn/cloudream/common/utils/reflect"
)
@ -22,7 +23,7 @@ type FileScheduleScheme struct {
// 任务调度方案
type JobScheduleScheme struct {
TargetSlwNodeID int64 `json:"targetSlwNodeID"`
TargetSlwNodeID models.SlwNodeID `json:"targetSlwNodeID"`
Dataset FileScheduleScheme `json:"dataset"`
Code FileScheduleScheme `json:"code"`
Image FileScheduleScheme `json:"image"`
@ -35,16 +36,16 @@ type JobSetPreScheduleScheme struct {
// 任务集
type JobSet struct {
JobSetID string `json:"jobSetID"` // 全局唯一的任务集ID
JobSetID models.JobSetID `json:"jobSetID"` // 全局唯一的任务集ID
JobRefs []JobSetJobRef `json:"jobRefs"` // 任务集中包含的任务,只是一个引用
PreScheduleScheme JobSetPreScheduleScheme `json:"preScheduleScheme"`
}
type JobSetJobRef struct {
JobID string `json:"jobID"` // 任务ID
LocalJobID string `json:"localJobID"` // 在当前任务集内的任务ID
JobID models.JobID `json:"jobID"` // 任务ID
LocalJobID string `json:"localJobID"` // 在当前任务集内的任务ID
}
func NewJobSet(jobSetID string, jobRefs []JobSetJobRef, preScheduleScheme JobSetPreScheduleScheme) *JobSet {
func NewJobSet(jobSetID models.JobSetID, jobRefs []JobSetJobRef, preScheduleScheme JobSetPreScheduleScheme) *JobSet {
return &JobSet{
JobSetID: jobSetID,
JobRefs: jobRefs,
@ -63,10 +64,11 @@ func (j *JobSet) FindRefByLocalJobID(localJobID string) *JobSetJobRef {
// 任务
type Job interface {
GetJobSetID() string
GetJobID() string
GetJobSetID() models.JobSetID
GetJobID() models.JobID
GetState() JobState
SetState(state JobState)
Clone() Job
}
var JobTypeUnion = types.NewTypeUnion[Job](
@ -75,15 +77,15 @@ var JobTypeUnion = types.NewTypeUnion[Job](
)
type JobBase struct {
JobSetID string `json:"jobSetID"` // 任务集ID
JobID string `json:"jobID"` // 全局唯一任务ID
State JobState `json:"state"` // 任务当前的状态。包含当前在状态下执行操作所需的数据
JobSetID models.JobSetID `json:"jobSetID"` // 任务集ID
JobID models.JobID `json:"jobID"` // 全局唯一任务ID
State JobState `json:"state"` // 任务当前的状态。包含当前在状态下执行操作所需的数据
}
func (j *JobBase) GetJobSetID() string {
func (j *JobBase) GetJobSetID() models.JobSetID {
return j.JobSetID
}
func (j *JobBase) GetJobID() string {
func (j *JobBase) GetJobID() models.JobID {
return j.JobID
}
func (j *JobBase) GetState() JobState {

View File

@ -1,15 +1,18 @@
package job
import "gitlink.org.cn/cloudream/common/models"
import (
"gitlink.org.cn/cloudream/common/models"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
)
type NormalJob struct {
JobBase
Info models.NormalJobInfo `json:"info"` // 提交任务时提供的任务描述信息
Files JobFiles `json:"files"` // 任务需要的文件
TargetSlwNodeID int64 `json:"targetSlwNodeID"` // 将要运行此任务的算力中心ID
TargetSlwNodeID models.SlwNodeID `json:"targetSlwNodeID"` // 将要运行此任务的算力中心ID
}
func NewNormalJob(jobSetID string, jobID string, info models.NormalJobInfo) *NormalJob {
func NewNormalJob(jobSetID models.JobSetID, jobID models.JobID, info models.NormalJobInfo) *NormalJob {
return &NormalJob{
JobBase: JobBase{
JobSetID: jobSetID,
@ -19,6 +22,12 @@ func NewNormalJob(jobSetID string, jobID string, info models.NormalJobInfo) *Nor
}
}
func (j *NormalJob) Clone() Job {
tmp := *j
tmp.State = tmp.State.Clone()
return &tmp
}
type JobFiles struct {
Dataset PackageJobFile `json:"dataset"`
Code PackageJobFile `json:"code"`
@ -30,6 +39,6 @@ type PackageJobFile struct {
}
type ImageJobFile struct {
PackageID int64 `json:"packageID"`
ImageID string `json:"imageID"`
PackageID int64 `json:"packageID"`
ImageID schmod.ImageID `json:"imageID"`
}

View File

@ -8,7 +8,7 @@ type ResourceJob struct {
ResourcePackageID int64 `json:"resourcePackageID"` // 回源之后得到的PackageID
}
func NewResourceJob(jobSetID string, jobID string, info models.ResourceJobInfo) *ResourceJob {
func NewResourceJob(jobSetID models.JobSetID, jobID models.JobID, info models.ResourceJobInfo) *ResourceJob {
return &ResourceJob{
JobBase: JobBase{
JobSetID: jobSetID,
@ -17,3 +17,9 @@ func NewResourceJob(jobSetID string, jobID string, info models.ResourceJobInfo)
Info: info,
}
}
func (j *ResourceJob) Clone() Job {
tmp := *j
tmp.State = tmp.State.Clone()
return &tmp
}

View File

@ -6,12 +6,10 @@ import (
)
type JobState interface {
Noop()
Clone() JobState
}
type JobStateBase struct{}
func (s *JobStateBase) Noop() {}
var JobStateTypeUnion = types.NewTypeUnion[JobState](
myreflect.TypeOf[StatePreScheduling](),
myreflect.TypeOf[StateReadyToAdjust](),
@ -63,6 +61,10 @@ func NewStatePreScheduling(scheme JobScheduleScheme) *StatePreScheduling {
},
}
}
func (s *StatePreScheduling) Clone() JobState {
tmp := *s
return &tmp
}
type StateReadyToAdjust struct {
JobStateBase
@ -72,6 +74,11 @@ func NewStateReadyToAdjust() *StateReadyToAdjust {
return &StateReadyToAdjust{}
}
func (s *StateReadyToAdjust) Clone() JobState {
tmp := *s
return &tmp
}
type StateMakingAdjustScheme struct {
JobStateBase
FullTaskID string `json:"fullTaskID"`
@ -81,6 +88,11 @@ func NewStateMakingAdjustScheme() *StateMakingAdjustScheme {
return &StateMakingAdjustScheme{}
}
func (s *StateMakingAdjustScheme) Clone() JobState {
tmp := *s
return &tmp
}
type StateAdjusting struct {
JobStateBase
Scheme JobScheduleScheme `json:"scheme"`
@ -104,6 +116,11 @@ func NewStateAdjusting(scheme JobScheduleScheme) *StateAdjusting {
}
}
func (s *StateAdjusting) Clone() JobState {
tmp := *s
return &tmp
}
type StateReadyToExecute struct {
JobStateBase
}
@ -112,6 +129,11 @@ func NewStateReadyToExecute() *StateReadyToExecute {
return &StateReadyToExecute{}
}
func (s *StateReadyToExecute) Clone() JobState {
tmp := *s
return &tmp
}
type StateExecuting struct {
JobStateBase
FullTaskID string `json:"fullTaskID"`
@ -121,6 +143,11 @@ func NewStateExecuting() *StateExecuting {
return &StateExecuting{}
}
func (s *StateExecuting) Clone() JobState {
tmp := *s
return &tmp
}
type StateFailed struct {
JobStateBase
Error string `json:"error"`
@ -134,6 +161,11 @@ func NewStateFailed(err string, lastState JobState) *StateFailed {
}
}
func (s *StateFailed) Clone() JobState {
tmp := *s
return &tmp
}
type StateSuccess struct {
JobStateBase
}
@ -141,3 +173,8 @@ type StateSuccess struct {
func NewStateSuccess() *StateSuccess {
return &StateSuccess{}
}
func (s *StateSuccess) Clone() JobState {
tmp := *s
return &tmp
}

View File

@ -1,12 +1,20 @@
package models
import "gitlink.org.cn/cloudream/common/models"
type ExecutorID string
type AdvisorID string
type ImageID string
type ImageInfo struct {
ImageID string `json:"imageID"`
ImageID ImageID `json:"imageID"`
PackageID int64 `json:"packageID"` // 镜像文件
ImportingInfos []ImageImportingInfo `json:"importingInfos"` // 此镜像导入到了哪些节点
}
type ImageImportingInfo struct {
SlwNodeID int64 `json:"slwNodeID"`
SlwNodeImageID int64 `json:"slwNodeImageID"`
SlwNodeID models.SlwNodeID `json:"slwNodeID"`
SlwNodeImageID models.SlwNodeImageID `json:"slwNodeImageID"`
}

View File

@ -1,39 +0,0 @@
package advisor
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/scheduler/common/models/job"
)
type ApiService interface {
StartMakeScheduleScheme(msg *StartMakeScheduleScheme) (*StartMakeScheduleSchemeResp, *mq.CodeMessage)
}
// 获取调度方案
var _ = Register(Service.StartMakeScheduleScheme)
type StartMakeScheduleScheme struct {
mq.MessageBodyBase
Job job.NormalJob `json:"job"`
}
type StartMakeScheduleSchemeResp struct {
mq.MessageBodyBase
TaskID string `json:"taskID"`
}
func NewStartGetScheduleScheme(job job.NormalJob) *StartMakeScheduleScheme {
return &StartMakeScheduleScheme{
Job: job,
}
}
func NewStartMakeScheduleSchemeResp(taskID string) *StartMakeScheduleSchemeResp {
return &StartMakeScheduleSchemeResp{
TaskID: taskID,
}
}
func (c *Client) StartMakeScheduleScheme(msg *StartMakeScheduleScheme, opts ...mq.RequestOption) (*StartMakeScheduleSchemeResp, error) {
return mq.Request(Service.StartMakeScheduleScheme, c.rabbitCli, msg, opts...)
}

View File

@ -10,7 +10,7 @@ const (
)
type Service interface {
ApiService
TaskService
}
type Server struct {

View File

@ -0,0 +1,43 @@
package advisor
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/scheduler/common/models"
advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
)
type TaskService interface {
StartTask(msg *StartTask) (*StartTaskResp, *mq.CodeMessage)
}
// 启动一个任务
var _ = Register(Service.StartTask)
type StartTask struct {
mq.MessageBodyBase
Info advtsk.TaskInfo `json:"info"`
}
type StartTaskResp struct {
mq.MessageBodyBase
AdvisorID models.AdvisorID `json:"advisorID"`
TaskID string `json:"taskID"`
}
func NewStartTask(info advtsk.TaskInfo) *StartTask {
return &StartTask{
Info: info,
}
}
func NewStartTaskResp(advID models.AdvisorID, taskID string) *StartTaskResp {
return &StartTaskResp{
AdvisorID: advID,
TaskID: taskID,
}
}
func (c *Client) StartTask(msg *StartTask, opts ...mq.RequestOption) (*StartTaskResp, error) {
return mq.Request(Service.StartTask, c.rabbitCli, msg, opts...)
}
func init() {
mq.RegisterUnionType(advtsk.TaskInfoTypeUnion)
}

View File

@ -1,13 +1,18 @@
package task
import jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
import (
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
)
type MakeAdjustScheme struct {
TaskInfoBase
Job jobmod.NormalJob `json:"job"`
}
func NewMakeAdjustScheme() *MakeAdjustScheme {
return &MakeAdjustScheme{}
func NewMakeAdjustScheme(job jobmod.NormalJob) *MakeAdjustScheme {
return &MakeAdjustScheme{
Job: job,
}
}
type MakeAdjustSchemeStatus struct {

View File

@ -16,17 +16,17 @@ var _ = Register(Service.GetOneResourceData)
type GetOneResourceData struct {
mq.MessageBodyBase
SlwNodeID int64 `json:"slwNodeID"`
ResourceType string `json:"type"`
SlwNodeID models.SlwNodeID `json:"slwNodeID"`
ResourceType string `json:"type"`
}
type GetOneResourceDataResp struct {
mq.MessageBodyBase
Data models.ResourceData `json:"data"`
}
func NewGetOneResourceData(nodeId int64, resourceType string) *GetOneResourceData {
func NewGetOneResourceData(nodeID models.SlwNodeID, resourceType string) *GetOneResourceData {
return &GetOneResourceData{
SlwNodeID: nodeId,
SlwNodeID: nodeID,
ResourceType: resourceType,
}
}
@ -44,14 +44,14 @@ var _ = Register(Service.GetAllResourceData)
type GetAllResourceData struct {
mq.MessageBodyBase
SlwNodeID int64 `json:"slwNodeID"`
SlwNodeID models.SlwNodeID `json:"slwNodeID"`
}
type GetAllResourceDataResp struct {
mq.MessageBodyBase
Datas []models.ResourceData `json:"datas"`
}
func NewGetAllResourceData(nodeId int64) *GetAllResourceData {
func NewGetAllResourceData(nodeId models.SlwNodeID) *GetAllResourceData {
return &GetAllResourceData{
SlwNodeID: nodeId,
}

View File

@ -16,14 +16,14 @@ var _ = Register(Service.GetSlwNodeInfo)
type GetSlwNodeInfo struct {
mq.MessageBodyBase
SlwNodeID int64 `json:"slwNodeID"`
SlwNodeID models.SlwNodeID `json:"slwNodeID"`
}
type GetSlwNodeInfoResp struct {
mq.MessageBodyBase
models.SlwNode
}
func NewGetSlwNodeInfo(slwNodeID int64) *GetSlwNodeInfo {
func NewGetSlwNodeInfo(slwNodeID models.SlwNodeID) *GetSlwNodeInfo {
return &GetSlwNodeInfo{
SlwNodeID: slwNodeID,
}

View File

@ -2,6 +2,7 @@ package executor
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/scheduler/common/models"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)
@ -18,8 +19,8 @@ type StartTask struct {
}
type StartTaskResp struct {
mq.MessageBodyBase
ExecutorID string `json:"executorID"`
TaskID string `json:"taskID"`
ExecutorID models.ExecutorID `json:"executorID"`
TaskID string `json:"taskID"`
}
func NewStartTask(info exectsk.TaskInfo) *StartTask {
@ -27,7 +28,7 @@ func NewStartTask(info exectsk.TaskInfo) *StartTask {
Info: info,
}
}
func NewStartTaskResp(execID string, taskID string) *StartTaskResp {
func NewStartTaskResp(execID models.ExecutorID, taskID string) *StartTaskResp {
return &StartTaskResp{
ExecutorID: execID,
TaskID: taskID,

View File

@ -4,10 +4,10 @@ import "gitlink.org.cn/cloudream/common/models"
type ScheduleTask struct {
TaskInfoBase
SlwNodeID int64 `json:"slwNodeID"`
Envs []models.EnvVar `json:"envs"`
SlwNodeImageID int64 `json:"slwNodeImageID"`
CMDLine string `json:"cmdLine"`
SlwNodeID models.SlwNodeID `json:"slwNodeID"`
Envs []models.EnvVar `json:"envs"`
SlwNodeImageID models.SlwNodeImageID `json:"slwNodeImageID"`
CMDLine string `json:"cmdLine"`
}
type ScheduleTaskStatus struct {
TaskStatusBase
@ -16,7 +16,7 @@ type ScheduleTaskStatus struct {
PCMJobID int64 `json:"pcmJobID"`
}
func NewScheduleTask(slwNodeID int64, envs []models.EnvVar, slwNodeImageID int64, cmdLine string) *ScheduleTask {
func NewScheduleTask(slwNodeID models.SlwNodeID, envs []models.EnvVar, slwNodeImageID models.SlwNodeImageID, cmdLine string) *ScheduleTask {
return &ScheduleTask{
SlwNodeID: slwNodeID,
Envs: envs,

View File

@ -1,24 +1,28 @@
package task
import (
"gitlink.org.cn/cloudream/common/models"
)
type UploadImage struct {
TaskInfoBase
SlwNodeID int64 `json:"slwNodeID"`
ImagePath string `json:"imagePath"`
SlwNodeID models.SlwNodeID `json:"slwNodeID"`
ImagePath string `json:"imagePath"`
}
type UploadImageStatus struct {
TaskStatusBase
Status string `json:"status"`
Error string `json:"error"`
ImageID int64 `json:"imageID"`
Status string `json:"status"`
Error string `json:"error"`
ImageID models.SlwNodeImageID `json:"imageID"`
}
func NewUploadImage(slwNodeID int64, imagePath string) *UploadImage {
func NewUploadImage(slwNodeID models.SlwNodeID, imagePath string) *UploadImage {
return &UploadImage{
SlwNodeID: slwNodeID,
ImagePath: imagePath,
}
}
func NewUploadImageStatus(status string, err string, imageID int64) *UploadImageStatus {
func NewUploadImageStatus(status string, err string, imageID models.SlwNodeImageID) *UploadImageStatus {
return &UploadImageStatus{
Status: status,
Error: err,

View File

@ -2,6 +2,7 @@ package manager
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/scheduler/common/models"
advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)
@ -15,7 +16,7 @@ var _ = Register(Service.ReportAdvisorTaskStatus)
type ReportAdvisorTaskStatus struct {
mq.MessageBodyBase
AdvisorID string `json:"advisorID"`
AdvisorID models.AdvisorID `json:"advisorID"`
TaskStatus []AdvisorTaskStatus `json:"taskStatus"`
}
@ -27,14 +28,14 @@ type AdvisorTaskStatus struct {
Status advtsk.TaskStatus
}
func NewReportAdvisorTaskStatus(advisorID string, taskStatus []AdvisorTaskStatus) *ReportAdvisorTaskStatus {
func NewReportAdvisorTaskStatus(advisorID models.AdvisorID, taskStatus []AdvisorTaskStatus) *ReportAdvisorTaskStatus {
return &ReportAdvisorTaskStatus{
AdvisorID: advisorID,
TaskStatus: taskStatus,
}
}
func NewReportAdvisorTaskStatusResp() *ReportExecutorTaskStatusResp {
return &ReportExecutorTaskStatusResp{}
func NewReportAdvisorTaskStatusResp() *ReportAdvisorTaskStatusResp {
return &ReportAdvisorTaskStatusResp{}
}
func NewAdvisorTaskStatus(taskID string, status exectsk.TaskStatus) AdvisorTaskStatus {
return AdvisorTaskStatus{

View File

@ -2,6 +2,7 @@ package manager
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/scheduler/common/models"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)
@ -14,7 +15,7 @@ var _ = Register(Service.ReportExecutorTaskStatus)
type ReportExecutorTaskStatus struct {
mq.MessageBodyBase
ExecutorID string `json:"executorID"`
ExecutorID models.ExecutorID `json:"executorID"`
TaskStatus []ExecutorTaskStatus `json:"taskStatus"`
}
@ -26,7 +27,7 @@ type ExecutorTaskStatus struct {
Status exectsk.TaskStatus
}
func NewReportExecutorTaskStatus(executorID string, taskStatus []ExecutorTaskStatus) *ReportExecutorTaskStatus {
func NewReportExecutorTaskStatus(executorID models.ExecutorID, taskStatus []ExecutorTaskStatus) *ReportExecutorTaskStatus {
return &ReportExecutorTaskStatus{
ExecutorID: executorID,
TaskStatus: taskStatus,

View File

@ -2,15 +2,17 @@ package manager
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/scheduler/common/models"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
)
type ImageService interface {
CreateImage(msg *CreateImage) (*CreateImageResp, *mq.CodeMessage)
// CreateImage(msg *CreateImage) (*CreateImageResp, *mq.CodeMessage)
GetImageInfo(msg *GetImageInfo) (*GetImageInfoResp, *mq.CodeMessage)
}
/*
// 创建镜像
var _ = Register(Service.CreateImage)
@ -38,25 +40,26 @@ func NewCreateImageResp(imageID string) *CreateImageResp {
func (c *Client) CreateImage(msg *CreateImage, opts ...mq.RequestOption) (*CreateImageResp, error) {
return mq.Request(Service.CreateImage, c.rabbitCli, msg, opts...)
}
*/
// 查询镜像信息
var _ = Register(Service.GetImageInfo)
type GetImageInfo struct {
mq.MessageBodyBase
ImageID string `json:"imageID"`
ImageID models.ImageID `json:"imageID"`
}
type GetImageInfoResp struct {
mq.MessageBodyBase
schmod.ImageInfo
}
func NewGetImageInfo(imageID string) *GetImageInfo {
func NewGetImageInfo(imageID models.ImageID) *GetImageInfo {
return &GetImageInfo{
ImageID: imageID,
}
}
func NewGetImageInfoResp(imageID string, packageID int64, importingInfo []schmod.ImageImportingInfo) *GetImageInfoResp {
func NewGetImageInfoResp(imageID models.ImageID, packageID int64, importingInfo []schmod.ImageImportingInfo) *GetImageInfoResp {
return &GetImageInfoResp{
ImageInfo: schmod.ImageInfo{
ImageID: imageID,

View File

@ -13,7 +13,7 @@ type JobService interface {
GetJob(msg *GetJob) (*GetJobResp, *mq.CodeMessage)
GetJobSetJobs(msg *GetJobSetJobs) (*GetJobSetJobsResp, *mq.CodeMessage)
// GetJobSetJobs(msg *GetJobSetJobs) (*GetJobSetJobsResp, *mq.CodeMessage)
}
// 提交任务集
@ -26,7 +26,7 @@ type SubmitJobSet struct {
}
type SubmitJobSetResp struct {
mq.MessageBodyBase
JobSetID string `json:"jobSetID"`
JobSetID models.JobSetID `json:"jobSetID"`
}
func NewSubmitJobSet(jobSet models.JobSetInfo, preScheduleScheme jobmod.JobSetPreScheduleScheme) *SubmitJobSet {
@ -35,7 +35,7 @@ func NewSubmitJobSet(jobSet models.JobSetInfo, preScheduleScheme jobmod.JobSetPr
PreScheduleScheme: preScheduleScheme,
}
}
func NewSubmitJobSetResp(jobSetID string) *SubmitJobSetResp {
func NewSubmitJobSetResp(jobSetID models.JobSetID) *SubmitJobSetResp {
return &SubmitJobSetResp{
JobSetID: jobSetID,
}
@ -49,16 +49,16 @@ var _ = Register(Service.JobSetLocalFileUploaded)
type JobSetLocalFileUploaded struct {
mq.MessageBodyBase
JobSetID string `json:"jobSetID"`
LocalPath string `json:"localPath"`
Error string `json:"error"` // 如果上传文件失败,那么这个字段说明了失败原因
PackageID int64 `json:"packageID"` // 如果上传文件成功那么这个字段是上传之后得到的PackageID
JobSetID models.JobSetID `json:"jobSetID"`
LocalPath string `json:"localPath"`
Error string `json:"error"` // 如果上传文件失败,那么这个字段说明了失败原因
PackageID int64 `json:"packageID"` // 如果上传文件成功那么这个字段是上传之后得到的PackageID
}
type JobSetLocalFileUploadedResp struct {
mq.MessageBodyBase
}
func NewJobSetLocalFileUploaded(jobSetID string, localPath string, err string, packageID int64) *JobSetLocalFileUploaded {
func NewJobSetLocalFileUploaded(jobSetID models.JobSetID, localPath string, err string, packageID int64) *JobSetLocalFileUploaded {
return &JobSetLocalFileUploaded{
JobSetID: jobSetID,
LocalPath: localPath,
@ -76,14 +76,14 @@ func (c *Client) JobSetLocalFileUploaded(msg *JobSetLocalFileUploaded, opts ...m
// 获取任务数据
type GetJob struct {
mq.MessageBodyBase
JobID string `json:"jobID"`
JobID models.JobID `json:"jobID"`
}
type GetJobResp struct {
mq.MessageBodyBase
Job jobmod.Job `json:"job"`
}
func NewGetJob(jobID string) *GetJob {
func NewGetJob(jobID models.JobID) *GetJob {
return &GetJob{
JobID: jobID,
}
@ -97,6 +97,7 @@ func (c *Client) GetJob(msg *GetJob, opts ...mq.RequestOption) (*GetJobResp, err
return mq.Request(Service.GetJob, c.rabbitCli, msg, opts...)
}
/*
// 获取指定任务集中的所有任务数据
type GetJobSetJobs struct {
mq.MessageBodyBase
@ -120,6 +121,7 @@ func NewGetJobSetJobsResp(jobs []jobmod.Job) *GetJobSetJobsResp {
func (c *Client) GetJobSetJobs(msg *GetJobSetJobs, opts ...mq.RequestOption) (*GetJobSetJobsResp, error) {
return mq.Request(Service.GetJobSetJobs, c.rabbitCli, msg, opts...)
}
*/
func init() {
mq.RegisterUnionType(jobmod.JobTypeUnion)

View File

@ -1,9 +1,12 @@
package globals
import "github.com/google/uuid"
import (
"github.com/google/uuid"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
)
var ExecutorID string
var ExecutorID schmod.ExecutorID
func Init() {
ExecutorID = uuid.NewString()
ExecutorID = schmod.ExecutorID(uuid.NewString())
}

View File

@ -7,19 +7,20 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/scheduler/common/globals"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
)
type Reporter struct {
executorID string
executorID schmod.ExecutorID
reportInterval time.Duration
taskStatus map[string]exectsk.TaskStatus
taskStatusLock sync.Mutex
reportNow chan bool
}
func NewReporter(executorID string, reportInterval time.Duration) Reporter {
func NewReporter(executorID schmod.ExecutorID, reportInterval time.Duration) Reporter {
return Reporter{
executorID: executorID,
reportInterval: reportInterval,

View File

@ -1,18 +1,162 @@
package advisormgr
import advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
import (
"fmt"
"sync"
"time"
type OnReportedCallbackFn func(jobID string, fullTaskID string, taskStatus advtsk.TaskStatus)
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/scheduler/common/globals"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
advmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor"
advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
)
type Manager struct {
onReported OnReportedCallbackFn
type jobTask struct {
JobID models.JobID
TaskID string
FullTaskID string
}
func (m *Manager) OnReported(callback OnReportedCallbackFn) {
m.onReported = callback
type AdvisorInfo struct {
advisorID schmod.AdvisorID
jobTasks map[string]jobTask // key 为 TaskID
lastReportTime time.Time
}
type OnTaskUpdatedCallbackFn func(jobID models.JobID, fullTaskID string, taskStatus advtsk.TaskStatus)
type OnTimeoutCallbackFn func(jobID models.JobID, fullTaskID string)
type Manager struct {
advisors map[schmod.AdvisorID]*AdvisorInfo
lock sync.Mutex
advCli *advmq.PoolClient
onTaskUpdated OnTaskUpdatedCallbackFn
onTaskTimeout OnTimeoutCallbackFn
reportTimeout time.Duration
}
func NewManager(reportTimeout time.Duration) (*Manager, error) {
advCli, err := globals.AdvisorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new executor client: %w", err)
}
return &Manager{
advisors: make(map[schmod.AdvisorID]*AdvisorInfo),
advCli: advCli,
reportTimeout: reportTimeout,
}, nil
}
func (m *Manager) OnTaskUpdated(callback OnTaskUpdatedCallbackFn) {
m.onTaskUpdated = callback
}
func (m *Manager) OnTaskTimeout(callback OnTimeoutCallbackFn) {
m.onTaskTimeout = callback
}
func (m *Manager) Report(advID schmod.AdvisorID, taskStatus []mgrmq.AdvisorTaskStatus) {
m.lock.Lock()
defer m.lock.Unlock()
info, ok := m.advisors[advID]
if !ok {
info := &AdvisorInfo{
advisorID: advID,
jobTasks: make(map[string]jobTask),
}
m.advisors[advID] = info
}
info.lastReportTime = time.Now()
for _, s := range taskStatus {
tsk, ok := info.jobTasks[s.TaskID]
if !ok {
continue
}
m.onTaskUpdated(tsk.JobID, tsk.FullTaskID, s.Status)
}
}
// 启动一个Task并将其关联到指定的Job。返回一个在各Executor之间唯一的TaskID
func (m *Manager) StartTask(jobID string, info advtsk.TaskInfo) (string, error) {
func (m *Manager) StartTask(jobID models.JobID, info advtsk.TaskInfo) (string, error) {
m.lock.Lock()
defer m.lock.Unlock()
resp, err := m.advCli.StartTask(advmq.NewStartTask(info))
if err != nil {
return "", err
}
fullTaskID := fmt.Sprintf("%s-%s", resp.AdvisorID, resp.TaskID)
exeInfo, ok := m.advisors[resp.AdvisorID]
if !ok {
exeInfo = &AdvisorInfo{
advisorID: resp.AdvisorID,
jobTasks: make(map[string]jobTask),
lastReportTime: time.Now(),
}
m.advisors[resp.AdvisorID] = exeInfo
}
exeInfo.jobTasks[resp.TaskID] = jobTask{
JobID: jobID,
TaskID: resp.TaskID,
FullTaskID: fullTaskID,
}
return fullTaskID, nil
}
// 放弃对指定任务进度的等待。调用此函数不会停止任务执行,只是回调里不会再收到此任务的进度更新
func (m *Manager) ForgetTask(fullTaskID string) {
m.lock.Lock()
defer m.lock.Unlock()
for _, exe := range m.advisors {
for _, tsk := range exe.jobTasks {
if tsk.FullTaskID == fullTaskID {
delete(exe.jobTasks, fullTaskID)
return
}
}
}
}
func (m *Manager) Serve() error {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
func() {
m.lock.Lock()
defer m.lock.Unlock()
now := time.Now()
for exeID, exeInfo := range m.advisors {
dt := now.Sub(exeInfo.lastReportTime)
if dt < m.reportTimeout {
continue
}
for _, tsk := range exeInfo.jobTasks {
m.onTaskTimeout(tsk.JobID, tsk.FullTaskID)
}
delete(m.advisors, exeID)
}
}()
}
}
}

View File

@ -7,14 +7,15 @@ import (
)
type Config struct {
Logger logger.Config `json:"logger"`
RabbitMQ scmq.Config `json:"rabbitMQ"`
Logger logger.Config `json:"logger"`
RabbitMQ scmq.Config `json:"rabbitMQ"`
ReportTimeoutSecs int `json:"reportTimeoutSecs"`
}
var cfg Config
func Init() error {
return config.DefaultLoad("client", &cfg)
return config.DefaultLoad("manager", &cfg)
}
func Cfg() *Config {

View File

@ -1,18 +1,162 @@
package executormgr
import exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
import (
"fmt"
"sync"
"time"
type OnReportedCallbackFn func(jobID string, fullTaskID string, taskStatus exectsk.TaskStatus)
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/scheduler/common/globals"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
exemq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
exetsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
)
type Manager struct {
onReported OnReportedCallbackFn
type jobTask struct {
JobID models.JobID
TaskID string
FullTaskID string
}
func (m *Manager) OnReported(callback OnReportedCallbackFn) {
m.onReported = callback
type ExecutorInfo struct {
executorID schmod.ExecutorID
jobTasks map[string]jobTask // key 为 TaskID
lastReportTime time.Time
}
type OnTaskUpdatedCallbackFn func(jobID models.JobID, fullTaskID string, taskStatus exetsk.TaskStatus)
type OnTimeoutCallbackFn func(jobID models.JobID, fullTaskID string)
type Manager struct {
executors map[schmod.ExecutorID]*ExecutorInfo
lock sync.Mutex
exeCli *exemq.PoolClient
onTaskUpdated OnTaskUpdatedCallbackFn
onTaskTimeout OnTimeoutCallbackFn
reportTimeout time.Duration
}
func NewManager(reportTimeout time.Duration) (*Manager, error) {
exeCli, err := globals.ExecutorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new executor client: %w", err)
}
return &Manager{
executors: make(map[schmod.ExecutorID]*ExecutorInfo),
exeCli: exeCli,
reportTimeout: reportTimeout,
}, nil
}
func (m *Manager) OnTaskUpdated(callback OnTaskUpdatedCallbackFn) {
m.onTaskUpdated = callback
}
func (m *Manager) OnTaskTimeout(callback OnTimeoutCallbackFn) {
m.onTaskTimeout = callback
}
func (m *Manager) Report(execID schmod.ExecutorID, taskStatus []mgrmq.ExecutorTaskStatus) {
m.lock.Lock()
defer m.lock.Unlock()
info, ok := m.executors[execID]
if !ok {
info := &ExecutorInfo{
executorID: execID,
jobTasks: make(map[string]jobTask),
}
m.executors[execID] = info
}
info.lastReportTime = time.Now()
for _, s := range taskStatus {
tsk, ok := info.jobTasks[s.TaskID]
if !ok {
continue
}
m.onTaskUpdated(tsk.JobID, tsk.FullTaskID, s.Status)
}
}
// 启动一个Task并将其关联到指定的Job。返回一个在各Executor之间唯一的TaskID
func (m *Manager) StartTask(jobID string, info exectsk.TaskInfo) (string, error) {
func (m *Manager) StartTask(jobID models.JobID, info exetsk.TaskInfo) (string, error) {
m.lock.Lock()
defer m.lock.Unlock()
resp, err := m.exeCli.StartTask(exemq.NewStartTask(info))
if err != nil {
return "", err
}
fullTaskID := fmt.Sprintf("%s-%s", resp.ExecutorID, resp.TaskID)
exeInfo, ok := m.executors[resp.ExecutorID]
if !ok {
exeInfo = &ExecutorInfo{
executorID: resp.ExecutorID,
jobTasks: make(map[string]jobTask),
lastReportTime: time.Now(),
}
m.executors[resp.ExecutorID] = exeInfo
}
exeInfo.jobTasks[resp.TaskID] = jobTask{
JobID: jobID,
TaskID: resp.TaskID,
FullTaskID: fullTaskID,
}
return fullTaskID, nil
}
// 放弃对指定任务进度的等待。调用此函数不会停止任务执行,只是回调里不会再收到此任务的进度更新
func (m *Manager) ForgetTask(fullTaskID string) {
m.lock.Lock()
defer m.lock.Unlock()
for _, exe := range m.executors {
for _, tsk := range exe.jobTasks {
if tsk.FullTaskID == fullTaskID {
delete(exe.jobTasks, fullTaskID)
return
}
}
}
}
func (m *Manager) Serve() error {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
func() {
m.lock.Lock()
defer m.lock.Unlock()
now := time.Now()
for exeID, exeInfo := range m.executors {
dt := now.Sub(exeInfo.lastReportTime)
if dt < m.reportTimeout {
continue
}
for _, tsk := range exeInfo.jobTasks {
m.onTaskTimeout(tsk.JobID, tsk.FullTaskID)
}
delete(m.executors, exeID)
}
}()
}
}
}

View File

@ -1,18 +1,92 @@
package imagemgr
import schmods "gitlink.org.cn/cloudream/scheduler/common/models"
import (
"fmt"
"sync"
"gitlink.org.cn/cloudream/common/models"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
)
type Manager struct {
infos map[schmod.ImageID]*schmod.ImageInfo
imageIDIndex int64
lock sync.Mutex
}
func (m *Manager) GetImageInfo(imageID string) (*schmods.ImageInfo, error) {
func NewManager() (*Manager, error) {
return &Manager{
infos: make(map[schmod.ImageID]*schmod.ImageInfo),
}, nil
}
func (m *Manager) GetImageImportingInfo(imageID string, slwNodeID int64) (*schmods.ImageImportingInfo, error) {
func (m *Manager) GetImageInfo(imageID schmod.ImageID) (*schmod.ImageInfo, error) {
m.lock.Lock()
defer m.lock.Unlock()
info, ok := m.infos[imageID]
if !ok {
return nil, fmt.Errorf("image not found")
}
return info, nil
}
func (m *Manager) CreateImage(slwNodeImageID int64, packageID int64) (*schmods.ImageInfo, error) {
func (m *Manager) GetImageImportingInfo(imageID schmod.ImageID, slwNodeID models.SlwNodeID) (*schmod.ImageImportingInfo, error) {
m.lock.Lock()
defer m.lock.Unlock()
info, ok := m.infos[imageID]
if !ok {
return nil, fmt.Errorf("image not found")
}
for _, im := range info.ImportingInfos {
if im.SlwNodeID == slwNodeID {
return &im, nil
}
}
return nil, fmt.Errorf("no importing info for this slw node")
}
func (m *Manager) CreateImage(packageID int64) (*schmod.ImageInfo, error) {
m.lock.Lock()
defer m.lock.Unlock()
imageID := schmod.ImageID(fmt.Sprintf("%d", m.imageIDIndex))
m.imageIDIndex++
info := &schmod.ImageInfo{
ImageID: imageID,
PackageID: packageID,
}
m.infos[imageID] = info
return info, nil
}
func (m *Manager) AddImageImportingInfo(imageID schmod.ImageID, slwNodeID models.SlwNodeID, slwNodeImageID models.SlwNodeImageID) error {
m.lock.Lock()
defer m.lock.Unlock()
info, ok := m.infos[imageID]
if !ok {
return fmt.Errorf("image not found")
}
for _, im := range info.ImportingInfos {
if im.SlwNodeID == slwNodeID {
return fmt.Errorf("image had been imported to that slw node")
}
}
info.ImportingInfos = append(info.ImportingInfos, schmod.ImageImportingInfo{
SlwNodeID: slwNodeID,
SlwNodeImageID: slwNodeImageID,
})
return nil
}

View File

@ -23,7 +23,7 @@ type adjustingJob struct {
type AdjustingHandler struct {
mgr *Manager
jobs map[string]*adjustingJob
jobs map[models.JobID]*adjustingJob
cmdChan actor.CommandChannel
}
@ -31,7 +31,7 @@ type AdjustingHandler struct {
func NewAdjustingHandler(mgr *Manager) *AdjustingHandler {
return &AdjustingHandler{
mgr: mgr,
jobs: make(map[string]*adjustingJob),
jobs: make(map[models.JobID]*adjustingJob),
cmdChan: *actor.NewCommandChannel(),
}
}
@ -76,6 +76,11 @@ func (h *AdjustingHandler) Handle(job jobmod.Job) {
}
func (h *AdjustingHandler) onJobEvent(evt event.Event, job *adjustingJob) {
if cloneEvt, ok := evt.(*event.CloneJob); ok {
cloneEvt.Callback.SetValue(job.job.Clone())
return
}
err := h.doPackageScheduling(evt, job,
job.job.Info.Files.Dataset, &job.job.Files.Dataset,
&job.state.Scheme.Dataset, &job.state.Dataset,
@ -163,11 +168,17 @@ func (h *AdjustingHandler) doPackageScheduling(evt event.Event, job *adjustingJo
}
if state.Step == jobmod.StepMoving {
moveRet, ok := event.AssertExecutorTaskStatus[*exectsk.CacheMovePackageStatus](evt, state.FullTaskID)
if !ok {
moveRet, err := event.AssertExecutorTaskStatus[*exectsk.CacheMovePackageStatus](evt, state.FullTaskID)
if err == event.ErrUnconcernedTask {
return nil
}
if err == event.ErrTaskTimeout {
return fmt.Errorf("cache move package timeout")
}
h.mgr.execMgr.ForgetTask(state.FullTaskID)
if moveRet.Error != "" {
return fmt.Errorf("cache move pacakge: %s", moveRet.Error)
}
@ -177,11 +188,17 @@ func (h *AdjustingHandler) doPackageScheduling(evt event.Event, job *adjustingJo
}
if state.Step == jobmod.StepLoading {
loadRet, ok := event.AssertExecutorTaskStatus[*exectsk.StorageLoadPackageStatus](evt, state.FullTaskID)
if !ok {
loadRet, err := event.AssertExecutorTaskStatus[*exectsk.StorageLoadPackageStatus](evt, state.FullTaskID)
if err == event.ErrUnconcernedTask {
return nil
}
if err == event.ErrTaskTimeout {
return fmt.Errorf("storage load package timeout")
}
h.mgr.execMgr.ForgetTask(state.FullTaskID)
if loadRet.Error != "" {
return fmt.Errorf("storage load package: %s", loadRet.Error)
}
@ -218,11 +235,17 @@ func (h *AdjustingHandler) doImageScheduling(evt event.Event, job *adjustingJob,
}
if state.Step == jobmod.StepMoving {
cacheMoveRet, ok := event.AssertExecutorTaskStatus[*exectsk.CacheMovePackageStatus](evt, state.FullTaskID)
if !ok {
cacheMoveRet, err := event.AssertExecutorTaskStatus[*exectsk.CacheMovePackageStatus](evt, state.FullTaskID)
if err == event.ErrUnconcernedTask {
return nil
}
if err == event.ErrTaskTimeout {
return fmt.Errorf("cache move package timeout")
}
h.mgr.execMgr.ForgetTask(state.FullTaskID)
if cacheMoveRet.Error != "" {
return fmt.Errorf("cache move pacakge: %s", cacheMoveRet.Error)
}
@ -242,21 +265,27 @@ func (h *AdjustingHandler) doImageScheduling(evt event.Event, job *adjustingJob,
}
if state.Step == jobmod.StepImageImporting {
uploadImageRet, ok := event.AssertExecutorTaskStatus[*exectsk.UploadImageStatus](evt, state.FullTaskID)
if !ok {
uploadImageRet, err := event.AssertExecutorTaskStatus[*exectsk.UploadImageStatus](evt, state.FullTaskID)
if err == event.ErrUnconcernedTask {
return nil
}
if err == event.ErrTaskTimeout {
return fmt.Errorf("import image timeout")
}
h.mgr.execMgr.ForgetTask(state.FullTaskID)
if uploadImageRet.Error != "" {
return fmt.Errorf("import image: %s", uploadImageRet.Error)
}
info, err := h.mgr.imageMgr.CreateImage(uploadImageRet.ImageID, file.PackageID)
// 调整过程中不会更换镜像所以ImageID不会发生变化
err = h.mgr.imageMgr.AddImageImportingInfo(file.ImageID, job.slwNodeInfo.ID, uploadImageRet.ImageID)
if err != nil {
return fmt.Errorf("creating image info: %w", err)
}
file.ImageID = info.ImageID
state.Step = jobmod.StepCompleted
return nil
}

View File

@ -40,7 +40,10 @@ func (h *CompleteHandler) handleFailed(job jobmod.Job, state *jobmod.StateFailed
}
func (h *CompleteHandler) OnEvent(broadcast event.Broadcast, evt event.Event) {
if cloneEvt, ok := evt.(*event.CloneJob); ok {
cloneEvt.Callback.SetError(fmt.Errorf("job not found"))
return
}
}
func (h *CompleteHandler) Serve() {

View File

@ -1,35 +0,0 @@
package event
import advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
// advisor上报任务进度
type AdvisorReportTaskStatus struct {
FullTaskID string
TaskStatus advtsk.TaskStatus
}
func NewAdvisorReportTaskStatusCmd(fullTaskID string, taskStatus advtsk.TaskStatus) *AdvisorReportTaskStatus {
return &AdvisorReportTaskStatus{
FullTaskID: fullTaskID,
TaskStatus: taskStatus,
}
}
func AssertAdvisorTaskStatus[T advtsk.TaskStatus](cmd Event, fullTaskID string) (T, bool) {
var ret T
if cmd == nil {
return ret, false
}
execTaskCmd, ok := cmd.(*AdvisorReportTaskStatus)
if !ok {
return ret, false
}
if execTaskCmd.FullTaskID != fullTaskID {
return ret, false
}
status, ok := execTaskCmd.TaskStatus.(T)
return status, ok
}

View File

@ -0,0 +1,12 @@
package event
// advisor的任务执行超时
type AdvisorTaskTimeout struct {
FullTaskID string
}
func NewAdvisorTaskTimeout(fullTaskID string) *AdvisorTaskTimeout {
return &AdvisorTaskTimeout{
FullTaskID: fullTaskID,
}
}

View File

@ -0,0 +1,46 @@
package event
import advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
// advisor上报任务进度
type AdvisorTaskUpdated struct {
FullTaskID string
TaskStatus advtsk.TaskStatus
}
func NewAdvisorTaskUpdated(fullTaskID string, taskStatus advtsk.TaskStatus) *AdvisorTaskUpdated {
return &AdvisorTaskUpdated{
FullTaskID: fullTaskID,
TaskStatus: taskStatus,
}
}
func AssertAdvisorTaskStatus[T advtsk.TaskStatus](evt Event, fullTaskID string) (T, error) {
var ret T
if evt == nil {
return ret, ErrUnconcernedTask
}
if reportTaskStatus, ok := evt.(*AdvisorTaskUpdated); ok {
if reportTaskStatus.FullTaskID != fullTaskID {
return ret, ErrUnconcernedTask
}
status, ok := reportTaskStatus.TaskStatus.(T)
if !ok {
return ret, ErrUnconcernedTask
}
return status, nil
}
if taskTimeout, ok := evt.(*AdvisorTaskTimeout); ok {
if taskTimeout.FullTaskID != fullTaskID {
return ret, ErrUnconcernedTask
}
return ret, ErrTaskTimeout
}
return ret, ErrUnconcernedTask
}

View File

@ -0,0 +1,14 @@
package event
import (
"gitlink.org.cn/cloudream/common/pkgs/future"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
)
type CloneJob struct {
Callback future.SetValueFuture[jobmod.Job]
}
func NewCloneJob() *CloneJob {
return &CloneJob{}
}

View File

@ -1,5 +1,15 @@
package event
import (
"errors"
"gitlink.org.cn/cloudream/common/models"
)
var ErrUnconcernedTask = errors.New("unconcerned task")
var ErrTaskTimeout = errors.New("task timeout")
type Event interface{}
type BroadcastType string
@ -12,8 +22,8 @@ const (
type Broadcast struct {
Type BroadcastType
JobSetID string
JobID string
JobSetID models.JobSetID
JobID models.JobID
}
func (b *Broadcast) ToAll() bool {
@ -34,14 +44,14 @@ func ToAll() Broadcast {
}
}
func ToJobSet(jobSetID string) Broadcast {
func ToJobSet(jobSetID models.JobSetID) Broadcast {
return Broadcast{
Type: BroadcastJobSet,
JobSetID: jobSetID,
}
}
func ToJob(jobID string) Broadcast {
func ToJob(jobID models.JobID) Broadcast {
return Broadcast{
Type: BroadcastJob,
JobID: jobID,

View File

@ -1,37 +0,0 @@
package event
import (
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)
// executor上报任务进度
type ExecutorReportTaskStatus struct {
FullTaskID string
TaskStatus exectsk.TaskStatus
}
func NewExecutorReportTaskStatus(fullTaskID string, taskStatus exectsk.TaskStatus) *ExecutorReportTaskStatus {
return &ExecutorReportTaskStatus{
FullTaskID: fullTaskID,
TaskStatus: taskStatus,
}
}
func AssertExecutorTaskStatus[T exectsk.TaskStatus](cmd Event, fullTaskID string) (T, bool) {
var ret T
if cmd == nil {
return ret, false
}
execTaskCmd, ok := cmd.(*ExecutorReportTaskStatus)
if !ok {
return ret, false
}
if execTaskCmd.FullTaskID != fullTaskID {
return ret, false
}
status, ok := execTaskCmd.TaskStatus.(T)
return status, ok
}

View File

@ -0,0 +1,12 @@
package event
// executor的任务执行超时
type ExecutorTaskTimeout struct {
FullTaskID string
}
func NewExecutorTaskTimeout(fullTaskID string) *ExecutorTaskTimeout {
return &ExecutorTaskTimeout{
FullTaskID: fullTaskID,
}
}

View File

@ -0,0 +1,48 @@
package event
import (
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)
// executor上报任务进度
type ExecutorTaskUpdated struct {
FullTaskID string
TaskStatus exectsk.TaskStatus
}
func NewExecutorTaskUpdated(fullTaskID string, taskStatus exectsk.TaskStatus) *ExecutorTaskUpdated {
return &ExecutorTaskUpdated{
FullTaskID: fullTaskID,
TaskStatus: taskStatus,
}
}
func AssertExecutorTaskStatus[T exectsk.TaskStatus](evt Event, fullTaskID string) (T, error) {
var ret T
if evt == nil {
return ret, ErrUnconcernedTask
}
if reportTaskStatus, ok := evt.(*ExecutorTaskUpdated); ok {
if reportTaskStatus.FullTaskID != fullTaskID {
return ret, ErrUnconcernedTask
}
status, ok := reportTaskStatus.TaskStatus.(T)
if !ok {
return ret, ErrUnconcernedTask
}
return status, nil
}
if taskTimeout, ok := evt.(*ExecutorTaskTimeout); ok {
if taskTimeout.FullTaskID != fullTaskID {
return ret, ErrUnconcernedTask
}
return ret, ErrTaskTimeout
}
return ret, ErrUnconcernedTask
}

View File

@ -1,14 +1,16 @@
package event
import "gitlink.org.cn/cloudream/common/models"
// 本地文件上传结束
type LocalFileUploaded struct {
JobSetID string
JobSetID models.JobSetID
LocalPath string
Error string
PackageID int64
}
func NewLocalFileUploaded(jobSetID string, localPath string, err string, packageID int64) *LocalFileUploaded {
func NewLocalFileUploaded(jobSetID models.JobSetID, localPath string, err string, packageID int64) *LocalFileUploaded {
return &LocalFileUploaded{
JobSetID: jobSetID,
LocalPath: localPath,

View File

@ -4,6 +4,7 @@ import (
"fmt"
"reflect"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/actor"
"gitlink.org.cn/cloudream/scheduler/common/globals"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
@ -20,7 +21,7 @@ type executingJob struct {
type ExecutingHandler struct {
mgr *Manager
jobs map[string]*executingJob
jobs map[models.JobID]*executingJob
cmdChan actor.CommandChannel
}
@ -28,7 +29,7 @@ type ExecutingHandler struct {
func NewExecutingHandler(mgr *Manager) *ExecutingHandler {
return &ExecutingHandler{
mgr: mgr,
jobs: make(map[string]*executingJob),
jobs: make(map[models.JobID]*executingJob),
cmdChan: *actor.NewCommandChannel(),
}
}
@ -52,6 +53,11 @@ func (h *ExecutingHandler) Handle(job jobmod.Job) {
}
func (h *ExecutingHandler) onJobEvent(evt event.Event, job *executingJob) {
if cloneEvt, ok := evt.(*event.CloneJob); ok {
cloneEvt.Callback.SetValue(job.job.Clone())
return
}
if norJob, ok := job.job.(*jobmod.NormalJob); ok {
h.onNormalJobEvent(evt, job, norJob)
} else if resJob, ok := job.job.(*jobmod.ResourceJob); ok {
@ -82,7 +88,14 @@ func (h *ExecutingHandler) onNormalJobEvent(evt event.Event, job *executingJob,
job.state.FullTaskID = fullTaskID
}
if execRet, ok := event.AssertExecutorTaskStatus[*exetsk.ScheduleTaskStatus](evt, job.state.FullTaskID); ok {
if execRet, err := event.AssertExecutorTaskStatus[*exetsk.ScheduleTaskStatus](evt, job.state.FullTaskID); err != event.ErrUnconcernedTask {
if err == event.ErrTaskTimeout {
h.changeJobState(job.job, jobmod.NewStateFailed("schedule task timeout", job.state))
return
}
h.mgr.execMgr.ForgetTask(job.state.FullTaskID)
if execRet.Error != "" {
h.changeJobState(job.job, jobmod.NewStateFailed(execRet.Error, job.state))
return
@ -157,7 +170,14 @@ func (h *ExecutingHandler) onResourceJobEvent(evt event.Event, job *executingJob
job.state.FullTaskID = fullTaskID
}
if createRet, ok := event.AssertAdvisorTaskStatus[*exetsk.StorageCreatePackageStatus](evt, job.state.FullTaskID); ok {
if createRet, err := event.AssertExecutorTaskStatus[*exetsk.StorageCreatePackageStatus](evt, job.state.FullTaskID); err != event.ErrUnconcernedTask {
if err == event.ErrTaskTimeout {
h.changeJobState(job.job, jobmod.NewStateFailed("storage create package timeout", job.state))
return
}
h.mgr.execMgr.ForgetTask(job.state.FullTaskID)
if createRet.Error != "" {
h.changeJobState(job.job, jobmod.NewStateFailed(createRet.Error, job.state))
return

View File

@ -1,6 +1,7 @@
package jobmgr
import (
"context"
"fmt"
"reflect"
"sync"
@ -33,9 +34,9 @@ type Manager struct {
defaultHandler StateHandler
jobSetIDIndex int
jobSets map[string]*jobmod.JobSet
jobSets map[models.JobSetID]*jobmod.JobSet
jobIDIndex int
jobs map[string]*mgrJob
jobs map[models.JobID]*mgrJob
}
func NewManager(execMgr *executormgr.Manager, advMgr *advisormgr.Manager, imageMgr *imagemgr.Manager) (*Manager, error) {
@ -45,9 +46,11 @@ func NewManager(execMgr *executormgr.Manager, advMgr *advisormgr.Manager, imageM
imageMgr: imageMgr,
}
execMgr.OnReported(mgr.ExecutorReportTaskStatus)
execMgr.OnTaskUpdated(mgr.executorTaskUpdated)
execMgr.OnTaskTimeout(mgr.executorTaskTimeout)
advMgr.OnReported(mgr.AdvisorReportTaskStatus)
advMgr.OnTaskUpdated(mgr.advisorTaskUpdated)
advMgr.OnTaskTimeout(mgr.advisorTaskTimeout)
mgr.handlers[myreflect.TypeOf[jobmod.StatePreScheduling]()] = NewPreSchedulingHandler(mgr)
mgr.handlers[myreflect.TypeOf[jobmod.StateReadyToAdjust]()] = NewReadyToAdjustHandler(mgr)
@ -85,14 +88,14 @@ func (m *Manager) SubmitJobSet(jobSetInfo models.JobSetInfo, preScheduleScheme j
m.pubLock.Lock()
defer m.pubLock.Unlock()
jobSetID := fmt.Sprintf("%d", m.jobSetIDIndex)
jobSetID := models.JobSetID(fmt.Sprintf("%d", m.jobSetIDIndex))
var jobs []jobmod.Job
var normalJobs []*jobmod.NormalJob
var resJobs []*jobmod.ResourceJob
var jobRefs []jobmod.JobSetJobRef
for i, jobInfo := range jobSetInfo.Jobs {
jobID := fmt.Sprintf("%d", m.jobIDIndex+i)
jobID := models.JobID(fmt.Sprintf("%d", m.jobIDIndex+i))
switch info := jobInfo.(type) {
case *models.NormalJobInfo:
@ -144,7 +147,7 @@ func (m *Manager) SubmitJobSet(jobSetInfo models.JobSetInfo, preScheduleScheme j
return jobSet, nil
}
func (m *Manager) LocalFileUploaded(jobSetID string, localPath string, err string, packageID int64) error {
func (m *Manager) LocalFileUploaded(jobSetID models.JobSetID, localPath string, err string, packageID int64) error {
m.pubLock.Lock()
defer m.pubLock.Unlock()
@ -155,7 +158,7 @@ func (m *Manager) LocalFileUploaded(jobSetID string, localPath string, err strin
return nil
}
func (m *Manager) ExecutorReportTaskStatus(jobID string, fullTaskID string, taskStatus exectsk.TaskStatus) {
func (m *Manager) executorTaskUpdated(jobID models.JobID, fullTaskID string, taskStatus exectsk.TaskStatus) {
m.pubLock.Lock()
defer m.pubLock.Unlock()
@ -164,10 +167,10 @@ func (m *Manager) ExecutorReportTaskStatus(jobID string, fullTaskID string, task
return
}
job.Handler.OnEvent(event.ToJob(jobID), event.NewExecutorReportTaskStatus(fullTaskID, taskStatus))
job.Handler.OnEvent(event.ToJob(jobID), event.NewExecutorTaskUpdated(fullTaskID, taskStatus))
}
func (m *Manager) AdvisorReportTaskStatus(jobID string, fullTaskID string, taskStatus advtsk.TaskStatus) {
func (m *Manager) executorTaskTimeout(jobID models.JobID, fullTaskID string) {
m.pubLock.Lock()
defer m.pubLock.Unlock()
@ -176,7 +179,47 @@ func (m *Manager) AdvisorReportTaskStatus(jobID string, fullTaskID string, taskS
return
}
job.Handler.OnEvent(event.ToJob(jobID), event.NewAdvisorReportTaskStatusCmd(fullTaskID, taskStatus))
job.Handler.OnEvent(event.ToJob(jobID), event.NewExecutorTaskTimeout(fullTaskID))
}
func (m *Manager) advisorTaskUpdated(jobID models.JobID, fullTaskID string, taskStatus advtsk.TaskStatus) {
m.pubLock.Lock()
defer m.pubLock.Unlock()
job, ok := m.jobs[jobID]
if !ok {
return
}
job.Handler.OnEvent(event.ToJob(jobID), event.NewAdvisorTaskUpdated(fullTaskID, taskStatus))
}
func (m *Manager) advisorTaskTimeout(jobID models.JobID, fullTaskID string) {
m.pubLock.Lock()
defer m.pubLock.Unlock()
job, ok := m.jobs[jobID]
if !ok {
return
}
job.Handler.OnEvent(event.ToJob(jobID), event.NewAdvisorTaskTimeout(fullTaskID))
}
func (m *Manager) CloneJob(jobID models.JobID) (jobmod.Job, error) {
m.pubLock.Lock()
job, ok := m.jobs[jobID]
if !ok {
m.pubLock.Unlock()
return nil, fmt.Errorf("job not found")
}
evt := event.NewCloneJob()
job.Handler.OnEvent(event.ToJob(jobID), evt)
m.pubLock.Unlock()
return evt.Callback.WaitValue(context.Background())
}
// 根据job状态选择handler进行处理。需要加锁

View File

@ -4,6 +4,7 @@ import (
"fmt"
"reflect"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/actor"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
@ -11,14 +12,14 @@ import (
)
type makingAdjustSchemeJob struct {
job jobmod.Job
job *jobmod.NormalJob
state *jobmod.StateMakingAdjustScheme
}
type MakingAdjustSchemeHandler struct {
mgr *Manager
jobs map[string]*makingAdjustSchemeJob
jobs map[models.JobID]*makingAdjustSchemeJob
cmdChan actor.CommandChannel
}
@ -26,13 +27,19 @@ type MakingAdjustSchemeHandler struct {
func NewMakingAdjustSchemeHandler(mgr *Manager) *MakingAdjustSchemeHandler {
return &MakingAdjustSchemeHandler{
mgr: mgr,
jobs: make(map[string]*makingAdjustSchemeJob),
jobs: make(map[models.JobID]*makingAdjustSchemeJob),
cmdChan: *actor.NewCommandChannel(),
}
}
func (h *MakingAdjustSchemeHandler) Handle(job jobmod.Job) {
h.cmdChan.Send(func() {
norJob, ok := job.(*jobmod.NormalJob)
if !ok {
h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("unknow job: %v", reflect.TypeOf(job)), job.GetState()))
return
}
state, ok := job.GetState().(*jobmod.StateMakingAdjustScheme)
if !ok {
h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("unknow state: %v", reflect.TypeOf(job.GetState())), job.GetState()))
@ -40,7 +47,7 @@ func (h *MakingAdjustSchemeHandler) Handle(job jobmod.Job) {
}
rjob := &makingAdjustSchemeJob{
job: job,
job: norJob,
state: state,
}
h.jobs[job.GetJobID()] = rjob
@ -50,8 +57,13 @@ func (h *MakingAdjustSchemeHandler) Handle(job jobmod.Job) {
}
func (h *MakingAdjustSchemeHandler) onJobEvent(evt event.Event, job *makingAdjustSchemeJob) {
if cloneEvt, ok := evt.(*event.CloneJob); ok {
cloneEvt.Callback.SetValue(job.job.Clone())
return
}
if job.state.FullTaskID == "" {
fullTaskID, err := h.mgr.advMgr.StartTask(job.job.GetJobID(), advtsk.NewMakeAdjustScheme())
fullTaskID, err := h.mgr.advMgr.StartTask(job.job.GetJobID(), advtsk.NewMakeAdjustScheme(*job.job))
if err != nil {
h.changeJobState(job.job, jobmod.NewStateFailed(err.Error(), job.state))
return
@ -60,7 +72,14 @@ func (h *MakingAdjustSchemeHandler) onJobEvent(evt event.Event, job *makingAdjus
job.state.FullTaskID = fullTaskID
}
if makingRet, ok := event.AssertAdvisorTaskStatus[*advtsk.MakeAdjustSchemeStatus](evt, job.state.FullTaskID); ok {
if makingRet, err := event.AssertAdvisorTaskStatus[*advtsk.MakeAdjustSchemeStatus](evt, job.state.FullTaskID); err != event.ErrUnconcernedTask {
if err == event.ErrTaskTimeout {
h.changeJobState(job.job, jobmod.NewStateFailed("make adjust scheme timeout", job.state))
return
}
h.mgr.advMgr.ForgetTask(job.state.FullTaskID)
if makingRet.Error != "" {
h.changeJobState(job.job, jobmod.NewStateFailed(makingRet.Error, job.state))
return

View File

@ -26,7 +26,7 @@ type preSchedulingJob struct {
type PreSchedulingHandler struct {
mgr *Manager
jobs map[string]*preSchedulingJob
jobs map[models.JobID]*preSchedulingJob
cmdChan actor.CommandChannel
}
@ -34,7 +34,7 @@ type PreSchedulingHandler struct {
func NewPreSchedulingHandler(mgr *Manager) *PreSchedulingHandler {
return &PreSchedulingHandler{
mgr: mgr,
jobs: make(map[string]*preSchedulingJob),
jobs: make(map[models.JobID]*preSchedulingJob),
cmdChan: *actor.NewCommandChannel(),
}
}
@ -79,6 +79,11 @@ func (h *PreSchedulingHandler) Handle(job jobmod.Job) {
}
func (h *PreSchedulingHandler) onJobEvent(evt event.Event, job *preSchedulingJob) {
if cloneEvt, ok := evt.(*event.CloneJob); ok {
cloneEvt.Callback.SetValue(job.job.Clone())
return
}
err := h.doPackageScheduling(evt, job,
job.job.Info.Files.Dataset, &job.job.Files.Dataset,
&job.state.Scheme.Dataset, &job.state.Dataset,
@ -194,11 +199,17 @@ func (h *PreSchedulingHandler) doPackageScheduling(evt event.Event, job *preSche
}
if state.Step == jobmod.StepMoving {
moveRet, ok := event.AssertExecutorTaskStatus[*exectsk.CacheMovePackageStatus](evt, state.FullTaskID)
if !ok {
moveRet, err := event.AssertExecutorTaskStatus[*exectsk.CacheMovePackageStatus](evt, state.FullTaskID)
if err == event.ErrUnconcernedTask {
return nil
}
if err == event.ErrTaskTimeout {
return fmt.Errorf("cache move package timeout")
}
h.mgr.execMgr.ForgetTask(state.FullTaskID)
if moveRet.Error != "" {
return fmt.Errorf("cache move pacakge: %s", moveRet.Error)
}
@ -208,11 +219,17 @@ func (h *PreSchedulingHandler) doPackageScheduling(evt event.Event, job *preSche
}
if state.Step == jobmod.StepLoading {
loadRet, ok := event.AssertExecutorTaskStatus[*exectsk.StorageLoadPackageStatus](evt, state.FullTaskID)
if !ok {
loadRet, err := event.AssertExecutorTaskStatus[*exectsk.StorageLoadPackageStatus](evt, state.FullTaskID)
if err == event.ErrUnconcernedTask {
return nil
}
if err == event.ErrTaskTimeout {
return fmt.Errorf("storage load package timeout")
}
h.mgr.execMgr.ForgetTask(state.FullTaskID)
if loadRet.Error != "" {
return fmt.Errorf("storage load package: %s", loadRet.Error)
}
@ -259,6 +276,14 @@ func (h *PreSchedulingHandler) doImageScheduling(evt event.Event, job *preSchedu
return nil
}
// 上传完毕,则可以新建一个空的镜像的记录
info, err := h.mgr.imageMgr.CreateImage(localFileCmd.PackageID)
if err != nil {
return fmt.Errorf("creating image info: %w", err)
}
// 填充ImageID和PackageID
file.ImageID = info.ImageID
file.PackageID = localFileCmd.PackageID
state.Step = jobmod.StepUploaded
}
@ -284,11 +309,17 @@ func (h *PreSchedulingHandler) doImageScheduling(evt event.Event, job *preSchedu
}
if state.Step == jobmod.StepMoving {
cacheMoveRet, ok := event.AssertExecutorTaskStatus[*exectsk.CacheMovePackageStatus](evt, state.FullTaskID)
if !ok {
cacheMoveRet, err := event.AssertExecutorTaskStatus[*exectsk.CacheMovePackageStatus](evt, state.FullTaskID)
if err == event.ErrUnconcernedTask {
return nil
}
if err == event.ErrTaskTimeout {
return fmt.Errorf("cache move package timeout")
}
h.mgr.execMgr.ForgetTask(state.FullTaskID)
if cacheMoveRet.Error != "" {
return fmt.Errorf("cache move pacakge: %s", cacheMoveRet.Error)
}
@ -308,21 +339,26 @@ func (h *PreSchedulingHandler) doImageScheduling(evt event.Event, job *preSchedu
}
if state.Step == jobmod.StepImageImporting {
uploadImageRet, ok := event.AssertExecutorTaskStatus[*exectsk.UploadImageStatus](evt, state.FullTaskID)
if !ok {
uploadImageRet, err := event.AssertExecutorTaskStatus[*exectsk.UploadImageStatus](evt, state.FullTaskID)
if err == event.ErrUnconcernedTask {
return nil
}
if err == event.ErrTaskTimeout {
return fmt.Errorf("import image timeout")
}
h.mgr.execMgr.ForgetTask(state.FullTaskID)
if uploadImageRet.Error != "" {
return fmt.Errorf("import image: %s", uploadImageRet.Error)
}
info, err := h.mgr.imageMgr.CreateImage(uploadImageRet.ImageID, file.PackageID)
err = h.mgr.imageMgr.AddImageImportingInfo(file.ImageID, job.slwNodeInfo.ID, uploadImageRet.ImageID)
if err != nil {
return fmt.Errorf("creating image info: %w", err)
return fmt.Errorf("adding image importing info: %w", err)
}
file.ImageID = info.ImageID
state.Step = jobmod.StepCompleted
return nil
}

View File

@ -18,7 +18,7 @@ type readyToAdjustJob struct {
type ReadyToAdjustHandler struct {
mgr *Manager
jobs map[string]*readyToAdjustJob
jobs map[models.JobID]*readyToAdjustJob
cmdChan actor.CommandChannel
}
@ -26,7 +26,7 @@ type ReadyToAdjustHandler struct {
func NewReadyToAdjustHandler(mgr *Manager) *ReadyToAdjustHandler {
return &ReadyToAdjustHandler{
mgr: mgr,
jobs: make(map[string]*readyToAdjustJob),
jobs: make(map[models.JobID]*readyToAdjustJob),
cmdChan: *actor.NewCommandChannel(),
}
}
@ -50,6 +50,11 @@ func (h *ReadyToAdjustHandler) Handle(job jobmod.Job) {
}
func (h *ReadyToAdjustHandler) onJobEvent(evt event.Event, job *readyToAdjustJob) {
if cloneEvt, ok := evt.(*event.CloneJob); ok {
cloneEvt.Callback.SetValue(job.job.Clone())
return
}
if norJob, ok := job.job.(*jobmod.NormalJob); ok {
h.onNormalJobEvent(evt, job, norJob)
} else if resJob, ok := job.job.(*jobmod.ResourceJob); ok {

View File

@ -4,6 +4,7 @@ import (
"fmt"
"reflect"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/actor"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event"
@ -17,7 +18,7 @@ type readyToExecuteJob struct {
type ReadyToExecuteHandler struct {
mgr *Manager
jobs map[string]*readyToExecuteJob
jobs map[models.JobID]*readyToExecuteJob
cmdChan actor.CommandChannel
}
@ -25,7 +26,7 @@ type ReadyToExecuteHandler struct {
func NewReadyToExecuteHandler(mgr *Manager) *ReadyToExecuteHandler {
return &ReadyToExecuteHandler{
mgr: mgr,
jobs: make(map[string]*readyToExecuteJob),
jobs: make(map[models.JobID]*readyToExecuteJob),
cmdChan: *actor.NewCommandChannel(),
}
}
@ -49,6 +50,11 @@ func (h *ReadyToExecuteHandler) Handle(job jobmod.Job) {
}
func (h *ReadyToExecuteHandler) onJobEvent(evt event.Event, job *readyToExecuteJob) {
if cloneEvt, ok := evt.(*event.CloneJob); ok {
cloneEvt.Callback.SetValue(job.job.Clone())
return
}
if norJob, ok := job.job.(*jobmod.NormalJob); ok {
h.onNormalJobEvent(evt, job, norJob)
} else if resJob, ok := job.job.(*jobmod.ResourceJob); ok {

View File

@ -6,5 +6,6 @@ import (
)
func (svc *Service) ReportAdvisorTaskStatus(msg *mgrmq.ReportAdvisorTaskStatus) (*mgrmq.ReportAdvisorTaskStatusResp, *mq.CodeMessage) {
svc.advMgr.Report(msg.AdvisorID, msg.TaskStatus)
return mq.ReplyOK(mgrmq.NewReportAdvisorTaskStatusResp())
}

View File

@ -6,5 +6,6 @@ import (
)
func (svc *Service) ReportExecutorTaskStatus(msg *mgrmq.ReportExecutorTaskStatus) (*mgrmq.ReportExecutorTaskStatusResp, *mq.CodeMessage) {
svc.exeMgr.Report(msg.ExecutorID, msg.TaskStatus)
return mq.ReplyOK(mgrmq.NewReportExecutorTaskStatusResp())
}

View File

@ -1,14 +1,18 @@
package mq
import (
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
)
func (svc *Service) CreateImage(msg *mgrmq.CreateImage) (*mgrmq.CreateImageResp, *mq.CodeMessage) {
}
func (svc *Service) GetImageInfo(msg *mgrmq.GetImageInfo) (*mgrmq.GetImageInfoResp, *mq.CodeMessage) {
info, err := svc.imgMgr.GetImageInfo(msg.ImageID)
if err != nil {
logger.WithField("ImageID", msg.ImageID).Warnf("getting image info: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "get image info failed")
}
return mq.ReplyOK(mgrmq.NewGetImageInfoResp(info.ImageID, info.PackageID, info.ImportingInfos))
}

View File

@ -9,7 +9,7 @@ import (
// 提交任务集
func (svc *Service) SubmitJobSet(msg *mgrmq.SubmitJobSet) (*mgrmq.SubmitJobSetResp, *mq.CodeMessage) {
jobSet, err := svc.jobMan.SubmitJobSet(msg.JobSet, msg.PreScheduleScheme)
jobSet, err := svc.jobMgr.SubmitJobSet(msg.JobSet, msg.PreScheduleScheme)
if err != nil {
logger.Warnf("submitting job set: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "submit job set failed")
@ -20,14 +20,16 @@ func (svc *Service) SubmitJobSet(msg *mgrmq.SubmitJobSet) (*mgrmq.SubmitJobSetRe
// 任务集中某个文件上传完成
func (svc *Service) JobSetLocalFileUploaded(msg *mgrmq.JobSetLocalFileUploaded) (*mgrmq.JobSetLocalFileUploadedResp, *mq.CodeMessage) {
svc.jobMan.LocalFileUploaded(msg.JobSetID, msg.LocalPath, msg.Error, msg.PackageID)
svc.jobMgr.LocalFileUploaded(msg.JobSetID, msg.LocalPath, msg.Error, msg.PackageID)
return mq.ReplyOK(mgrmq.NewJobSetLocalFileUploadedResp())
}
func (svc *Service) GetJob(msg *mgrmq.GetJob) (*mgrmq.GetJobResp, *mq.CodeMessage) {
job, err := svc.jobMgr.CloneJob(msg.JobID)
if err != nil {
logger.WithField("JobID", msg.JobID).Warnf("cloning job: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "get job failed")
}
}
func (svc *Service) GetJobSetJobs(msg *mgrmq.GetJobSetJobs) (*mgrmq.GetJobSetJobsResp, *mq.CodeMessage) {
return mq.ReplyOK(mgrmq.NewGetJobResp(job))
}

View File

@ -1,13 +1,24 @@
package mq
import "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr"
import (
"gitlink.org.cn/cloudream/scheduler/manager/internal/advisormgr"
"gitlink.org.cn/cloudream/scheduler/manager/internal/executormgr"
"gitlink.org.cn/cloudream/scheduler/manager/internal/imagemgr"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr"
)
type Service struct {
jobMan *jobmgr.Manager
jobMgr *jobmgr.Manager
exeMgr *executormgr.Manager
advMgr *advisormgr.Manager
imgMgr *imagemgr.Manager
}
func NewService(jobMan *jobmgr.Manager) (*Service, error) {
func NewService(jobMan *jobmgr.Manager, exeMgr *executormgr.Manager, advMgr *advisormgr.Manager, imgMgr *imagemgr.Manager) (*Service, error) {
return &Service{
jobMan: jobMan,
jobMgr: jobMan,
exeMgr: exeMgr,
advMgr: advMgr,
imgMgr: imgMgr,
}, nil
}

View File

@ -3,13 +3,18 @@ package main
import (
"fmt"
"os"
"time"
_ "google.golang.org/grpc/balancer/grpclb"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/scheduler/common/globals"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
"gitlink.org.cn/cloudream/scheduler/manager/internal/advisormgr"
"gitlink.org.cn/cloudream/scheduler/manager/internal/config"
"gitlink.org.cn/cloudream/scheduler/manager/internal/executormgr"
"gitlink.org.cn/cloudream/scheduler/manager/internal/imagemgr"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr"
mqsvc "gitlink.org.cn/cloudream/scheduler/manager/internal/mq"
)
@ -28,7 +33,31 @@ func main() {
globals.InitMQPool(&config.Cfg().RabbitMQ)
svc, err := mqsvc.NewService()
exeMgr, err := executormgr.NewManager(time.Duration(config.Cfg().ReportTimeoutSecs) * time.Second)
if err != nil {
fmt.Printf("new executor manager: %s", err.Error())
os.Exit(1)
}
advMgr, err := advisormgr.NewManager(time.Duration(config.Cfg().ReportTimeoutSecs) * time.Second)
if err != nil {
fmt.Printf("new advisor manager: %s", err.Error())
os.Exit(1)
}
imgMgr, err := imagemgr.NewManager()
if err != nil {
fmt.Printf("new image manager: %s", err.Error())
os.Exit(1)
}
jobMgr, err := jobmgr.NewManager(exeMgr, advMgr, imgMgr)
if err != nil {
fmt.Printf("new job manager: %s", err.Error())
os.Exit(1)
}
svc, err := mqsvc.NewService(jobMgr, exeMgr, advMgr, imgMgr)
if err != nil {
fmt.Printf("new service: %s", err.Error())
os.Exit(1)
@ -44,6 +73,10 @@ func main() {
}
// 启动服务
go serveExecutorManager(exeMgr)
go serveAdvisorManager(advMgr)
go serveMQServer(mqSvr)
forever := make(chan bool)
@ -60,3 +93,25 @@ func serveMQServer(server *mgrmq.Server) {
logger.Info("mq server stopped")
}
func serveExecutorManager(mgr *executormgr.Manager) {
logger.Info("start serving executor manager")
err := mgr.Serve()
if err != nil {
logger.Errorf("executor manager stopped with error: %s", err.Error())
}
logger.Info("executor manager stopped")
}
func serveAdvisorManager(mgr *advisormgr.Manager) {
logger.Info("start serving advisor manager")
err := mgr.Serve()
if err != nil {
logger.Errorf("advisor manager stopped with error: %s", err.Error())
}
logger.Info("advisor manager stopped")
}