将executor的任务启动接口合并成一个

This commit is contained in:
Sydonian 2023-09-13 10:43:12 +08:00
parent 51c59bd951
commit 1b18c15f74
20 changed files with 352 additions and 356 deletions

View File

@ -3,45 +3,13 @@ package executor
import "gitlink.org.cn/cloudream/common/pkgs/mq"
type PCMService interface {
StartUploadImage(msg *StartUploadImage) (*StartUploadImageResp, *mq.CodeMessage)
GetImageList(msg *GetImageList) (*GetImageListResp, *mq.CodeMessage)
DeleteImage(msg *DeleteImage) (*DeleteImageResp, *mq.CodeMessage)
StartScheduleTask(msg *StartScheduleTask) (*StartScheduleTaskResp, *mq.CodeMessage)
DeleteTask(msg *DeleteTask) (*DeleteTaskResp, *mq.CodeMessage)
}
// 启动上传(并注册)镜像任务
var _ = Register(Service.StartUploadImage)
type StartUploadImage struct {
mq.MessageBodyBase
SlwNodeID int64 `json:"slwNodeID"`
ImagePath string `json:"imagePath"`
}
type StartUploadImageResp struct {
mq.MessageBodyBase
TaskID string `json:"taskID"`
}
func NewStartUploadImage(slwNodeID int64, imagePath string) *StartUploadImage {
return &StartUploadImage{
SlwNodeID: slwNodeID,
ImagePath: imagePath,
}
}
func NewStartUploadImageResp(taskID string) *StartUploadImageResp {
return &StartUploadImageResp{
TaskID: taskID,
}
}
func (c *Client) StartUploadImage(msg *StartUploadImage, opts ...mq.RequestOption) (*StartUploadImageResp, error) {
return mq.Request(Service.StartUploadImage, c.rabbitCli, msg, opts...)
}
// 查询镜像列表
var _ = Register(Service.GetImageList)
@ -96,38 +64,6 @@ func (c *Client) DeleteImage(msg *DeleteImage, opts ...mq.RequestOption) (*Delet
return mq.Request(Service.DeleteImage, c.rabbitCli, msg, opts...)
}
// 启动提交任务
var _ = Register(Service.StartScheduleTask)
type StartScheduleTask struct {
mq.MessageBodyBase
SlwNodeID int64 `json:"slwNodeID"`
Envs []map[string]string `json:"envs"`
ImageID int64 `json:"imageID"`
CMDLine string `json:"cmdLine"`
}
type StartScheduleTaskResp struct {
mq.MessageBodyBase
TaskID string `json:"taskID"`
}
func NewStartScheduleTask(slwNodeID int64, envs []map[string]string, imageID int64, cmdLine string) *StartScheduleTask {
return &StartScheduleTask{
SlwNodeID: slwNodeID,
Envs: envs,
ImageID: imageID,
CMDLine: cmdLine,
}
}
func NewStartScheduleTaskResp(taskID string) *StartScheduleTaskResp {
return &StartScheduleTaskResp{
TaskID: taskID,
}
}
func (c *Client) StartScheduleTask(msg *StartScheduleTask, opts ...mq.RequestOption) (*StartScheduleTaskResp, error) {
return mq.Request(Service.StartScheduleTask, c.rabbitCli, msg, opts...)
}
// 删除任务
var _ = Register(Service.DeleteTask)

View File

@ -7,8 +7,9 @@ import (
// Service 协调端接口
type Service interface {
StorageService
PCMService
TaskService
}
const (

View File

@ -1,109 +0,0 @@
package executor
import (
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/mq"
)
type StorageService interface {
StartStorageLoadPackage(msg *StartStorageLoadPackage) (*StartStorageLoadPackageResp, *mq.CodeMessage)
StartStorageCreatePackage(msg *StartStorageCreatePackage) (*StartStorageCreatePackageResp, *mq.CodeMessage)
StartCacheMovePackage(msg *StartCacheMovePackage) (*StartCacheMovePackageResp, *mq.CodeMessage)
}
// 启动存储系统调度文件任务
var _ = Register(Service.StartStorageLoadPackage)
type StartStorageLoadPackage struct {
mq.MessageBodyBase
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
StorageID int64 `json:"storageID"`
}
type StartStorageLoadPackageResp struct {
mq.MessageBodyBase
TaskID string `json:"taskID"`
}
func NewStartStorageLoadPackage(packageID int64, userID int64) *StartStorageLoadPackage {
return &StartStorageLoadPackage{
PackageID: packageID,
UserID: userID,
}
}
func NewStartStorageLoadPackageResp(taskID string) *StartStorageLoadPackageResp {
return &StartStorageLoadPackageResp{
TaskID: taskID,
}
}
func (c *Client) StartStorageLoadPackage(msg *StartStorageLoadPackage, opts ...mq.RequestOption) (*StartStorageLoadPackageResp, error) {
return mq.Request(Service.StartStorageLoadPackage, c.rabbitCli, msg, opts...)
}
// 启动存储系统从存储服务上传文件任务
var _ = Register(Service.StartStorageCreatePackage)
type StartStorageCreatePackage struct {
mq.MessageBodyBase
UserID int64 `json:"userID"`
StorageID int64 `json:"storageID"`
Path string `json:"path"`
BucketID int64 `json:"bucketID"`
Name string `json:"name"`
Redundancy models.TypedRedundancyInfo `json:"redundancy"`
}
type StartStorageCreatePackageResp struct {
mq.MessageBodyBase
TaskID string `json:"taskID"`
}
func NewStartStorageCreatePackage(userID int64, storageID int64, filePath string, bucketID int64, name string, redundancy models.TypedRedundancyInfo) *StartStorageCreatePackage {
return &StartStorageCreatePackage{
UserID: userID,
StorageID: storageID,
Path: filePath,
BucketID: bucketID,
Name: name,
Redundancy: redundancy,
}
}
func NewStartStorageCreatePackageResp(taskID string) *StartStorageCreatePackageResp {
return &StartStorageCreatePackageResp{
TaskID: taskID,
}
}
func (c *Client) StartStorageCreatePackage(msg *StartStorageCreatePackage, opts ...mq.RequestOption) (*StartStorageCreatePackageResp, error) {
return mq.Request(Service.StartStorageCreatePackage, c.rabbitCli, msg, opts...)
}
// 启动存储系统调度文件到某个节点的缓存的任务
var _ = Register(Service.StartCacheMovePackage)
type StartCacheMovePackage struct {
mq.MessageBodyBase
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
StgNodeID int64 `json:"stgNodeID"`
}
type StartCacheMovePackageResp struct {
mq.MessageBodyBase
TaskID string `json:"taskID"`
}
func NewStartCacheMovePackage(userID int64, packageID int64, stgNodeID int64) *StartCacheMovePackage {
return &StartCacheMovePackage{
UserID: userID,
PackageID: packageID,
StgNodeID: stgNodeID,
}
}
func NewStartCacheMovePackageResp(taskID string) *StartCacheMovePackageResp {
return &StartCacheMovePackageResp{
TaskID: taskID,
}
}
func (c *Client) StartCacheMovePackage(msg *StartCacheMovePackage, opts ...mq.RequestOption) (*StartCacheMovePackageResp, error) {
return mq.Request(Service.StartCacheMovePackage, c.rabbitCli, msg, opts...)
}

View File

@ -0,0 +1,42 @@
package executor
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)
type TaskService interface {
StartTask(msg *StartTask) (*StartTaskResp, *mq.CodeMessage)
}
// 启动一个任务
var _ = Register(Service.StartTask)
type StartTask struct {
mq.MessageBodyBase
Info exectsk.TaskInfo `json:"info"`
}
type StartTaskResp struct {
mq.MessageBodyBase
ExecutorID string `json:"executorID"`
TaskID string `json:"taskID"`
}
func NewStartTask(info exectsk.TaskInfo) *StartTask {
return &StartTask{
Info: info,
}
}
func NewStartTaskResp(execID string, taskID string) *StartTaskResp {
return &StartTaskResp{
ExecutorID: execID,
TaskID: taskID,
}
}
func (c *Client) StartTask(msg *StartTask, opts ...mq.RequestOption) (*StartTaskResp, error) {
return mq.Request(Service.StartTask, c.rabbitCli, msg, opts...)
}
func init() {
mq.RegisterUnionType(exectsk.TaskInfoTypeUnion)
}

View File

@ -0,0 +1,31 @@
package task
type CacheMovePackage struct {
TaskInfoBase
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
StgNodeID int64 `json:"stgNodeID"`
}
type CacheMovePackageStatus struct {
TaskStatusBase
Status string `json:"status"`
Error string `json:"error"`
}
func NewCacheMovePackage(userID int64, packageID int64, stgNodeID int64) *CacheMovePackage {
return &CacheMovePackage{
UserID: userID,
PackageID: packageID,
StgNodeID: stgNodeID,
}
}
func NewCacheMovePackageStatus(status string, err string) *CacheMovePackageStatus {
return &CacheMovePackageStatus{
Status: status,
Error: err,
}
}
func init() {
Register[CacheMovePackage, CacheMovePackageStatus]()
}

View File

@ -0,0 +1,36 @@
package task
type ScheduleTask struct {
TaskInfoBase
SlwNodeID int64 `json:"slwNodeID"`
Envs []map[string]string `json:"envs"`
ImageID int64 `json:"imageID"`
CMDLine string `json:"cmdLine"`
}
type ScheduleTaskStatus struct {
TaskStatusBase
Status string `json:"status"`
Error string `json:"error"`
PCMJobID int64 `json:"pcmJobID"`
}
func NewScheduleTask(slwNodeID int64, envs []map[string]string, imageID int64, cmdLine string) *ScheduleTask {
return &ScheduleTask{
SlwNodeID: slwNodeID,
Envs: envs,
ImageID: imageID,
CMDLine: cmdLine,
}
}
func NewScheduleTaskStatus(status string, err string, pcmJobID int64) *ScheduleTaskStatus {
return &ScheduleTaskStatus{
Status: status,
Error: err,
PCMJobID: pcmJobID,
}
}
func init() {
Register[ScheduleTask, ScheduleTaskStatus]()
}

View File

@ -0,0 +1,43 @@
package task
import (
"gitlink.org.cn/cloudream/common/models"
)
type StorageCreatePackage struct {
TaskInfoBase
UserID int64 `json:"userID"`
StorageID int64 `json:"storageID"`
Path string `json:"path"`
BucketID int64 `json:"bucketID"`
Name string `json:"name"`
Redundancy models.TypedRedundancyInfo `json:"redundancy"`
}
type StorageCreatePackageStatus struct {
TaskStatusBase
Status string `json:"status"`
Error string `json:"error"`
PackageID int64 `json:"packageID"`
}
func NewStorageCreatePackage(userID int64, storageID int64, filePath string, bucketID int64, name string, redundancy models.TypedRedundancyInfo) *StorageCreatePackage {
return &StorageCreatePackage{
UserID: userID,
StorageID: storageID,
Path: filePath,
BucketID: bucketID,
Name: name,
Redundancy: redundancy,
}
}
func NewStorageCreatePackageStatus(status string, err string, packageID int64) *StorageCreatePackageStatus {
return &StorageCreatePackageStatus{
Status: status,
Error: err,
PackageID: packageID,
}
}
func init() {
Register[StorageCreatePackage, StorageCreatePackageStatus]()
}

View File

@ -0,0 +1,30 @@
package task
type StorageLoadPackage struct {
TaskInfoBase
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
StorageID int64 `json:"storageID"`
}
type StorageLoadPackageStatus struct {
TaskStatusBase
Status string `json:"status"`
Error string `json:"error"`
}
func NewStorageLoadPackage(packageID int64, userID int64) *StorageLoadPackage {
return &StorageLoadPackage{
PackageID: packageID,
UserID: userID,
}
}
func NewStorageLoadPackageStatus(status string, err string) *StorageLoadPackageStatus {
return &StorageLoadPackageStatus{
Status: status,
Error: err,
}
}
func init() {
Register[StorageLoadPackage, StorageCreatePackageStatus]()
}

View File

@ -1,93 +1,36 @@
package task
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/common/pkgs/types"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
)
type TaskStatus interface{}
// 增加了新类型后需要在这里也同步添加
type TaskStatusConst interface {
TaskStatus | ScheduleTaskStatus | UploadImageTaskStatus | CacheMovePackageTaskStatus | CreatePackageTaskStatus | LoadPackageTaskStatus
// 任务
type TaskInfo interface {
Noop()
}
// 增加了新类型后需要在这里也同步添加
var TaskStatusTypeUnion = types.NewTypeUnion[TaskStatus](
myreflect.TypeOf[ScheduleTaskStatus](),
myreflect.TypeOf[UploadImageTaskStatus](),
myreflect.TypeOf[CacheMovePackageTaskStatus](),
myreflect.TypeOf[CreatePackageTaskStatus](),
myreflect.TypeOf[LoadPackageTaskStatus](),
)
var TaskInfoTypeUnion = types.NewTypeUnion[TaskInfo]()
type ScheduleTaskStatus struct {
Status string `json:"status"`
Error string `json:"error"`
PCMJobID int64 `json:"pcmJobID"`
type TaskInfoBase struct{}
func (s *TaskInfoBase) Noop() {}
// 任务上报的状态
type TaskStatus interface {
Noop()
}
func NewScheduleTaskStatus(status string, err string, pcmJobID int64) ScheduleTaskStatus {
return ScheduleTaskStatus{
Status: status,
Error: err,
PCMJobID: pcmJobID,
}
}
// 增加了新类型后需要在这里也同步添加
var TaskStatusTypeUnion = types.NewTypeUnion[TaskStatus]()
type UploadImageTaskStatus struct {
Status string `json:"status"`
Error string `json:"error"`
ImageID int64 `json:"imageID"`
}
type TaskStatusBase struct{}
func NewUploadImageTaskStatus(status string, err string, imageID int64) UploadImageTaskStatus {
return UploadImageTaskStatus{
Status: status,
Error: err,
ImageID: imageID,
}
}
func (s *TaskStatusBase) Noop() {}
type CacheMovePackageTaskStatus struct {
Status string `json:"status"`
Error string `json:"error"`
}
func Register[TTaskInfo any, TTaskStatus any]() {
TaskInfoTypeUnion.Add(myreflect.TypeOf[TTaskInfo]())
func NewCacheMovePackageTaskStatus(status string, err string) CacheMovePackageTaskStatus {
return CacheMovePackageTaskStatus{
Status: status,
Error: err,
}
}
type CreatePackageTaskStatus struct {
Status string `json:"status"`
Error string `json:"error"`
PackageID int64 `json:"packageID"`
}
func NewCreatePackageTaskStatus(status string, err string, packageID int64) CreatePackageTaskStatus {
return CreatePackageTaskStatus{
Status: status,
Error: err,
PackageID: packageID,
}
}
type LoadPackageTaskStatus struct {
Status string `json:"status"`
Error string `json:"error"`
}
func NewLoadPackageTaskStatus(status string, err string) LoadPackageTaskStatus {
return LoadPackageTaskStatus{
Status: status,
Error: err,
}
}
func init() {
mq.RegisterUnionType(TaskStatusTypeUnion)
TaskStatusTypeUnion.Add(myreflect.TypeOf[TTaskStatus]())
}

View File

@ -0,0 +1,31 @@
package task
type UploadImage struct {
TaskInfoBase
SlwNodeID int64 `json:"slwNodeID"`
ImagePath string `json:"imagePath"`
}
type UploadImageStatus struct {
TaskStatusBase
Status string `json:"status"`
Error string `json:"error"`
ImageID int64 `json:"imageID"`
}
func NewUploadImage(slwNodeID int64, imagePath string) *UploadImage {
return &UploadImage{
SlwNodeID: slwNodeID,
ImagePath: imagePath,
}
}
func NewUploadImageStatus(status string, err string, imageID int64) *UploadImageStatus {
return &UploadImageStatus{
Status: status,
Error: err,
ImageID: imageID,
}
}
func init() {
Register[UploadImage, UploadImageStatus]()
}

View File

@ -44,3 +44,7 @@ func NewExecutorTaskStatus(taskID string, status exectsk.TaskStatus) ExecutorTas
func (c *Client) ReportExecutorTaskStatus(msg *ReportExecutorTaskStatus, opts ...mq.RequestOption) (*ReportExecutorTaskStatusResp, error) {
return mq.Request(Service.ReportExecutorTaskStatus, c.rabbitCli, msg, opts...)
}
func init() {
mq.RegisterUnionType(exectsk.TaskStatusTypeUnion)
}

View File

@ -7,14 +7,8 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/scheduler/common/globals"
execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
schtsk "gitlink.org.cn/cloudream/scheduler/executor/internal/task"
)
func (svc *Service) StartUploadImage(msg *execmq.StartUploadImage) (*execmq.StartUploadImageResp, *mq.CodeMessage) {
tsk := svc.taskManager.StartNew(schtsk.NewPCMUploadImage(msg.SlwNodeID, msg.ImagePath))
return mq.ReplyOK(execmq.NewStartUploadImageResp(tsk.ID()))
}
func (svc *Service) GetImageList(msg *execmq.GetImageList) (*execmq.GetImageListResp, *mq.CodeMessage) {
pcmCli, err := globals.PCMPool.Acquire()
if err != nil {
@ -53,11 +47,6 @@ func (svc *Service) DeleteImage(msg *execmq.DeleteImage) (*execmq.DeleteImageRes
return mq.ReplyOK(execmq.NewDeleteImageResp(resp.Result))
}
func (svc *Service) StartScheduleTask(msg *execmq.StartScheduleTask) (*execmq.StartScheduleTaskResp, *mq.CodeMessage) {
tsk := svc.taskManager.StartNew(schtsk.NewPCMScheduleTask(msg.SlwNodeID, msg.Envs, msg.ImageID, msg.CMDLine))
return mq.ReplyOK(execmq.NewStartScheduleTaskResp(tsk.ID()))
}
func (svc *Service) DeleteTask(msg *execmq.DeleteTask) (*execmq.DeleteTaskResp, *mq.CodeMessage) {
pcmCli, err := globals.PCMPool.Acquire()
if err != nil {

View File

@ -1,23 +0,0 @@
package services
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
schtsk "gitlink.org.cn/cloudream/scheduler/executor/internal/task"
)
func (svc *Service) StartStorageLoadPackage(msg *execmq.StartStorageLoadPackage) (*execmq.StartStorageLoadPackageResp, *mq.CodeMessage) {
tsk := svc.taskManager.StartNew(schtsk.NewStorageLoadPackage(msg.UserID, msg.PackageID, msg.StorageID))
return mq.ReplyOK(execmq.NewStartStorageLoadPackageResp(tsk.ID()))
}
func (svc *Service) StartStorageCreatePackage(msg *execmq.StartStorageCreatePackage) (*execmq.StartStorageCreatePackageResp, *mq.CodeMessage) {
tsk := svc.taskManager.StartNew(schtsk.NewStorageCreatePackage(msg.UserID, msg.StorageID, msg.Path, msg.BucketID, msg.Name, msg.Redundancy))
return mq.ReplyOK(execmq.NewStartStorageCreatePackageResp(tsk.ID()))
}
func (svc *Service) StartCacheMovePackage(msg *execmq.StartCacheMovePackage) (*execmq.StartCacheMovePackageResp, *mq.CodeMessage) {
tsk := svc.taskManager.StartNew(schtsk.NewCacheMovePackage(msg.UserID, msg.PackageID, msg.StgNodeID))
// tsk := svc.taskManager.StartNew(task.TaskBody[schtsk.NewCacheMovePackage(msg.UserID, msg.PackageID, msg.NodeID)])
return mq.ReplyOK(execmq.NewStartCacheMovePackageResp(tsk.ID()))
}

View File

@ -0,0 +1,21 @@
package services
import (
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/common/utils/reflect"
execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
myglbs "gitlink.org.cn/cloudream/scheduler/executor/internal/globals"
)
func (svc *Service) StartTask(msg *execmq.StartTask) (*execmq.StartTaskResp, *mq.CodeMessage) {
tsk, err := svc.taskManager.StartByInfo(msg.Info)
if err != nil {
logger.WithField("Info", reflect.TypeOfValue(msg.Info).Name()).
Warnf("starting task by info: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "start task by info failed")
}
return mq.ReplyOK(execmq.NewStartTaskResp(myglbs.ExecutorID, tsk.ID()))
}

View File

@ -12,16 +12,12 @@ import (
)
type CacheMovePackage struct {
userID int64
packageID int64
stgNodeID int64
*exectsk.CacheMovePackage
}
func NewCacheMovePackage(userID int64, packageID int64, stgNodeID int64) *CacheMovePackage {
func NewCacheMovePackage(info *exectsk.CacheMovePackage) *CacheMovePackage {
return &CacheMovePackage{
userID: userID,
packageID: packageID,
stgNodeID: stgNodeID,
CacheMovePackage: info,
}
}
@ -33,9 +29,9 @@ func (t *CacheMovePackage) Execute(task *task.Task[TaskContext], ctx TaskContext
err := t.do(ctx)
if err != nil {
//TODO 若任务失败上报的状态failed字段根据情况修改
ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageTaskStatus("failed", err.Error()))
ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageStatus("failed", err.Error()))
} else {
ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageTaskStatus("completed", ""))
ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageStatus("completed", ""))
}
ctx.reporter.ReportNow()
@ -52,8 +48,12 @@ func (t *CacheMovePackage) do(ctx TaskContext) error {
defer stgCli.Close()
return stgCli.CacheMovePackage(storage.CacheMovePackageReq{
UserID: t.userID,
PackageID: t.packageID,
NodeID: t.stgNodeID,
UserID: t.UserID,
PackageID: t.PackageID,
NodeID: t.StgNodeID,
})
}
func init() {
Register(NewCacheMovePackage)
}

View File

@ -13,18 +13,12 @@ import (
)
type PCMScheduleTask struct {
slwNodeID int64
envs []map[string]string
imageID int64
cmdLine string
*exectsk.ScheduleTask
}
func NewPCMScheduleTask(slwNodeID int64, envs []map[string]string, imageID int64, cmdLine string) *PCMScheduleTask {
func NewPCMScheduleTask(info *exectsk.ScheduleTask) *PCMScheduleTask {
return &PCMScheduleTask{
slwNodeID: slwNodeID,
envs: envs,
imageID: imageID,
cmdLine: cmdLine,
ScheduleTask: info,
}
}
@ -53,10 +47,10 @@ func (t *PCMScheduleTask) do(taskID string, ctx TaskContext) error {
defer pcmCli.Close()
resp, err := pcmCli.ScheduleTask(pcm.ScheduleTaskReq{
SlwNodeID: t.slwNodeID,
Envs: t.envs,
ImageID: t.imageID,
CMDLine: t.cmdLine,
SlwNodeID: t.SlwNodeID,
Envs: t.Envs,
ImageID: t.ImageID,
CMDLine: t.CMDLine,
})
if err != nil {
@ -66,7 +60,7 @@ func (t *PCMScheduleTask) do(taskID string, ctx TaskContext) error {
var prevStatus string
for {
tsResp, err := pcmCli.GetTaskStatus(pcm.GetTaskStatusReq{
SlwNodeID: t.slwNodeID,
SlwNodeID: t.SlwNodeID,
PCMJobID: resp.PCMJobID,
})
if err != nil {
@ -86,3 +80,7 @@ func (t *PCMScheduleTask) do(taskID string, ctx TaskContext) error {
}
}
}
func init() {
Register(NewPCMScheduleTask)
}

View File

@ -12,14 +12,12 @@ import (
)
type PCMUploadImage struct {
slwNodeID int64
imagePath string
*exectsk.UploadImage
}
func NewPCMUploadImage(slwNodeID int64, imagePath string) *PCMUploadImage {
func NewPCMUploadImage(info *exectsk.UploadImage) *PCMUploadImage {
return &PCMUploadImage{
slwNodeID: slwNodeID,
imagePath: imagePath,
UploadImage: info,
}
}
@ -31,7 +29,7 @@ func (t *PCMUploadImage) Execute(task *task.Task[TaskContext], ctx TaskContext,
err := t.do(task.ID(), ctx)
if err != nil {
//TODO 若任务失败上报的状态failed字段根据情况修改
ctx.reporter.Report(task.ID(), exectsk.NewUploadImageTaskStatus("failed", err.Error(), 0))
ctx.reporter.Report(task.ID(), exectsk.NewUploadImageStatus("failed", err.Error(), 0))
}
ctx.reporter.ReportNow()
@ -48,14 +46,18 @@ func (t *PCMUploadImage) do(taskID string, ctx TaskContext) error {
defer pcmCli.Close()
resp, err := pcmCli.UploadImage(pcm.UploadImageReq{
SlwNodeID: t.slwNodeID,
ImagePath: t.imagePath,
SlwNodeID: t.SlwNodeID,
ImagePath: t.ImagePath,
})
if err != nil {
return err
}
// TODO 根据接口result返回情况修改
ctx.reporter.Report(taskID, exectsk.NewUploadImageTaskStatus(resp.Result, "", resp.ImageID))
ctx.reporter.Report(taskID, exectsk.NewUploadImageStatus(resp.Result, "", resp.ImageID))
return nil
}
func init() {
Register(NewPCMUploadImage)
}

View File

@ -5,7 +5,6 @@ import (
"time"
"gitlink.org.cn/cloudream/common/api/storage"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/scheduler/common/globals"
@ -13,22 +12,12 @@ import (
)
type StorageCreatePackage struct {
userID int64
storageID int64
path string
bucketID int64
name string
redundancy models.TypedRedundancyInfo
*exectsk.StorageCreatePackage
}
func NewStorageCreatePackage(userID int64, storageID int64, path string, bucketID int64, name string, redundancy models.TypedRedundancyInfo) *StorageCreatePackage {
func NewStorageCreatePackage(info *exectsk.StorageCreatePackage) *StorageCreatePackage {
return &StorageCreatePackage{
userID: userID,
storageID: storageID,
path: path,
bucketID: bucketID,
name: name,
redundancy: redundancy,
StorageCreatePackage: info,
}
}
@ -40,7 +29,7 @@ func (t *StorageCreatePackage) Execute(task *task.Task[TaskContext], ctx TaskCon
err := t.do(task.ID(), ctx)
if err != nil {
//TODO 若任务失败上报的状态failed字段根据情况修改
ctx.reporter.Report(task.ID(), exectsk.NewCreatePackageTaskStatus("failed", err.Error(), 0))
ctx.reporter.Report(task.ID(), exectsk.NewStorageCreatePackageStatus("failed", err.Error(), 0))
}
ctx.reporter.ReportNow()
@ -57,18 +46,22 @@ func (t *StorageCreatePackage) do(taskID string, ctx TaskContext) error {
defer stgCli.Close()
resp, err := stgCli.StorageCreatePackage(storage.StorageCreatePackageReq{
UserID: t.userID,
StorageID: t.storageID,
Path: t.path,
BucketID: t.bucketID,
Name: t.name,
Redundancy: t.redundancy,
UserID: t.UserID,
StorageID: t.StorageID,
Path: t.Path,
BucketID: t.BucketID,
Name: t.Name,
Redundancy: t.Redundancy,
})
if err != nil {
return err
}
// TODO 根据接口result返回情况修改
ctx.reporter.Report(taskID, exectsk.NewCreatePackageTaskStatus("completed", "", resp.PackageID))
ctx.reporter.Report(taskID, exectsk.NewStorageCreatePackageStatus("completed", "", resp.PackageID))
return nil
}
func init() {
Register(NewStorageCreatePackage)
}

View File

@ -12,16 +12,12 @@ import (
)
type StorageLoadPackage struct {
userID int64
packageID int64
storageID int64
*exectsk.StorageLoadPackage
}
func NewStorageLoadPackage(userID int64, packageID int64, storageID int64) *StorageLoadPackage {
func NewStorageLoadPackage(info *exectsk.StorageLoadPackage) *StorageLoadPackage {
return &StorageLoadPackage{
userID: userID,
packageID: packageID,
storageID: storageID,
StorageLoadPackage: info,
}
}
@ -33,9 +29,9 @@ func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskConte
err := t.do(ctx)
if err != nil {
//TODO 若任务失败上报的状态failed字段根据情况修改
ctx.reporter.Report(task.ID(), exectsk.NewLoadPackageTaskStatus("failed", err.Error()))
ctx.reporter.Report(task.ID(), exectsk.NewStorageLoadPackageStatus("failed", err.Error()))
} else {
ctx.reporter.Report(task.ID(), exectsk.NewLoadPackageTaskStatus("completed", ""))
ctx.reporter.Report(task.ID(), exectsk.NewStorageLoadPackageStatus("completed", ""))
}
ctx.reporter.ReportNow()
@ -52,8 +48,12 @@ func (t *StorageLoadPackage) do(ctx TaskContext) error {
defer stgCli.Close()
return stgCli.StorageLoadPackage(storage.StorageLoadPackageReq{
UserID: t.userID,
PackageID: t.packageID,
StorageID: t.storageID,
UserID: t.UserID,
PackageID: t.PackageID,
StorageID: t.StorageID,
})
}
func init() {
Register(NewStorageLoadPackage)
}

View File

@ -1,7 +1,12 @@
package task
import (
"fmt"
"reflect"
"gitlink.org.cn/cloudream/common/pkgs/task"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
reporter "gitlink.org.cn/cloudream/scheduler/executor/internal/reporter"
)
@ -13,7 +18,9 @@ type TaskContext struct {
// 因此适合进行执行结果的设置
type CompleteFn = task.CompleteFn
type Manager = task.Manager[TaskContext]
type Manager struct {
task.Manager[TaskContext]
}
type TaskBody = task.TaskBody[TaskContext]
@ -22,7 +29,28 @@ type Task = task.Task[TaskContext]
type CompleteOption = task.CompleteOption
func NewManager(reporter *reporter.Reporter) Manager {
return task.NewManager(TaskContext{
reporter: reporter,
})
return Manager{
Manager: task.NewManager(TaskContext{
reporter: reporter,
}),
}
}
func (m *Manager) StartByInfo(info exectsk.TaskInfo) (*Task, error) {
infoType := myreflect.TypeOfValue(info)
ctor, ok := taskFromInfoCtors[infoType]
if !ok {
return nil, fmt.Errorf("unknow info type")
}
return m.StartNew(ctor(info)), nil
}
var taskFromInfoCtors map[reflect.Type]func(exectsk.TaskInfo) TaskBody
func Register[TInfo exectsk.TaskInfo, TTaskBody TaskBody](ctor func(info TInfo) TTaskBody) {
taskFromInfoCtors[myreflect.TypeOf[TInfo]()] = func(info exectsk.TaskInfo) TaskBody {
return ctor(info.(TInfo))
}
}