算子控制模块 init

This commit is contained in:
songjc 2023-09-04 09:51:20 +08:00
parent 88fd60dceb
commit 04cf03ce2c
15 changed files with 411 additions and 103 deletions

View File

@ -7,6 +7,7 @@ import (
scmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
cltmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
magmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
)
var ExecutorMQPool *execmq.Pool
@ -36,3 +37,5 @@ var PCMPool *pcm.Pool
func InitPCMPool(cfg *pcm.Config) {
PCMPool = pcm.NewPool(cfg)
}
var ManagerPool *magmq.Pool

View File

@ -0,0 +1,18 @@
package models
const (
// 提交(被调整器选择前)
TaskStatusCommit = "Commit"
// 调整中(被调整器选择但未调整完)
TaskStatusChanging = "Changing"
// 待执行(被调整器选择且调整完)
TaskStatusWaiting = "Waiting"
// 提交中(被执行器选择但未提交)
TaskStatusSubmitting = "Submitting"
// 已提交(被执行器选择且已提交、但未执行完成)
TaskStatusSubmitted = "Submitted"
// 已完成(执行完成)
TaskStatusCompleted = "Completed"
// 失败
TaskStatusFailed = "Failed"
)

View File

@ -3,44 +3,71 @@ package executor
import "gitlink.org.cn/cloudream/common/pkgs/mq"
type PCMService interface {
UploadImg(msg *UploadImg) (*UploadImgResp, *mq.CodeMessage)
StartUploadImg(msg *StartUploadImg) (*StartUploadImgResp, *mq.CodeMessage)
WaitUploadImg(msg *WaitUploadImg) (*WaitUploadImgResp, *mq.CodeMessage)
GetImgList(msg *GetImgList) (*GetImgListResp, *mq.CodeMessage)
DeleteImg(msg *DeleteImg) (*DeleteImgResp, *mq.CodeMessage)
SchedulerTask(msg *SchedulerTask) (*SchedulerTaskResp, *mq.CodeMessage)
// GetTaskStatus(msg *GetOneTaskStatus) (*GetOneTaskStatusResp, *mq.CodeMessage)
StartSchedulerTask(msg *StartSchedulerTask) (*StartSchedulerTaskResp, *mq.CodeMessage)
WaitSchedulerTask(msg *WaitSchedulerTask) (*WaitSchedulerTaskResp, *mq.CodeMessage)
DeleteTask(msg *DeleteTask) (*DeleteTaskResp, *mq.CodeMessage)
}
// 上传(并注册)镜像
var _ = Register(PCMService.UploadImg)
// 启动上传(并注册)镜像任务
var _ = Register(PCMService.StartUploadImg)
type UploadImg struct {
type StartUploadImg struct {
NodeID int64 `json:"nodeID"`
ImgPath string `json:"imgPath"`
}
type StartUploadImgResp struct {
TaskID string `json:"taskID"`
}
func NewUploadImg(nodeID int64, imgPath string) UploadImg {
return UploadImg{
func NewStartUploadImg(nodeID int64, imgPath string) StartUploadImg {
return StartUploadImg{
NodeID: nodeID,
ImgPath: imgPath,
}
}
type UploadImgResp struct {
Result string `json:"result"`
ImgID int64 `json:"imgID"`
}
func NewUploadImgResp(result string, imgID int64) UploadImgResp {
return UploadImgResp{
Result: result,
ImgID: imgID,
func NewStartUploadImgResp(taskID string) StartUploadImgResp {
return StartUploadImgResp{
TaskID: taskID,
}
}
func (c *Client) StartUploadImg(msg StartUploadImg, opts ...mq.RequestOption) (*StartUploadImgResp, error) {
return mq.Request[StartUploadImgResp](c.rabbitCli, msg, opts...)
}
func (c *Client) UploadImg(msg UploadImg, opts ...mq.RequestOption) (*UploadImgResp, error) {
return mq.Request[UploadImgResp](c.rabbitCli, msg, opts...)
// 等待上传(并注册)镜像任务
var _ = Register(PCMService.WaitUploadImg)
type WaitUploadImg struct {
TaskID string `json:"taskID"`
WaitTimeoutMs int64 `json:"waitTimeoutMs"`
}
type WaitUploadImgResp struct {
IsComplete bool `json:"isComplete"`
Error string `json:"error"`
}
func NewWaitUploadImg(taskID string, waitTimeoutMs int64) WaitUploadImg {
return WaitUploadImg{
TaskID: taskID,
WaitTimeoutMs: waitTimeoutMs,
}
}
func NewWaitUploadImgResp(isComplete bool, err string) WaitUploadImgResp {
return WaitUploadImgResp{
IsComplete: isComplete,
Error: err,
}
}
func (c *Client) WaitUploadImg(msg WaitUploadImg, opts ...mq.RequestOption) (*WaitUploadImgResp, error) {
return mq.Request[WaitUploadImgResp](c.rabbitCli, msg, opts...)
}
// 查询镜像列表
@ -99,61 +126,63 @@ func (c *Client) DeleteImg(msg DeleteImg, opts ...mq.RequestOption) (*DeleteImgR
return mq.Request[DeleteImgResp](c.rabbitCli, msg, opts...)
}
// 提交任务
var _ = Register(PCMService.SchedulerTask)
// 启动提交任务
var _ = Register(PCMService.StartSchedulerTask)
type SchedulerTask struct {
type StartSchedulerTask struct {
NodeID int64 `json:"nodeID"`
Envs []map[string]string `json:"envs"`
ImgID int64 `json:"imgID"`
CMDLine string `json:"cmdLine"`
}
type StartSchedulerTaskResp struct {
TaskID string `json:"taskID"`
}
func NewSchedulerTask(nodeID int64, envs []map[string]string, imgID int64, cmdLine string) SchedulerTask {
return SchedulerTask{
func NewStartSchedulerTask(nodeID int64, envs []map[string]string, imgID int64, cmdLine string) StartSchedulerTask {
return StartSchedulerTask{
NodeID: nodeID,
Envs: envs,
ImgID: imgID,
CMDLine: cmdLine,
}
}
type SchedulerTaskResp struct {
Result string `json:"result"`
PCMJobID int64 `json:"pcmJobID"`
}
func NewSchedulerTaskResp(result string, pcmJobID int64) SchedulerTaskResp {
return SchedulerTaskResp{
Result: result,
PCMJobID: pcmJobID,
func NewStartSchedulerTaskResp(taskID string) StartSchedulerTaskResp {
return StartSchedulerTaskResp{
TaskID: taskID,
}
}
func (c *Client) SchedulerTask(msg SchedulerTask, opts ...mq.RequestOption) (*SchedulerTaskResp, error) {
return mq.Request[SchedulerTaskResp](c.rabbitCli, msg, opts...)
func (c *Client) StartSchedulerTask(msg StartUploadImg, opts ...mq.RequestOption) (*StartSchedulerTaskResp, error) {
return mq.Request[StartSchedulerTaskResp](c.rabbitCli, msg, opts...)
}
// 获取package的缓存分布情况
// var _ = Register(PCMService.GetOneTaskStatus)
// 等待提交任务
var _ = Register(PCMService.WaitUploadImg)
// type GetOneTaskStatus struct {
// }
type WaitSchedulerTask struct {
TaskID string `json:"taskID"`
WaitTimeoutMs int64 `json:"waitTimeoutMs"`
}
type WaitSchedulerTaskResp struct {
IsComplete bool `json:"isComplete"`
Error string `json:"error"`
}
// func NewGetOneTaskStatus() GetOneTaskStatus {
// return GetOneTaskStatus{}
// }
// type GetOneTaskStatusResp struct {
// }
// func NewGetOneTaskStatusResp() GetAllTaskStatusResp {
// return GetAllTaskStatusResp{}
// }
// func (c *Client) GetOneTaskStatus(msg GetOneTaskStatus, opts ...mq.RequestOption) (*GetOneTaskStatusResp, error) {
// return mq.Request[GetOneTaskStatusResp](c.rabbitCli, msg, opts...)
// }
func NewWaitSchedulerTask(taskID string, waitTimeoutMs int64) WaitSchedulerTask {
return WaitSchedulerTask{
TaskID: taskID,
WaitTimeoutMs: waitTimeoutMs,
}
}
func NewWaitSchedulerTaskResp(isComplete bool, err string) WaitSchedulerTaskResp {
return WaitSchedulerTaskResp{
IsComplete: isComplete,
Error: err,
}
}
func (c *Client) WaitSchedulerTask(msg WaitUploadImg, opts ...mq.RequestOption) (*WaitSchedulerTaskResp, error) {
return mq.Request[WaitSchedulerTaskResp](c.rabbitCli, msg, opts...)
}
// 删除任务
var _ = Register(PCMService.DeleteTask)

View File

@ -0,0 +1,32 @@
package task
type TaskStatus interface{}
type TaskStatusConst interface {
TaskStatus | SchedulerTaskStatus | UploadImgTaskStatus
}
type SchedulerTaskStatus struct {
Status string `json:"status"`
PCMJobID int64 `json:"pcmJobID"`
}
func NewSchedulerTaskStatus(status string, pcmJobID int64) SchedulerTaskStatus {
return SchedulerTaskStatus{
Status: status,
PCMJobID: pcmJobID,
}
}
type UploadImgTaskStatus struct {
TaskID string `json:"taskID"`
Status string `json:"status"`
ImgID int64 `json:"imgID"`
}
func NewUploadImgTaskStatus(status string, imgID int64) UploadImgTaskStatus {
return UploadImgTaskStatus{
Status: status,
ImgID: imgID,
}
}

View File

@ -2,25 +2,26 @@ package manager
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)
// 接收executor上报的存活状态及任务执行情况
var _ = Register(Service.ReportTaskStatus)
type ReportTaskStatus struct {
IsAlive bool `json:"isAlive"`
Jobs JobStatus `json:"jobs"`
ExecutorID string `json:"executorID"`
TaskStatus []TaskStatus `json:"taskStatus"`
}
type JobStatus struct {
JobID int64 `json:"jobID"`
Status string `json:"status"`
type TaskStatus struct {
TaskID string
Status exectsk.TaskStatus
}
func NewReportTaskStatus(isAlive bool, jobs JobStatus) ReportTaskStatus {
func NewReportTaskStatus(executorID string, taskStatus []TaskStatus) ReportTaskStatus {
return ReportTaskStatus{
IsAlive: isAlive,
Jobs: jobs,
ExecutorID: executorID,
TaskStatus: taskStatus,
}
}

View File

@ -0,0 +1,70 @@
package reporter
import (
"fmt"
"time"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/scheduler/common/globals"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
)
type Reporter struct {
executorID string
taskStatus map[string]exectsk.TaskStatus
}
func NewReportTaskStatus(executorID string) Reporter {
return Reporter{
executorID: executorID,
taskStatus: make(map[string]exectsk.TaskStatus),
}
}
func (rts *Reporter) Report(taskID string, taskStatus exectsk.TaskStatus) error {
rts.taskStatus[taskID] = taskStatus
return nil
}
func (r *Reporter) serve() error {
magCli, err := globals.ManagerPool.Acquire()
if err != nil {
return fmt.Errorf("new manager client: %w", err)
}
defer magCli.Close()
tasksChan := make(chan Reporter)
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fmt.Println("Sending heartbeat to manager...")
case <-tasksChan:
fmt.Printf("Received status change for JobID %s: %s\n", r.executorID, r.taskStatus)
ticker.Reset(30 * time.Second)
}
var taskStatus []mgrmq.TaskStatus
for taskID, status := range r.taskStatus {
taskStatus = append(taskStatus, mgrmq.TaskStatus{
TaskID: taskID,
Status: status,
})
}
magCli.ReportTaskStatus(mgrmq.NewReportTaskStatus(r.executorID, taskStatus))
}
}
func init() {
mq.RegisterTypeSet[exectsk.TaskStatus](
myreflect.TypeOf[exectsk.SchedulerTaskStatus](),
myreflect.TypeOf[exectsk.UploadImgTaskStatus](),
)
}

View File

@ -1,32 +1,33 @@
package services
import (
"time"
"gitlink.org.cn/cloudream/common/api/pcm"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/scheduler/common/globals"
execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
schtsk "gitlink.org.cn/cloudream/scheduler/executor/internal/task"
)
func (svc *Service) UploadImg(msg *execmq.UploadImg) (*execmq.UploadImgResp, *mq.CodeMessage) {
pcmCli, err := globals.PCMPool.Acquire()
if err != nil {
logger.Warnf("new pcm client, err: %s", err.Error())
return mq.ReplyFailed[execmq.UploadImgResp](errorcode.OperationFailed, "new pcm client failed")
}
defer pcmCli.Close()
func (svc *Service) StartUploadImg(msg *execmq.StartUploadImg) (*execmq.StartUploadImgResp, *mq.CodeMessage) {
tsk := svc.taskManager.StartNew(schtsk.NewPCMUploadImg(msg.NodeID, msg.ImgPath))
return mq.ReplyOK(execmq.NewStartUploadImgResp(tsk.ID()))
}
resp, err := pcmCli.UploadImg(pcm.UploadImgReq{
NodeID: msg.NodeID,
ImgPath: msg.ImgPath,
})
if err != nil {
logger.Warnf("upload image failed, err: %s", err.Error())
return mq.ReplyFailed[execmq.UploadImgResp](errorcode.OperationFailed, "upload image failed")
}
func (svc *Service) WaitUploadImg(msg *execmq.WaitUploadImg) (*execmq.WaitUploadImgResp, *mq.CodeMessage) {
tsk := svc.taskManager.FindByID(msg.TaskID)
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) {
errMsg := ""
if tsk.Error() != nil {
errMsg = tsk.Error().Error()
}
return mq.ReplyOK(execmq.NewUploadImgResp(resp.Result, resp.ImgID))
return mq.ReplyOK(execmq.NewWaitUploadImgResp(true, errMsg))
}
return mq.ReplyOK(execmq.NewWaitUploadImgResp(false, ""))
}
func (svc *Service) GetImgList(msg *execmq.GetImgList) (*execmq.GetImgListResp, *mq.CodeMessage) {
@ -67,25 +68,22 @@ func (svc *Service) DeleteImg(msg *execmq.DeleteImg) (*execmq.DeleteImgResp, *mq
return mq.ReplyOK(execmq.NewDeleteImgResp(resp.Result))
}
func (svc *Service) SchedulerTask(msg *execmq.SchedulerTask) (*execmq.SchedulerTaskResp, *mq.CodeMessage) {
pcmCli, err := globals.PCMPool.Acquire()
if err != nil {
logger.Warnf("new pcm client, err: %s", err.Error())
return mq.ReplyFailed[execmq.SchedulerTaskResp](errorcode.OperationFailed, "new pcm client failed")
}
defer pcmCli.Close()
func (svc *Service) StartSchedulerTask(msg *execmq.StartSchedulerTask) (*execmq.StartSchedulerTaskResp, *mq.CodeMessage) {
tsk := svc.taskManager.StartNew(schtsk.NewPCMSchedulerTask(msg.NodeID, msg.Envs, msg.ImgID, msg.CMDLine))
return mq.ReplyOK(execmq.NewStartSchedulerTaskResp(tsk.ID()))
}
resp, err := pcmCli.SchedulerTask(pcm.SchedulerTaskReq{
NodeID: msg.NodeID,
Envs: msg.Envs,
ImgID: msg.ImgID,
CMDLine: msg.CMDLine,
})
if err != nil {
logger.Warnf("scheduler task failed, err: %s", err.Error())
return mq.ReplyFailed[execmq.SchedulerTaskResp](errorcode.OperationFailed, "scheduler task failed")
func (svc *Service) WaitSchedulerTask(msg *execmq.WaitSchedulerTask) (*execmq.WaitSchedulerTaskResp, *mq.CodeMessage) {
tsk := svc.taskManager.FindByID(msg.TaskID)
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) {
errMsg := ""
if tsk.Error() != nil {
errMsg = tsk.Error().Error()
}
return mq.ReplyOK(execmq.NewWaitSchedulerTaskResp(true, errMsg))
}
return mq.ReplyOK(execmq.NewSchedulerTaskResp(resp.Result, resp.PCMJobID))
return mq.ReplyOK(execmq.NewWaitSchedulerTaskResp(false, ""))
}
func (svc *Service) DeleteTask(msg *execmq.DeleteTask) (*execmq.DeleteTaskResp, *mq.CodeMessage) {

View File

@ -48,6 +48,7 @@ func (svc *Service) WaitStorageCreatePackage(msg *execmq.WaitStorageCreatePackag
func (svc *Service) StartCacheMovePackage(msg *execmq.StartCacheMovePackage) (*execmq.StartCacheMovePackageResp, *mq.CodeMessage) {
tsk := svc.taskManager.StartNew(schtsk.NewCacheMovePackage(msg.UserID, msg.PackageID, msg.NodeID))
// tsk := svc.taskManager.StartNew(task.TaskBody[schtsk.NewCacheMovePackage(msg.UserID, msg.PackageID, msg.NodeID)])
return mq.ReplyOK(execmq.NewStartCacheMovePackageResp(tsk.ID()))
}

View File

@ -6,6 +6,7 @@ import (
"gitlink.org.cn/cloudream/common/api/storage"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/scheduler/common/globals"
)
@ -23,12 +24,13 @@ func NewCacheMovePackage(userID int64, packageID int64, nodeID int64) *CacheMove
}
}
func (t *CacheMovePackage) Execute(ctx TaskContext, complete CompleteFn) {
func (t *CacheMovePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
log := logger.WithType[CacheMovePackage]("Task")
log.Debugf("begin with %v", logger.FormatStruct(t))
defer log.Debugf("end")
err := t.do(ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})

View File

@ -0,0 +1,80 @@
package task
import (
"fmt"
"time"
"gitlink.org.cn/cloudream/common/api/pcm"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/scheduler/common/globals"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)
type PCMSchedulerTask struct {
nodeID int64
envs []map[string]string
imgID int64
cmdLine string
Result PCMSchedulerTaskResult
}
type PCMSchedulerTaskResult struct {
Result string
PCMJobID int64
}
func NewPCMSchedulerTask(nodeID int64, envs []map[string]string, imgID int64, cmdLine string) *PCMSchedulerTask {
return &PCMSchedulerTask{
nodeID: nodeID,
envs: envs,
imgID: imgID,
cmdLine: cmdLine,
}
}
func (t *PCMSchedulerTask) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
log := logger.WithType[PCMSchedulerTask]("Task")
log.Debugf("begin")
defer log.Debugf("end")
err := t.do(task.ID(), ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}
func (t *PCMSchedulerTask) do(taskID string, ctx TaskContext) error {
pcmCli, err := globals.PCMPool.Acquire()
if err != nil {
return fmt.Errorf("new pcm client: %w", err)
}
defer pcmCli.Close()
resp, err := pcmCli.SchedulerTask(pcm.SchedulerTaskReq{
NodeID: t.nodeID,
Envs: t.envs,
ImgID: t.imgID,
CMDLine: t.cmdLine,
})
if err != nil {
return err
}
for {
tsResp, err := pcmCli.GetTaskStatus(pcm.GetTaskStatusReq{
NodeID: t.nodeID,
PCMJobID: resp.PCMJobID,
})
if err != nil {
return err
}
taskStatus := exectsk.NewSchedulerTaskStatus(tsResp.Status, resp.PCMJobID)
ctx.reporter.Report(taskID, taskStatus)
t.Result.Result = tsResp.Result
t.Result.PCMJobID = resp.PCMJobID
}
}

View File

@ -0,0 +1,65 @@
package task
import (
"fmt"
"time"
"gitlink.org.cn/cloudream/common/api/pcm"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/scheduler/common/globals"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)
type PCMUploadImg struct {
nodeID int64
imgPath string
Result PCMUploadImgResult
}
type PCMUploadImgResult struct {
Result string `json:"result"`
ImgID int64 `json:"imgID"`
}
func NewPCMUploadImg(nodeID int64, imgPath string) *PCMUploadImg {
return &PCMUploadImg{
nodeID: nodeID,
imgPath: imgPath,
}
}
func (t *PCMUploadImg) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
log := logger.WithType[PCMUploadImg]("Task")
log.Debugf("begin")
defer log.Debugf("end")
err := t.do(task.ID(), ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}
func (t *PCMUploadImg) do(taskID string, ctx TaskContext) error {
pcmCli, err := globals.PCMPool.Acquire()
if err != nil {
return fmt.Errorf("new pcm client: %w", err)
}
defer pcmCli.Close()
resp, err := pcmCli.UploadImg(pcm.UploadImgReq{
NodeID: t.nodeID,
ImgPath: t.imgPath,
})
if err != nil {
return err
}
taskStatus := exectsk.NewUploadImgTaskStatus(resp.Result, resp.ImgID)
ctx.reporter.Report(taskID, taskStatus)
t.Result.Result = resp.Result
t.Result.ImgID = resp.ImgID
return nil
}

View File

@ -7,6 +7,7 @@ import (
"gitlink.org.cn/cloudream/common/api/storage"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/scheduler/common/globals"
)
@ -36,12 +37,13 @@ func NewStorageCreatePackage(userID int64, storageID int64, path string, bucketI
}
}
func (t *StorageCreatePackage) Execute(ctx TaskContext, complete CompleteFn) {
func (t *StorageCreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
log := logger.WithType[StorageCreatePackage]("Task")
log.Debugf("begin")
defer log.Debugf("end")
err := t.do(ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})

View File

@ -6,6 +6,7 @@ import (
"gitlink.org.cn/cloudream/common/api/storage"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/scheduler/common/globals"
)
@ -23,12 +24,13 @@ func NewStorageLoadPackage(userID int64, packageID int64, storageID int64) *Stor
}
}
func (t *StorageLoadPackage) Execute(ctx TaskContext, complete CompleteFn) {
func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
log := logger.WithType[StorageLoadPackage]("Task")
log.Debugf("begin with %w", logger.FormatStruct(t))
defer log.Debugf("end")
err := t.do(ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})

View File

@ -2,9 +2,11 @@ package task
import (
"gitlink.org.cn/cloudream/common/pkgs/task"
reporter "gitlink.org.cn/cloudream/scheduler/executor/internal/reporter"
)
type TaskContext struct {
reporter *reporter.Reporter
}
// 需要在Task结束后主动调用completing函数将在Manager加锁期间被调用
@ -19,6 +21,8 @@ type Task = task.Task[TaskContext]
type CompleteOption = task.CompleteOption
func NewManager() Manager {
return task.NewManager(TaskContext{})
func NewManager(reporter *reporter.Reporter) Manager {
return task.NewManager(TaskContext{
reporter: reporter,
})
}

View File

@ -8,6 +8,7 @@ import (
"gitlink.org.cn/cloudream/scheduler/common/globals"
execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
"gitlink.org.cn/cloudream/scheduler/executor/internal/config"
"gitlink.org.cn/cloudream/scheduler/executor/internal/reporter"
"gitlink.org.cn/cloudream/scheduler/executor/internal/services"
"gitlink.org.cn/cloudream/scheduler/executor/internal/task"
)
@ -28,7 +29,7 @@ func main() {
globals.InitCloudreamStoragePool(&config.Cfg().CloudreamStorage)
globals.InitPCMPool(&config.Cfg().PCM)
taskMgr := task.NewManager()
taskMgr := task.NewManager(&reporter.Reporter{})
mqSvr, err := execmq.NewServer(services.NewService(&taskMgr), &config.Cfg().RabbitMQ)
if err != nil {