This commit is contained in:
songjc 2023-09-04 16:48:46 +08:00
parent 156d418265
commit 7187e46185
9 changed files with 88 additions and 179 deletions

View File

@ -7,13 +7,10 @@ import (
type StorageService interface {
StartStorageLoadPackage(msg *StartStorageLoadPackage) (*StartStorageLoadPackageResp, *mq.CodeMessage)
WaitStorageLoadPackage(msg *WaitStorageLoadPackage) (*WaitStorageLoadPackageResp, *mq.CodeMessage)
StartStorageCreatePackage(msg *StartStorageCreatePackage) (*StartStorageCreatePackageResp, *mq.CodeMessage)
WaitStorageCreatePackage(msg *WaitStorageCreatePackage) (*WaitStorageCreatePackageResp, *mq.CodeMessage)
StartCacheMovePackage(msg *StartCacheMovePackage) (*StartCacheMovePackageResp, *mq.CodeMessage)
WaitCacheMovePackage(msg *WaitCacheMovePackage) (*WaitCacheMovePackageResp, *mq.CodeMessage)
}
// 启动存储系统调度文件任务
@ -43,34 +40,6 @@ func (c *Client) StartStorageLoadPackage(msg StartStorageLoadPackage, opts ...mq
return mq.Request[StartStorageLoadPackageResp](c.rabbitCli, msg, opts...)
}
// 等待存储系统调度文件任务
var _ = Register(StorageService.WaitStorageLoadPackage)
type WaitStorageLoadPackage struct {
TaskID string `json:"taskID"`
WaitTimeoutMs int64 `json:"waitTimeoutMs"`
}
type WaitStorageLoadPackageResp struct {
IsComplete bool `json:"isComplete"`
Error string `json:"error"`
}
func NewWaitStorageLoadPackage(taskID string, waitTimeoutMs int64) WaitStorageLoadPackage {
return WaitStorageLoadPackage{
TaskID: taskID,
WaitTimeoutMs: waitTimeoutMs,
}
}
func NewWaitStorageLoadPackageResp(isComplete bool, err string) WaitStorageLoadPackageResp {
return WaitStorageLoadPackageResp{
IsComplete: isComplete,
Error: err,
}
}
func (c *Client) WaitStorageLoadPackage(msg WaitStorageLoadPackage, opts ...mq.RequestOption) (*WaitStorageLoadPackageResp, error) {
return mq.Request[WaitStorageLoadPackageResp](c.rabbitCli, msg, opts...)
}
// 启动存储系统从存储服务上传文件任务
var _ = Register(StorageService.StartStorageCreatePackage)
@ -105,36 +74,6 @@ func (c *Client) StartStorageCreatePackage(msg StartStorageCreatePackage, opts .
return mq.Request[StartStorageCreatePackageResp](c.rabbitCli, msg, opts...)
}
// 等待存储系统从存储服务上传文件任务
var _ = Register(StorageService.WaitStorageCreatePackage)
type WaitStorageCreatePackage struct {
TaskID string `json:"taskID"`
WaitTimeoutMs int64 `json:"waitTimeoutMs"`
}
type WaitStorageCreatePackageResp struct {
IsComplete bool `json:"isComplete"`
Error string `json:"error"`
PackageID int64 `json:"packageID"`
}
func NewWaitStorageCreatePackage(taskID string, waitTimeoutMs int64) WaitStorageCreatePackage {
return WaitStorageCreatePackage{
TaskID: taskID,
WaitTimeoutMs: waitTimeoutMs,
}
}
func NewWaitStorageCreatePackageResp(isComplete bool, err string, packageID int64) WaitStorageCreatePackageResp {
return WaitStorageCreatePackageResp{
IsComplete: isComplete,
Error: err,
PackageID: packageID,
}
}
func (c *Client) WaitStorageCreatePackage(msg WaitStorageCreatePackage, opts ...mq.RequestOption) (*WaitStorageCreatePackageResp, error) {
return mq.Request[WaitStorageCreatePackageResp](c.rabbitCli, msg, opts...)
}
// 启动存储系统调度文件到某个节点的缓存的任务
var _ = Register(StorageService.StartCacheMovePackage)
@ -162,31 +101,3 @@ func NewStartCacheMovePackageResp(taskID string) StartCacheMovePackageResp {
func (c *Client) StartCacheMovePackage(msg StartCacheMovePackage, opts ...mq.RequestOption) (*StartCacheMovePackageResp, error) {
return mq.Request[StartCacheMovePackageResp](c.rabbitCli, msg, opts...)
}
// 等待存储系统调度文件到某个节点的缓存的任务
var _ = Register(StorageService.WaitCacheMovePackage)
type WaitCacheMovePackage struct {
TaskID string `json:"taskID"`
WaitTimeoutMs int64 `json:"waitTimeoutMs"`
}
type WaitCacheMovePackageResp struct {
IsComplete bool `json:"isComplete"`
Error string `json:"error"`
}
func NewWaitCacheMovePackage(taskID string, waitTimeoutMs int64) WaitCacheMovePackage {
return WaitCacheMovePackage{
TaskID: taskID,
WaitTimeoutMs: waitTimeoutMs,
}
}
func NewWaitCacheMovePackageResp(isComplete bool, err string) WaitCacheMovePackageResp {
return WaitCacheMovePackageResp{
IsComplete: isComplete,
Error: err,
}
}
func (c *Client) WaitCacheMovePackage(msg WaitCacheMovePackage, opts ...mq.RequestOption) (*WaitCacheMovePackageResp, error) {
return mq.Request[WaitCacheMovePackageResp](c.rabbitCli, msg, opts...)
}

View File

@ -8,25 +8,66 @@ type TaskStatusConst interface {
type ScheduleTaskStatus struct {
Status string `json:"status"`
Err error `json:"err"`
PCMJobID int64 `json:"pcmJobID"`
}
func NewScheduleTaskStatus(status string, pcmJobID int64) ScheduleTaskStatus {
func NewScheduleTaskStatus(status string, err error, pcmJobID int64) ScheduleTaskStatus {
return ScheduleTaskStatus{
Status: status,
Err: err,
PCMJobID: pcmJobID,
}
}
type UploadImageTaskStatus struct {
TaskID string `json:"taskID"`
Status string `json:"status"`
Err error `json:"err"`
ImageID int64 `json:"imageID"`
}
func NewUploadImageTaskStatus(status string, imageID int64) UploadImageTaskStatus {
func NewUploadImageTaskStatus(status string, err error, imageID int64) UploadImageTaskStatus {
return UploadImageTaskStatus{
Status: status,
Err: err,
ImageID: imageID,
}
}
type CacheMovePackageTaskStatus struct {
Status string `json:"status"`
Err error `json:"err"`
}
func NewCacheMovePackageTaskStatus(status string, err error) CacheMovePackageTaskStatus {
return CacheMovePackageTaskStatus{
Status: status,
Err: err,
}
}
type CreatePackageTaskStatus struct {
Status string `json:"status"`
Err error `json:"err"`
PackageID int64 `json:"packageID"`
}
func NewCreatePackageTaskStatus(status string, err error, packageID int64) CreatePackageTaskStatus {
return CreatePackageTaskStatus{
Status: status,
Err: err,
PackageID: packageID,
}
}
type LoadPackageTaskStatus struct {
Status string `json:"status"`
Err error `json:"err"`
}
func NewLoadPackageTaskStatus(status string, err error) LoadPackageTaskStatus {
return LoadPackageTaskStatus{
Status: status,
Err: err,
}
}

View File

@ -77,7 +77,9 @@ func (r *Reporter) Serve() error {
//若上报失败,数据应保留
r.taskStatusLock.Lock()
for _, ts := range taskStatus {
r.Report(ts.TaskID, ts.Status)
if _, exists := r.taskStatus[ts.TaskID]; !exists {
r.taskStatus[ts.TaskID] = ts.Status
}
}
r.taskStatusLock.Unlock()
}
@ -88,5 +90,8 @@ func init() {
mq.RegisterTypeSet[exectsk.TaskStatus](
myreflect.TypeOf[exectsk.ScheduleTaskStatus](),
myreflect.TypeOf[exectsk.UploadImageTaskStatus](),
myreflect.TypeOf[exectsk.CacheMovePackageTaskStatus](),
myreflect.TypeOf[exectsk.CreatePackageTaskStatus](),
myreflect.TypeOf[exectsk.LoadPackageTaskStatus](),
)
}

View File

@ -1,8 +1,6 @@
package services
import (
"time"
"gitlink.org.cn/cloudream/common/pkgs/mq"
execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
schtsk "gitlink.org.cn/cloudream/scheduler/executor/internal/task"
@ -13,54 +11,13 @@ func (svc *Service) StartStorageLoadPackage(msg *execmq.StartStorageLoadPackage)
return mq.ReplyOK(execmq.NewStartStorageLoadPackageResp(tsk.ID()))
}
func (svc *Service) WaitStorageLoadPackage(msg *execmq.WaitStorageLoadPackage) (*execmq.WaitStorageLoadPackageResp, *mq.CodeMessage) {
tsk := svc.taskManager.FindByID(msg.TaskID)
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) {
errMsg := ""
if tsk.Error() != nil {
errMsg = tsk.Error().Error()
}
return mq.ReplyOK(execmq.NewWaitStorageLoadPackageResp(true, errMsg))
}
return mq.ReplyOK(execmq.NewWaitStorageLoadPackageResp(false, ""))
}
func (svc *Service) StartStorageCreatePackage(msg *execmq.StartStorageCreatePackage) (*execmq.StartStorageCreatePackageResp, *mq.CodeMessage) {
tsk := svc.taskManager.StartNew(schtsk.NewStorageCreatePackage(msg.UserID, msg.StorageID, msg.Path, msg.BucketID, msg.Name, msg.Redundancy))
return mq.ReplyOK(execmq.NewStartStorageCreatePackageResp(tsk.ID()))
}
func (svc *Service) WaitStorageCreatePackage(msg *execmq.WaitStorageCreatePackage) (*execmq.WaitStorageCreatePackageResp, *mq.CodeMessage) {
tsk := svc.taskManager.FindByID(msg.TaskID)
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) {
tskBody := tsk.Body().(*schtsk.StorageCreatePackage)
errMsg := ""
if tsk.Error() != nil {
errMsg = tsk.Error().Error()
}
return mq.ReplyOK(execmq.NewWaitStorageCreatePackageResp(true, errMsg, tskBody.Result.PackageID))
}
return mq.ReplyOK(execmq.NewWaitStorageCreatePackageResp(false, "", 0))
}
func (svc *Service) StartCacheMovePackage(msg *execmq.StartCacheMovePackage) (*execmq.StartCacheMovePackageResp, *mq.CodeMessage) {
tsk := svc.taskManager.StartNew(schtsk.NewCacheMovePackage(msg.UserID, msg.PackageID, msg.NodeID))
// tsk := svc.taskManager.StartNew(task.TaskBody[schtsk.NewCacheMovePackage(msg.UserID, msg.PackageID, msg.NodeID)])
return mq.ReplyOK(execmq.NewStartCacheMovePackageResp(tsk.ID()))
}
func (svc *Service) WaitCacheMovePackage(msg *execmq.WaitCacheMovePackage) (*execmq.WaitCacheMovePackageResp, *mq.CodeMessage) {
tsk := svc.taskManager.FindByID(msg.TaskID)
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) {
errMsg := ""
if tsk.Error() != nil {
errMsg = tsk.Error().Error()
}
return mq.ReplyOK(execmq.NewWaitCacheMovePackageResp(true, errMsg))
}
return mq.ReplyOK(execmq.NewWaitCacheMovePackageResp(false, ""))
}

View File

@ -8,6 +8,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/scheduler/common/globals"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)
type CacheMovePackage struct {
@ -30,6 +31,13 @@ func (t *CacheMovePackage) Execute(task *task.Task[TaskContext], ctx TaskContext
defer log.Debugf("end")
err := t.do(ctx)
if err != nil {
//TODO 若任务失败上报的状态failed字段根据情况修改
ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageTaskStatus("failed", err))
} else {
ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageTaskStatus("completed", nil))
}
ctx.reporter.ReportNow()
complete(err, CompleteOption{
RemovingDelay: time.Minute,

View File

@ -17,12 +17,6 @@ type PCMScheduleTask struct {
envs []map[string]string
imageID int64
cmdLine string
Result PCMScheduleTaskResult
}
type PCMScheduleTaskResult struct {
Result string
PCMJobID int64
}
func NewPCMScheduleTask(nodeID int64, envs []map[string]string, imageID int64, cmdLine string) *PCMScheduleTask {
@ -40,9 +34,10 @@ func (t *PCMScheduleTask) Execute(task *task.Task[TaskContext], ctx TaskContext,
defer log.Debugf("end")
err := t.do(task.ID(), ctx)
//TODO 若任务失败上报的状态failed字段根据情况修改
ctx.reporter.Report(task.ID(), "failed")
if err != nil {
//TODO 若任务失败上报的状态failed字段根据情况修改
ctx.reporter.Report(task.ID(), exectsk.NewScheduleTaskStatus("failed", err, 0))
}
ctx.reporter.ReportNow()
complete(err, CompleteOption{
@ -78,20 +73,15 @@ func (t *PCMScheduleTask) do(taskID string, ctx TaskContext) error {
return err
}
taskStatus := exectsk.NewScheduleTaskStatus(tsResp.Status, resp.PCMJobID)
if taskStatus.Status != prevStatus {
ctx.reporter.Report(taskID, taskStatus)
t.Result.Result = tsResp.Result
t.Result.PCMJobID = resp.PCMJobID
if tsResp.Status != prevStatus {
ctx.reporter.Report(taskID, exectsk.NewScheduleTaskStatus(tsResp.Status, nil, resp.PCMJobID))
}
prevStatus = taskStatus.Status
prevStatus = tsResp.Status
// TODO 根据接口result返回情况修改
// 根据返回的result判定任务是否完成若完成 跳出循环,结束任务
if tsResp.Status == "Completed" {
ctx.reporter.ReportNow()
return nil
}
}

View File

@ -14,12 +14,6 @@ import (
type PCMUploadImage struct {
nodeID int64
imagePath string
Result PCMUploadImageResult
}
type PCMUploadImageResult struct {
Result string `json:"result"`
ImageID int64 `json:"imageID"`
}
func NewPCMUploadImage(nodeID int64, imagePath string) *PCMUploadImage {
@ -35,9 +29,10 @@ func (t *PCMUploadImage) Execute(task *task.Task[TaskContext], ctx TaskContext,
defer log.Debugf("end")
err := t.do(task.ID(), ctx)
//TODO 若任务失败上报的状态failed字段根据情况修改
ctx.reporter.Report(task.ID(), "failed")
if err != nil {
//TODO 若任务失败上报的状态failed字段根据情况修改
ctx.reporter.Report(task.ID(), exectsk.NewUploadImageTaskStatus("failed", err, 0))
}
ctx.reporter.ReportNow()
complete(err, CompleteOption{
@ -56,17 +51,11 @@ func (t *PCMUploadImage) do(taskID string, ctx TaskContext) error {
NodeID: t.nodeID,
ImagePath: t.imagePath,
})
if err != nil {
return err
}
// TODO 根据接口result返回情况修改
taskStatus := exectsk.NewUploadImageTaskStatus(resp.Result, resp.ImageID)
ctx.reporter.Report(taskID, taskStatus)
ctx.reporter.ReportNow()
t.Result.Result = resp.Result
t.Result.ImageID = resp.ImageID
ctx.reporter.Report(taskID, exectsk.NewUploadImageTaskStatus(resp.Result, nil, resp.ImageID))
return nil
}

View File

@ -9,6 +9,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/scheduler/common/globals"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)
type StorageCreatePackage struct {
@ -18,12 +19,6 @@ type StorageCreatePackage struct {
bucketID int64
name string
redundancy models.TypedRedundancyInfo
Result StorageCreatePackageResult
}
type StorageCreatePackageResult struct {
PackageID int64
}
func NewStorageCreatePackage(userID int64, storageID int64, path string, bucketID int64, name string, redundancy models.TypedRedundancyInfo) *StorageCreatePackage {
@ -42,14 +37,19 @@ func (t *StorageCreatePackage) Execute(task *task.Task[TaskContext], ctx TaskCon
log.Debugf("begin")
defer log.Debugf("end")
err := t.do(ctx)
err := t.do(task.ID(), ctx)
if err != nil {
//TODO 若任务失败上报的状态failed字段根据情况修改
ctx.reporter.Report(task.ID(), exectsk.NewCreatePackageTaskStatus("failed", err, 0))
}
ctx.reporter.ReportNow()
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}
func (t *StorageCreatePackage) do(ctx TaskContext) error {
func (t *StorageCreatePackage) do(taskID string, ctx TaskContext) error {
stgCli, err := globals.CloudreamStoragePool.Acquire()
if err != nil {
return fmt.Errorf("new cloudream storage client: %w", err)
@ -64,11 +64,11 @@ func (t *StorageCreatePackage) do(ctx TaskContext) error {
Name: t.name,
Redundancy: t.redundancy,
})
if err != nil {
return err
}
t.Result.PackageID = resp.PackageID
// TODO 根据接口result返回情况修改
ctx.reporter.Report(taskID, exectsk.NewCreatePackageTaskStatus("completed", nil, resp.PackageID))
return nil
}

View File

@ -8,6 +8,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/scheduler/common/globals"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)
type StorageLoadPackage struct {
@ -30,6 +31,13 @@ func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskConte
defer log.Debugf("end")
err := t.do(ctx)
if err != nil {
//TODO 若任务失败上报的状态failed字段根据情况修改
ctx.reporter.Report(task.ID(), exectsk.NewLoadPackageTaskStatus("failed", err))
} else {
ctx.reporter.Report(task.ID(), exectsk.NewLoadPackageTaskStatus("completed", nil))
}
ctx.reporter.ReportNow()
complete(err, CompleteOption{
RemovingDelay: time.Minute,