diff --git a/common/pkgs/mq/executor/pcm.go b/common/pkgs/mq/executor/pcm.go index b384c07..8450a63 100644 --- a/common/pkgs/mq/executor/pcm.go +++ b/common/pkgs/mq/executor/pcm.go @@ -3,45 +3,13 @@ package executor import "gitlink.org.cn/cloudream/common/pkgs/mq" type PCMService interface { - StartUploadImage(msg *StartUploadImage) (*StartUploadImageResp, *mq.CodeMessage) - GetImageList(msg *GetImageList) (*GetImageListResp, *mq.CodeMessage) DeleteImage(msg *DeleteImage) (*DeleteImageResp, *mq.CodeMessage) - StartScheduleTask(msg *StartScheduleTask) (*StartScheduleTaskResp, *mq.CodeMessage) - DeleteTask(msg *DeleteTask) (*DeleteTaskResp, *mq.CodeMessage) } -// 启动上传(并注册)镜像任务 -var _ = Register(Service.StartUploadImage) - -type StartUploadImage struct { - mq.MessageBodyBase - SlwNodeID int64 `json:"slwNodeID"` - ImagePath string `json:"imagePath"` -} -type StartUploadImageResp struct { - mq.MessageBodyBase - TaskID string `json:"taskID"` -} - -func NewStartUploadImage(slwNodeID int64, imagePath string) *StartUploadImage { - return &StartUploadImage{ - SlwNodeID: slwNodeID, - ImagePath: imagePath, - } -} -func NewStartUploadImageResp(taskID string) *StartUploadImageResp { - return &StartUploadImageResp{ - TaskID: taskID, - } -} -func (c *Client) StartUploadImage(msg *StartUploadImage, opts ...mq.RequestOption) (*StartUploadImageResp, error) { - return mq.Request(Service.StartUploadImage, c.rabbitCli, msg, opts...) -} - // 查询镜像列表 var _ = Register(Service.GetImageList) @@ -96,38 +64,6 @@ func (c *Client) DeleteImage(msg *DeleteImage, opts ...mq.RequestOption) (*Delet return mq.Request(Service.DeleteImage, c.rabbitCli, msg, opts...) } -// 启动提交任务 -var _ = Register(Service.StartScheduleTask) - -type StartScheduleTask struct { - mq.MessageBodyBase - SlwNodeID int64 `json:"slwNodeID"` - Envs []map[string]string `json:"envs"` - ImageID int64 `json:"imageID"` - CMDLine string `json:"cmdLine"` -} -type StartScheduleTaskResp struct { - mq.MessageBodyBase - TaskID string `json:"taskID"` -} - -func NewStartScheduleTask(slwNodeID int64, envs []map[string]string, imageID int64, cmdLine string) *StartScheduleTask { - return &StartScheduleTask{ - SlwNodeID: slwNodeID, - Envs: envs, - ImageID: imageID, - CMDLine: cmdLine, - } -} -func NewStartScheduleTaskResp(taskID string) *StartScheduleTaskResp { - return &StartScheduleTaskResp{ - TaskID: taskID, - } -} -func (c *Client) StartScheduleTask(msg *StartScheduleTask, opts ...mq.RequestOption) (*StartScheduleTaskResp, error) { - return mq.Request(Service.StartScheduleTask, c.rabbitCli, msg, opts...) -} - // 删除任务 var _ = Register(Service.DeleteTask) diff --git a/common/pkgs/mq/executor/server.go b/common/pkgs/mq/executor/server.go index 61ca4ab..52d1ec2 100644 --- a/common/pkgs/mq/executor/server.go +++ b/common/pkgs/mq/executor/server.go @@ -7,8 +7,9 @@ import ( // Service 协调端接口 type Service interface { - StorageService PCMService + + TaskService } const ( diff --git a/common/pkgs/mq/executor/storage.go b/common/pkgs/mq/executor/storage.go deleted file mode 100644 index 2f90701..0000000 --- a/common/pkgs/mq/executor/storage.go +++ /dev/null @@ -1,109 +0,0 @@ -package executor - -import ( - "gitlink.org.cn/cloudream/common/models" - "gitlink.org.cn/cloudream/common/pkgs/mq" -) - -type StorageService interface { - StartStorageLoadPackage(msg *StartStorageLoadPackage) (*StartStorageLoadPackageResp, *mq.CodeMessage) - - StartStorageCreatePackage(msg *StartStorageCreatePackage) (*StartStorageCreatePackageResp, *mq.CodeMessage) - - StartCacheMovePackage(msg *StartCacheMovePackage) (*StartCacheMovePackageResp, *mq.CodeMessage) -} - -// 启动存储系统调度文件任务 -var _ = Register(Service.StartStorageLoadPackage) - -type StartStorageLoadPackage struct { - mq.MessageBodyBase - UserID int64 `json:"userID"` - PackageID int64 `json:"packageID"` - StorageID int64 `json:"storageID"` -} -type StartStorageLoadPackageResp struct { - mq.MessageBodyBase - TaskID string `json:"taskID"` -} - -func NewStartStorageLoadPackage(packageID int64, userID int64) *StartStorageLoadPackage { - return &StartStorageLoadPackage{ - PackageID: packageID, - UserID: userID, - } -} -func NewStartStorageLoadPackageResp(taskID string) *StartStorageLoadPackageResp { - return &StartStorageLoadPackageResp{ - TaskID: taskID, - } -} -func (c *Client) StartStorageLoadPackage(msg *StartStorageLoadPackage, opts ...mq.RequestOption) (*StartStorageLoadPackageResp, error) { - return mq.Request(Service.StartStorageLoadPackage, c.rabbitCli, msg, opts...) -} - -// 启动存储系统从存储服务上传文件任务 -var _ = Register(Service.StartStorageCreatePackage) - -type StartStorageCreatePackage struct { - mq.MessageBodyBase - UserID int64 `json:"userID"` - StorageID int64 `json:"storageID"` - Path string `json:"path"` - BucketID int64 `json:"bucketID"` - Name string `json:"name"` - Redundancy models.TypedRedundancyInfo `json:"redundancy"` -} -type StartStorageCreatePackageResp struct { - mq.MessageBodyBase - TaskID string `json:"taskID"` -} - -func NewStartStorageCreatePackage(userID int64, storageID int64, filePath string, bucketID int64, name string, redundancy models.TypedRedundancyInfo) *StartStorageCreatePackage { - return &StartStorageCreatePackage{ - UserID: userID, - StorageID: storageID, - Path: filePath, - BucketID: bucketID, - Name: name, - Redundancy: redundancy, - } -} -func NewStartStorageCreatePackageResp(taskID string) *StartStorageCreatePackageResp { - return &StartStorageCreatePackageResp{ - TaskID: taskID, - } -} -func (c *Client) StartStorageCreatePackage(msg *StartStorageCreatePackage, opts ...mq.RequestOption) (*StartStorageCreatePackageResp, error) { - return mq.Request(Service.StartStorageCreatePackage, c.rabbitCli, msg, opts...) -} - -// 启动存储系统调度文件到某个节点的缓存的任务 -var _ = Register(Service.StartCacheMovePackage) - -type StartCacheMovePackage struct { - mq.MessageBodyBase - UserID int64 `json:"userID"` - PackageID int64 `json:"packageID"` - StgNodeID int64 `json:"stgNodeID"` -} -type StartCacheMovePackageResp struct { - mq.MessageBodyBase - TaskID string `json:"taskID"` -} - -func NewStartCacheMovePackage(userID int64, packageID int64, stgNodeID int64) *StartCacheMovePackage { - return &StartCacheMovePackage{ - UserID: userID, - PackageID: packageID, - StgNodeID: stgNodeID, - } -} -func NewStartCacheMovePackageResp(taskID string) *StartCacheMovePackageResp { - return &StartCacheMovePackageResp{ - TaskID: taskID, - } -} -func (c *Client) StartCacheMovePackage(msg *StartCacheMovePackage, opts ...mq.RequestOption) (*StartCacheMovePackageResp, error) { - return mq.Request(Service.StartCacheMovePackage, c.rabbitCli, msg, opts...) -} diff --git a/common/pkgs/mq/executor/task.go b/common/pkgs/mq/executor/task.go new file mode 100644 index 0000000..6d983ce --- /dev/null +++ b/common/pkgs/mq/executor/task.go @@ -0,0 +1,42 @@ +package executor + +import ( + "gitlink.org.cn/cloudream/common/pkgs/mq" + exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task" +) + +type TaskService interface { + StartTask(msg *StartTask) (*StartTaskResp, *mq.CodeMessage) +} + +// 启动一个任务 +var _ = Register(Service.StartTask) + +type StartTask struct { + mq.MessageBodyBase + Info exectsk.TaskInfo `json:"info"` +} +type StartTaskResp struct { + mq.MessageBodyBase + ExecutorID string `json:"executorID"` + TaskID string `json:"taskID"` +} + +func NewStartTask(info exectsk.TaskInfo) *StartTask { + return &StartTask{ + Info: info, + } +} +func NewStartTaskResp(execID string, taskID string) *StartTaskResp { + return &StartTaskResp{ + ExecutorID: execID, + TaskID: taskID, + } +} +func (c *Client) StartTask(msg *StartTask, opts ...mq.RequestOption) (*StartTaskResp, error) { + return mq.Request(Service.StartTask, c.rabbitCli, msg, opts...) +} + +func init() { + mq.RegisterUnionType(exectsk.TaskInfoTypeUnion) +} diff --git a/common/pkgs/mq/executor/task/cache_move_package.go b/common/pkgs/mq/executor/task/cache_move_package.go new file mode 100644 index 0000000..fd0d35e --- /dev/null +++ b/common/pkgs/mq/executor/task/cache_move_package.go @@ -0,0 +1,31 @@ +package task + +type CacheMovePackage struct { + TaskInfoBase + UserID int64 `json:"userID"` + PackageID int64 `json:"packageID"` + StgNodeID int64 `json:"stgNodeID"` +} +type CacheMovePackageStatus struct { + TaskStatusBase + Status string `json:"status"` + Error string `json:"error"` +} + +func NewCacheMovePackage(userID int64, packageID int64, stgNodeID int64) *CacheMovePackage { + return &CacheMovePackage{ + UserID: userID, + PackageID: packageID, + StgNodeID: stgNodeID, + } +} +func NewCacheMovePackageStatus(status string, err string) *CacheMovePackageStatus { + return &CacheMovePackageStatus{ + Status: status, + Error: err, + } +} + +func init() { + Register[CacheMovePackage, CacheMovePackageStatus]() +} diff --git a/common/pkgs/mq/executor/task/schedule_task.go b/common/pkgs/mq/executor/task/schedule_task.go new file mode 100644 index 0000000..6ee44c0 --- /dev/null +++ b/common/pkgs/mq/executor/task/schedule_task.go @@ -0,0 +1,36 @@ +package task + +type ScheduleTask struct { + TaskInfoBase + SlwNodeID int64 `json:"slwNodeID"` + Envs []map[string]string `json:"envs"` + ImageID int64 `json:"imageID"` + CMDLine string `json:"cmdLine"` +} +type ScheduleTaskStatus struct { + TaskStatusBase + Status string `json:"status"` + Error string `json:"error"` + PCMJobID int64 `json:"pcmJobID"` +} + +func NewScheduleTask(slwNodeID int64, envs []map[string]string, imageID int64, cmdLine string) *ScheduleTask { + return &ScheduleTask{ + SlwNodeID: slwNodeID, + Envs: envs, + ImageID: imageID, + CMDLine: cmdLine, + } +} + +func NewScheduleTaskStatus(status string, err string, pcmJobID int64) *ScheduleTaskStatus { + return &ScheduleTaskStatus{ + Status: status, + Error: err, + PCMJobID: pcmJobID, + } +} + +func init() { + Register[ScheduleTask, ScheduleTaskStatus]() +} diff --git a/common/pkgs/mq/executor/task/storage_create_package.go b/common/pkgs/mq/executor/task/storage_create_package.go new file mode 100644 index 0000000..e3d3073 --- /dev/null +++ b/common/pkgs/mq/executor/task/storage_create_package.go @@ -0,0 +1,43 @@ +package task + +import ( + "gitlink.org.cn/cloudream/common/models" +) + +type StorageCreatePackage struct { + TaskInfoBase + UserID int64 `json:"userID"` + StorageID int64 `json:"storageID"` + Path string `json:"path"` + BucketID int64 `json:"bucketID"` + Name string `json:"name"` + Redundancy models.TypedRedundancyInfo `json:"redundancy"` +} +type StorageCreatePackageStatus struct { + TaskStatusBase + Status string `json:"status"` + Error string `json:"error"` + PackageID int64 `json:"packageID"` +} + +func NewStorageCreatePackage(userID int64, storageID int64, filePath string, bucketID int64, name string, redundancy models.TypedRedundancyInfo) *StorageCreatePackage { + return &StorageCreatePackage{ + UserID: userID, + StorageID: storageID, + Path: filePath, + BucketID: bucketID, + Name: name, + Redundancy: redundancy, + } +} +func NewStorageCreatePackageStatus(status string, err string, packageID int64) *StorageCreatePackageStatus { + return &StorageCreatePackageStatus{ + Status: status, + Error: err, + PackageID: packageID, + } +} + +func init() { + Register[StorageCreatePackage, StorageCreatePackageStatus]() +} diff --git a/common/pkgs/mq/executor/task/storage_load_package.go b/common/pkgs/mq/executor/task/storage_load_package.go new file mode 100644 index 0000000..50f949d --- /dev/null +++ b/common/pkgs/mq/executor/task/storage_load_package.go @@ -0,0 +1,30 @@ +package task + +type StorageLoadPackage struct { + TaskInfoBase + UserID int64 `json:"userID"` + PackageID int64 `json:"packageID"` + StorageID int64 `json:"storageID"` +} +type StorageLoadPackageStatus struct { + TaskStatusBase + Status string `json:"status"` + Error string `json:"error"` +} + +func NewStorageLoadPackage(packageID int64, userID int64) *StorageLoadPackage { + return &StorageLoadPackage{ + PackageID: packageID, + UserID: userID, + } +} +func NewStorageLoadPackageStatus(status string, err string) *StorageLoadPackageStatus { + return &StorageLoadPackageStatus{ + Status: status, + Error: err, + } +} + +func init() { + Register[StorageLoadPackage, StorageCreatePackageStatus]() +} diff --git a/common/pkgs/mq/executor/task/task.go b/common/pkgs/mq/executor/task/task.go index b83919f..3b9058e 100644 --- a/common/pkgs/mq/executor/task/task.go +++ b/common/pkgs/mq/executor/task/task.go @@ -1,93 +1,36 @@ package task import ( - "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/types" myreflect "gitlink.org.cn/cloudream/common/utils/reflect" ) -type TaskStatus interface{} - -// 增加了新类型后需要在这里也同步添加 -type TaskStatusConst interface { - TaskStatus | ScheduleTaskStatus | UploadImageTaskStatus | CacheMovePackageTaskStatus | CreatePackageTaskStatus | LoadPackageTaskStatus +// 任务 +type TaskInfo interface { + Noop() } // 增加了新类型后需要在这里也同步添加 -var TaskStatusTypeUnion = types.NewTypeUnion[TaskStatus]( - myreflect.TypeOf[ScheduleTaskStatus](), - myreflect.TypeOf[UploadImageTaskStatus](), - myreflect.TypeOf[CacheMovePackageTaskStatus](), - myreflect.TypeOf[CreatePackageTaskStatus](), - myreflect.TypeOf[LoadPackageTaskStatus](), -) +var TaskInfoTypeUnion = types.NewTypeUnion[TaskInfo]() -type ScheduleTaskStatus struct { - Status string `json:"status"` - Error string `json:"error"` - PCMJobID int64 `json:"pcmJobID"` +type TaskInfoBase struct{} + +func (s *TaskInfoBase) Noop() {} + +// 任务上报的状态 +type TaskStatus interface { + Noop() } -func NewScheduleTaskStatus(status string, err string, pcmJobID int64) ScheduleTaskStatus { - return ScheduleTaskStatus{ - Status: status, - Error: err, - PCMJobID: pcmJobID, - } -} +// 增加了新类型后需要在这里也同步添加 +var TaskStatusTypeUnion = types.NewTypeUnion[TaskStatus]() -type UploadImageTaskStatus struct { - Status string `json:"status"` - Error string `json:"error"` - ImageID int64 `json:"imageID"` -} +type TaskStatusBase struct{} -func NewUploadImageTaskStatus(status string, err string, imageID int64) UploadImageTaskStatus { - return UploadImageTaskStatus{ - Status: status, - Error: err, - ImageID: imageID, - } -} +func (s *TaskStatusBase) Noop() {} -type CacheMovePackageTaskStatus struct { - Status string `json:"status"` - Error string `json:"error"` -} +func Register[TTaskInfo any, TTaskStatus any]() { + TaskInfoTypeUnion.Add(myreflect.TypeOf[TTaskInfo]()) -func NewCacheMovePackageTaskStatus(status string, err string) CacheMovePackageTaskStatus { - return CacheMovePackageTaskStatus{ - Status: status, - Error: err, - } -} - -type CreatePackageTaskStatus struct { - Status string `json:"status"` - Error string `json:"error"` - PackageID int64 `json:"packageID"` -} - -func NewCreatePackageTaskStatus(status string, err string, packageID int64) CreatePackageTaskStatus { - return CreatePackageTaskStatus{ - Status: status, - Error: err, - PackageID: packageID, - } -} - -type LoadPackageTaskStatus struct { - Status string `json:"status"` - Error string `json:"error"` -} - -func NewLoadPackageTaskStatus(status string, err string) LoadPackageTaskStatus { - return LoadPackageTaskStatus{ - Status: status, - Error: err, - } -} - -func init() { - mq.RegisterUnionType(TaskStatusTypeUnion) + TaskStatusTypeUnion.Add(myreflect.TypeOf[TTaskStatus]()) } diff --git a/common/pkgs/mq/executor/task/upload_image.go b/common/pkgs/mq/executor/task/upload_image.go new file mode 100644 index 0000000..e37fc3f --- /dev/null +++ b/common/pkgs/mq/executor/task/upload_image.go @@ -0,0 +1,31 @@ +package task + +type UploadImage struct { + TaskInfoBase + SlwNodeID int64 `json:"slwNodeID"` + ImagePath string `json:"imagePath"` +} +type UploadImageStatus struct { + TaskStatusBase + Status string `json:"status"` + Error string `json:"error"` + ImageID int64 `json:"imageID"` +} + +func NewUploadImage(slwNodeID int64, imagePath string) *UploadImage { + return &UploadImage{ + SlwNodeID: slwNodeID, + ImagePath: imagePath, + } +} +func NewUploadImageStatus(status string, err string, imageID int64) *UploadImageStatus { + return &UploadImageStatus{ + Status: status, + Error: err, + ImageID: imageID, + } +} + +func init() { + Register[UploadImage, UploadImageStatus]() +} diff --git a/common/pkgs/mq/manager/executor.go b/common/pkgs/mq/manager/executor.go index 7ebcc11..3d9493f 100644 --- a/common/pkgs/mq/manager/executor.go +++ b/common/pkgs/mq/manager/executor.go @@ -44,3 +44,7 @@ func NewExecutorTaskStatus(taskID string, status exectsk.TaskStatus) ExecutorTas func (c *Client) ReportExecutorTaskStatus(msg *ReportExecutorTaskStatus, opts ...mq.RequestOption) (*ReportExecutorTaskStatusResp, error) { return mq.Request(Service.ReportExecutorTaskStatus, c.rabbitCli, msg, opts...) } + +func init() { + mq.RegisterUnionType(exectsk.TaskStatusTypeUnion) +} diff --git a/executor/internal/services/pcm.go b/executor/internal/services/pcm.go index eb4db94..e7aae79 100644 --- a/executor/internal/services/pcm.go +++ b/executor/internal/services/pcm.go @@ -7,14 +7,8 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/scheduler/common/globals" execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor" - schtsk "gitlink.org.cn/cloudream/scheduler/executor/internal/task" ) -func (svc *Service) StartUploadImage(msg *execmq.StartUploadImage) (*execmq.StartUploadImageResp, *mq.CodeMessage) { - tsk := svc.taskManager.StartNew(schtsk.NewPCMUploadImage(msg.SlwNodeID, msg.ImagePath)) - return mq.ReplyOK(execmq.NewStartUploadImageResp(tsk.ID())) -} - func (svc *Service) GetImageList(msg *execmq.GetImageList) (*execmq.GetImageListResp, *mq.CodeMessage) { pcmCli, err := globals.PCMPool.Acquire() if err != nil { @@ -53,11 +47,6 @@ func (svc *Service) DeleteImage(msg *execmq.DeleteImage) (*execmq.DeleteImageRes return mq.ReplyOK(execmq.NewDeleteImageResp(resp.Result)) } -func (svc *Service) StartScheduleTask(msg *execmq.StartScheduleTask) (*execmq.StartScheduleTaskResp, *mq.CodeMessage) { - tsk := svc.taskManager.StartNew(schtsk.NewPCMScheduleTask(msg.SlwNodeID, msg.Envs, msg.ImageID, msg.CMDLine)) - return mq.ReplyOK(execmq.NewStartScheduleTaskResp(tsk.ID())) -} - func (svc *Service) DeleteTask(msg *execmq.DeleteTask) (*execmq.DeleteTaskResp, *mq.CodeMessage) { pcmCli, err := globals.PCMPool.Acquire() if err != nil { diff --git a/executor/internal/services/storage.go b/executor/internal/services/storage.go deleted file mode 100644 index da29fcc..0000000 --- a/executor/internal/services/storage.go +++ /dev/null @@ -1,23 +0,0 @@ -package services - -import ( - "gitlink.org.cn/cloudream/common/pkgs/mq" - execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor" - schtsk "gitlink.org.cn/cloudream/scheduler/executor/internal/task" -) - -func (svc *Service) StartStorageLoadPackage(msg *execmq.StartStorageLoadPackage) (*execmq.StartStorageLoadPackageResp, *mq.CodeMessage) { - tsk := svc.taskManager.StartNew(schtsk.NewStorageLoadPackage(msg.UserID, msg.PackageID, msg.StorageID)) - return mq.ReplyOK(execmq.NewStartStorageLoadPackageResp(tsk.ID())) -} - -func (svc *Service) StartStorageCreatePackage(msg *execmq.StartStorageCreatePackage) (*execmq.StartStorageCreatePackageResp, *mq.CodeMessage) { - tsk := svc.taskManager.StartNew(schtsk.NewStorageCreatePackage(msg.UserID, msg.StorageID, msg.Path, msg.BucketID, msg.Name, msg.Redundancy)) - return mq.ReplyOK(execmq.NewStartStorageCreatePackageResp(tsk.ID())) -} - -func (svc *Service) StartCacheMovePackage(msg *execmq.StartCacheMovePackage) (*execmq.StartCacheMovePackageResp, *mq.CodeMessage) { - tsk := svc.taskManager.StartNew(schtsk.NewCacheMovePackage(msg.UserID, msg.PackageID, msg.StgNodeID)) - // tsk := svc.taskManager.StartNew(task.TaskBody[schtsk.NewCacheMovePackage(msg.UserID, msg.PackageID, msg.NodeID)]) - return mq.ReplyOK(execmq.NewStartCacheMovePackageResp(tsk.ID())) -} diff --git a/executor/internal/services/task.go b/executor/internal/services/task.go new file mode 100644 index 0000000..02657ed --- /dev/null +++ b/executor/internal/services/task.go @@ -0,0 +1,21 @@ +package services + +import ( + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" + "gitlink.org.cn/cloudream/common/utils/reflect" + execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor" + myglbs "gitlink.org.cn/cloudream/scheduler/executor/internal/globals" +) + +func (svc *Service) StartTask(msg *execmq.StartTask) (*execmq.StartTaskResp, *mq.CodeMessage) { + tsk, err := svc.taskManager.StartByInfo(msg.Info) + if err != nil { + logger.WithField("Info", reflect.TypeOfValue(msg.Info).Name()). + Warnf("starting task by info: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "start task by info failed") + } + + return mq.ReplyOK(execmq.NewStartTaskResp(myglbs.ExecutorID, tsk.ID())) +} diff --git a/executor/internal/task/cache_move_package.go b/executor/internal/task/cache_move_package.go index dbaacc2..82e2c99 100644 --- a/executor/internal/task/cache_move_package.go +++ b/executor/internal/task/cache_move_package.go @@ -12,16 +12,12 @@ import ( ) type CacheMovePackage struct { - userID int64 - packageID int64 - stgNodeID int64 + *exectsk.CacheMovePackage } -func NewCacheMovePackage(userID int64, packageID int64, stgNodeID int64) *CacheMovePackage { +func NewCacheMovePackage(info *exectsk.CacheMovePackage) *CacheMovePackage { return &CacheMovePackage{ - userID: userID, - packageID: packageID, - stgNodeID: stgNodeID, + CacheMovePackage: info, } } @@ -33,9 +29,9 @@ func (t *CacheMovePackage) Execute(task *task.Task[TaskContext], ctx TaskContext err := t.do(ctx) if err != nil { //TODO 若任务失败,上报的状态failed字段根据情况修改 - ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageTaskStatus("failed", err.Error())) + ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageStatus("failed", err.Error())) } else { - ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageTaskStatus("completed", "")) + ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageStatus("completed", "")) } ctx.reporter.ReportNow() @@ -52,8 +48,12 @@ func (t *CacheMovePackage) do(ctx TaskContext) error { defer stgCli.Close() return stgCli.CacheMovePackage(storage.CacheMovePackageReq{ - UserID: t.userID, - PackageID: t.packageID, - NodeID: t.stgNodeID, + UserID: t.UserID, + PackageID: t.PackageID, + NodeID: t.StgNodeID, }) } + +func init() { + Register(NewCacheMovePackage) +} diff --git a/executor/internal/task/pcm_schedule_task.go b/executor/internal/task/pcm_schedule_task.go index 0e28fcc..2bfd5c1 100644 --- a/executor/internal/task/pcm_schedule_task.go +++ b/executor/internal/task/pcm_schedule_task.go @@ -13,18 +13,12 @@ import ( ) type PCMScheduleTask struct { - slwNodeID int64 - envs []map[string]string - imageID int64 - cmdLine string + *exectsk.ScheduleTask } -func NewPCMScheduleTask(slwNodeID int64, envs []map[string]string, imageID int64, cmdLine string) *PCMScheduleTask { +func NewPCMScheduleTask(info *exectsk.ScheduleTask) *PCMScheduleTask { return &PCMScheduleTask{ - slwNodeID: slwNodeID, - envs: envs, - imageID: imageID, - cmdLine: cmdLine, + ScheduleTask: info, } } @@ -53,10 +47,10 @@ func (t *PCMScheduleTask) do(taskID string, ctx TaskContext) error { defer pcmCli.Close() resp, err := pcmCli.ScheduleTask(pcm.ScheduleTaskReq{ - SlwNodeID: t.slwNodeID, - Envs: t.envs, - ImageID: t.imageID, - CMDLine: t.cmdLine, + SlwNodeID: t.SlwNodeID, + Envs: t.Envs, + ImageID: t.ImageID, + CMDLine: t.CMDLine, }) if err != nil { @@ -66,7 +60,7 @@ func (t *PCMScheduleTask) do(taskID string, ctx TaskContext) error { var prevStatus string for { tsResp, err := pcmCli.GetTaskStatus(pcm.GetTaskStatusReq{ - SlwNodeID: t.slwNodeID, + SlwNodeID: t.SlwNodeID, PCMJobID: resp.PCMJobID, }) if err != nil { @@ -86,3 +80,7 @@ func (t *PCMScheduleTask) do(taskID string, ctx TaskContext) error { } } } + +func init() { + Register(NewPCMScheduleTask) +} diff --git a/executor/internal/task/pcm_upload_img.go b/executor/internal/task/pcm_upload_img.go index f1cd3e0..c53bcf4 100644 --- a/executor/internal/task/pcm_upload_img.go +++ b/executor/internal/task/pcm_upload_img.go @@ -12,14 +12,12 @@ import ( ) type PCMUploadImage struct { - slwNodeID int64 - imagePath string + *exectsk.UploadImage } -func NewPCMUploadImage(slwNodeID int64, imagePath string) *PCMUploadImage { +func NewPCMUploadImage(info *exectsk.UploadImage) *PCMUploadImage { return &PCMUploadImage{ - slwNodeID: slwNodeID, - imagePath: imagePath, + UploadImage: info, } } @@ -31,7 +29,7 @@ func (t *PCMUploadImage) Execute(task *task.Task[TaskContext], ctx TaskContext, err := t.do(task.ID(), ctx) if err != nil { //TODO 若任务失败,上报的状态failed字段根据情况修改 - ctx.reporter.Report(task.ID(), exectsk.NewUploadImageTaskStatus("failed", err.Error(), 0)) + ctx.reporter.Report(task.ID(), exectsk.NewUploadImageStatus("failed", err.Error(), 0)) } ctx.reporter.ReportNow() @@ -48,14 +46,18 @@ func (t *PCMUploadImage) do(taskID string, ctx TaskContext) error { defer pcmCli.Close() resp, err := pcmCli.UploadImage(pcm.UploadImageReq{ - SlwNodeID: t.slwNodeID, - ImagePath: t.imagePath, + SlwNodeID: t.SlwNodeID, + ImagePath: t.ImagePath, }) if err != nil { return err } // TODO 根据接口result返回情况修改 - ctx.reporter.Report(taskID, exectsk.NewUploadImageTaskStatus(resp.Result, "", resp.ImageID)) + ctx.reporter.Report(taskID, exectsk.NewUploadImageStatus(resp.Result, "", resp.ImageID)) return nil } + +func init() { + Register(NewPCMUploadImage) +} diff --git a/executor/internal/task/storage_create_package.go b/executor/internal/task/storage_create_package.go index 77e1b1a..562018e 100644 --- a/executor/internal/task/storage_create_package.go +++ b/executor/internal/task/storage_create_package.go @@ -5,7 +5,6 @@ import ( "time" "gitlink.org.cn/cloudream/common/api/storage" - "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/task" "gitlink.org.cn/cloudream/scheduler/common/globals" @@ -13,22 +12,12 @@ import ( ) type StorageCreatePackage struct { - userID int64 - storageID int64 - path string - bucketID int64 - name string - redundancy models.TypedRedundancyInfo + *exectsk.StorageCreatePackage } -func NewStorageCreatePackage(userID int64, storageID int64, path string, bucketID int64, name string, redundancy models.TypedRedundancyInfo) *StorageCreatePackage { +func NewStorageCreatePackage(info *exectsk.StorageCreatePackage) *StorageCreatePackage { return &StorageCreatePackage{ - userID: userID, - storageID: storageID, - path: path, - bucketID: bucketID, - name: name, - redundancy: redundancy, + StorageCreatePackage: info, } } @@ -40,7 +29,7 @@ func (t *StorageCreatePackage) Execute(task *task.Task[TaskContext], ctx TaskCon err := t.do(task.ID(), ctx) if err != nil { //TODO 若任务失败,上报的状态failed字段根据情况修改 - ctx.reporter.Report(task.ID(), exectsk.NewCreatePackageTaskStatus("failed", err.Error(), 0)) + ctx.reporter.Report(task.ID(), exectsk.NewStorageCreatePackageStatus("failed", err.Error(), 0)) } ctx.reporter.ReportNow() @@ -57,18 +46,22 @@ func (t *StorageCreatePackage) do(taskID string, ctx TaskContext) error { defer stgCli.Close() resp, err := stgCli.StorageCreatePackage(storage.StorageCreatePackageReq{ - UserID: t.userID, - StorageID: t.storageID, - Path: t.path, - BucketID: t.bucketID, - Name: t.name, - Redundancy: t.redundancy, + UserID: t.UserID, + StorageID: t.StorageID, + Path: t.Path, + BucketID: t.BucketID, + Name: t.Name, + Redundancy: t.Redundancy, }) if err != nil { return err } // TODO 根据接口result返回情况修改 - ctx.reporter.Report(taskID, exectsk.NewCreatePackageTaskStatus("completed", "", resp.PackageID)) + ctx.reporter.Report(taskID, exectsk.NewStorageCreatePackageStatus("completed", "", resp.PackageID)) return nil } + +func init() { + Register(NewStorageCreatePackage) +} diff --git a/executor/internal/task/storage_load_package.go b/executor/internal/task/storage_load_package.go index f63d542..d620c7b 100644 --- a/executor/internal/task/storage_load_package.go +++ b/executor/internal/task/storage_load_package.go @@ -12,16 +12,12 @@ import ( ) type StorageLoadPackage struct { - userID int64 - packageID int64 - storageID int64 + *exectsk.StorageLoadPackage } -func NewStorageLoadPackage(userID int64, packageID int64, storageID int64) *StorageLoadPackage { +func NewStorageLoadPackage(info *exectsk.StorageLoadPackage) *StorageLoadPackage { return &StorageLoadPackage{ - userID: userID, - packageID: packageID, - storageID: storageID, + StorageLoadPackage: info, } } @@ -33,9 +29,9 @@ func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskConte err := t.do(ctx) if err != nil { //TODO 若任务失败,上报的状态failed字段根据情况修改 - ctx.reporter.Report(task.ID(), exectsk.NewLoadPackageTaskStatus("failed", err.Error())) + ctx.reporter.Report(task.ID(), exectsk.NewStorageLoadPackageStatus("failed", err.Error())) } else { - ctx.reporter.Report(task.ID(), exectsk.NewLoadPackageTaskStatus("completed", "")) + ctx.reporter.Report(task.ID(), exectsk.NewStorageLoadPackageStatus("completed", "")) } ctx.reporter.ReportNow() @@ -52,8 +48,12 @@ func (t *StorageLoadPackage) do(ctx TaskContext) error { defer stgCli.Close() return stgCli.StorageLoadPackage(storage.StorageLoadPackageReq{ - UserID: t.userID, - PackageID: t.packageID, - StorageID: t.storageID, + UserID: t.UserID, + PackageID: t.PackageID, + StorageID: t.StorageID, }) } + +func init() { + Register(NewStorageLoadPackage) +} diff --git a/executor/internal/task/task.go b/executor/internal/task/task.go index 0cf4ab3..6f37186 100644 --- a/executor/internal/task/task.go +++ b/executor/internal/task/task.go @@ -1,7 +1,12 @@ package task import ( + "fmt" + "reflect" + "gitlink.org.cn/cloudream/common/pkgs/task" + myreflect "gitlink.org.cn/cloudream/common/utils/reflect" + exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task" reporter "gitlink.org.cn/cloudream/scheduler/executor/internal/reporter" ) @@ -13,7 +18,9 @@ type TaskContext struct { // 因此适合进行执行结果的设置 type CompleteFn = task.CompleteFn -type Manager = task.Manager[TaskContext] +type Manager struct { + task.Manager[TaskContext] +} type TaskBody = task.TaskBody[TaskContext] @@ -22,7 +29,28 @@ type Task = task.Task[TaskContext] type CompleteOption = task.CompleteOption func NewManager(reporter *reporter.Reporter) Manager { - return task.NewManager(TaskContext{ - reporter: reporter, - }) + return Manager{ + Manager: task.NewManager(TaskContext{ + reporter: reporter, + }), + } +} + +func (m *Manager) StartByInfo(info exectsk.TaskInfo) (*Task, error) { + infoType := myreflect.TypeOfValue(info) + + ctor, ok := taskFromInfoCtors[infoType] + if !ok { + return nil, fmt.Errorf("unknow info type") + } + + return m.StartNew(ctor(info)), nil +} + +var taskFromInfoCtors map[reflect.Type]func(exectsk.TaskInfo) TaskBody + +func Register[TInfo exectsk.TaskInfo, TTaskBody TaskBody](ctor func(info TInfo) TTaskBody) { + taskFromInfoCtors[myreflect.TypeOf[TInfo]()] = func(info exectsk.TaskInfo) TaskBody { + return ctor(info.(TInfo)) + } }