diff --git a/advisor/internal/scheduler/service.go b/advisor/internal/scheduler/service.go
index 5edefd0..1e5f54f 100644
--- a/advisor/internal/scheduler/service.go
+++ b/advisor/internal/scheduler/service.go
@@ -44,7 +44,7 @@ func (s *Service) MakeScheme(dump jobmod.JobDump) (*jobmod.JobScheduleScheme, er
 	default:
 	}
 
-	return callback.WaitValue(context.Background())
+	return callback.Wait(context.Background())
 }
 
 func (s *Service) Serve() error {
diff --git a/common/models/job/body.go b/common/models/job/body.go
index 9ac5a6a..7acaa10 100644
--- a/common/models/job/body.go
+++ b/common/models/job/body.go
@@ -20,6 +20,7 @@ var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[JobBody
 	(*DataReturnJobDump)(nil),
 	(*InstanceJobDump)(nil),
 	(*MultiInstanceJobDump)(nil),
+	(*UpdateMultiInstanceJobDump)(nil),
 )))
 
 type NormalJobDump struct {
@@ -64,3 +65,13 @@ type MultiInstanceJobDump struct {
 func (d *MultiInstanceJobDump) getType() JobBodyDumpType {
 	return d.Type
 }
+
+type UpdateMultiInstanceJobDump struct {
+	serder.Metadata `union:"UpdateMultiInstanceJob"`
+	Type            JobBodyDumpType `json:"type"`
+	Files           JobFiles        `json:"files"`
+}
+
+func (d *UpdateMultiInstanceJobDump) getType() JobBodyDumpType {
+	return d.Type
+}
diff --git a/common/models/job/state.go b/common/models/job/state.go
index a3b884a..5f99fce 100644
--- a/common/models/job/state.go
+++ b/common/models/job/state.go
@@ -56,6 +56,15 @@ func (dump *MultiInstCreateInitDump) getType() JobStateDumpType {
 	return dump.Type
 }
 
+type MultiInstanceUpdateDump struct {
+	serder.Metadata `union:"MultiInstanceUpdate"`
+	Type            JobStateDumpType `json:"type"`
+}
+
+func (dump *MultiInstanceUpdateDump) getType() JobStateDumpType {
+	return dump.Type
+}
+
 type MultiInstCreateRunningDump struct {
 	serder.Metadata `union:"MultiInstCreateRunning"`
 	Type            JobStateDumpType `json:"type"`
diff --git a/common/pkgs/mq/executor/task.go b/common/pkgs/mq/executor/task.go
index 8abf260..dd0f227 100644
--- a/common/pkgs/mq/executor/task.go
+++ b/common/pkgs/mq/executor/task.go
@@ -6,7 +6,6 @@ import (
 	"gitlink.org.cn/cloudream/common/pkgs/mq"
 	myhttp "gitlink.org.cn/cloudream/common/utils/http"
 	"gitlink.org.cn/cloudream/common/utils/serder"
-	"log"
 	"net/http"
 	"net/url"
 	"strings"
@@ -24,7 +23,8 @@ var _ = Register(Service.StartTask)
 
 type StartTask struct {
 	mq.MessageBodyBase
-	Info exectsk.TaskInfo `json:"info"`
+	TaskID string           `json:"taskID"`
+	Info   exectsk.TaskInfo `json:"info"`
 }
 
 type StartTaskResp struct {
@@ -32,9 +32,10 @@ type StartTaskResp struct {
 	TaskID string `json:"taskID"`
 }
 
-func NewStartTask(info exectsk.TaskInfo) *StartTask {
+func NewStartTask(taskID string, info exectsk.TaskInfo) *StartTask {
 	return &StartTask{
-		Info: info,
+		TaskID: taskID,
+		Info:   info,
 	}
 }
 
@@ -53,9 +54,7 @@ func (c *HttpClient) SubmitTask(req *StartTask) (*StartTaskResp, error) {
 		return nil, err
 	}
 
-	//data, err := json.Marshal(req)
 	data, err := serder.ObjectToJSONEx(req)
-	log.Println("send data: " + string(data))
 	resp, err := myhttp.PostJSONRow(targetURL, myhttp.RequestParam{
 		Body: data,
 	})
@@ -95,3 +94,60 @@ func (c *HttpClient) GetReportInfo() (*http.Response, error) {
 
 	return resp, nil
 }
+
+type TaskOperateInfo struct {
+	TaskID  string
+	Command string
+}
+
+func NewTaskOperateInfo(taskID string, command string) *TaskOperateInfo {
+	return &TaskOperateInfo{
+		TaskID:  taskID,
+		Command: command,
+	}
+}
+
+type TaskOperateResp struct {
+	Err error
+}
+
+func NewTaskOperateResp(err error) *TaskOperateResp {
+	return &TaskOperateResp{
+		Err: err,
+	}
+}
+
+func (c *HttpClient) OperateTask(req *TaskOperateInfo) (*TaskOperateResp, error) {
+	targetURL, err := url.JoinPath(c.baseURL, "/operateTask")
+	if err != nil {
+		return nil, err
+	}
+
+	data, err := serder.ObjectToJSONEx(req)
+	if err != nil {
+		return nil, err
+	}
+
+	resp, err := myhttp.PostJSONRow(targetURL, myhttp.RequestParam{
+		Body: data,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	contType := resp.Header.Get("Content-Type")
+	if strings.Contains(contType, myhttp.ContentTypeJSON) {
+		var codeResp response[TaskOperateResp]
+		if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
+			return nil, fmt.Errorf("parsing response: %w", err)
+		}
+
+		if codeResp.Code == errorcode.OK {
+			return &codeResp.Data, nil
+		}
+
+		return nil, codeResp.ToError()
+	}
+
+	return nil, fmt.Errorf("unknown response content type: %s", contType)
+}
diff --git a/common/pkgs/mq/executor/task/scheduler_create_ecs.go b/common/pkgs/mq/executor/task/scheduler_create_ecs.go
index b2f04a4..b13e11a 100644
--- a/common/pkgs/mq/executor/task/scheduler_create_ecs.go
+++ b/common/pkgs/mq/executor/task/scheduler_create_ecs.go
@@ -1,29 +1,36 @@
 package task
 
-import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+import (
+	schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
+	cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+)
 
 type ScheduleCreateECS struct {
 	TaskInfoBase
 	UserID    cdssdk.UserID    `json:"userID"`
 	PackageID cdssdk.PackageID `json:"packageID"`
+	ModelID   schsdk.ModelID   `json:"modelID"`
 }
 
 type ScheduleCreateECSStatus struct {
 	TaskStatusBase
-	Error   string `json:"error"`
-	Address string `json:"address"`
+	Error   string         `json:"error"`
+	Address string         `json:"address"`
+	ModelID schsdk.ModelID `json:"modelID"`
 }
 
-func NewScheduleCreateECS(userID cdssdk.UserID, packageID cdssdk.PackageID) *ScheduleCreateECS {
+func NewScheduleCreateECS(userID cdssdk.UserID, packageID cdssdk.PackageID, modelID schsdk.ModelID) *ScheduleCreateECS {
 	return &ScheduleCreateECS{
 		UserID:    userID,
 		PackageID: packageID,
+		ModelID:   modelID,
 	}
 }
 
-func NewScheduleCreateECSStatus(address string, err string) *ScheduleCreateECSStatus {
+func NewScheduleCreateECSStatus(address string, modelID schsdk.ModelID, err string) *ScheduleCreateECSStatus {
 	return &ScheduleCreateECSStatus{
 		Address: address,
+		ModelID: modelID,
 		Error:   err,
 	}
 }
diff --git a/common/pkgs/mq/executor/task/storage_move_object.go b/common/pkgs/mq/executor/task/storage_move_object.go
new file mode 100644
index 0000000..a008601
--- /dev/null
+++ b/common/pkgs/mq/executor/task/storage_move_object.go
@@ -0,0 +1,29 @@
+package task
+
+import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+
+type StorageMoveObject struct {
+	TaskInfoBase
+	ObjectMove cdssdk.ObjectMove `json:"objectMove"`
+}
+
+type StorageMoveObjectStatus struct {
+	TaskStatusBase
+	Error string `json:"error"`
+}
+
+func NewStorageMoveObject(objectMove cdssdk.ObjectMove) *StorageMoveObject {
+	return &StorageMoveObject{
+		ObjectMove: objectMove,
+	}
+}
+
+func NewStorageMoveObjectStatus(err string) *StorageMoveObjectStatus {
+	return &StorageMoveObjectStatus{
+		Error: err,
+	}
+}
+
+func init() {
+	Register[*StorageMoveObject, *StorageMoveObjectStatus]()
+}
diff --git a/common/pkgs/mq/executor/task/task.go b/common/pkgs/mq/executor/task/task.go
index db2e1b6..81a2ffe 100644
--- a/common/pkgs/mq/executor/task/task.go
+++ b/common/pkgs/mq/executor/task/task.go
@@ -38,3 +38,8 @@ func Register[TTaskInfo TaskInfo, TTaskStatus TaskStatus]() any {
 
 	return nil
 }
+
+type TaskOperateInfo struct {
+	TaskID  string
+	Command string
+}
diff --git a/common/pkgs/mq/manager/executor.go b/common/pkgs/mq/manager/executor.go
index 39f7d7b..a0ab13f 100644
--- a/common/pkgs/mq/manager/executor.go
+++ b/common/pkgs/mq/manager/executor.go
@@ -1,47 +1,50 @@
 package manager
 
 import (
-	"gitlink.org.cn/cloudream/common/pkgs/mq"
 	schmod "gitlink.org.cn/cloudream/scheduler/common/models"
 	exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
 )
 
 type ExecutorService interface {
-	ReportExecutorTaskStatus(msg *ReportExecutorTaskStatus) (*ReportExecutorTaskStatusResp, *mq.CodeMessage)
+	//ReportExecutorTaskStatus(msg *ReportExecutorTaskStatus) (*ReportExecutorTaskStatusResp, *mq.CodeMessage)
 }
 
 // 接收executor上报的存活状态及任务执行情况
-var _ = Register(Service.ReportExecutorTaskStatus)
+//var _ = Register(Service.ReportExecutorTaskStatus)
 
-type ReportExecutorTaskStatus struct {
-	mq.MessageBodyBase
-	ExecutorID schmod.ExecutorID    `json:"executorID"`
-	TaskStatus []ExecutorTaskStatus `json:"taskStatus"`
-}
-
-type ReportExecutorTaskStatusResp struct {
-	mq.MessageBodyBase
-}
+// type ReportExecutorTaskStatus struct {
+// 	mq.MessageBodyBase
+// 	ExecutorID schmod.ExecutorID    `json:"executorID"`
+// 	TaskStatus []ExecutorTaskStatus `json:"taskStatus"`
+// }
+//
+// type ReportExecutorTaskStatusResp struct {
+// 	mq.MessageBodyBase
+// }
 
 type ExecutorTaskStatus struct {
-	TaskID string
-	Status exectsk.TaskStatus
+	ExecutorID schmod.ExecutorID  `json:"executorID"`
+	TaskID     string             `json:"taskID"`
+	Status     exectsk.TaskStatus `json:"status"`
 }
 
-func NewReportExecutorTaskStatus(executorID schmod.ExecutorID, taskStatus []ExecutorTaskStatus) *ReportExecutorTaskStatus {
-	return &ReportExecutorTaskStatus{
-		ExecutorID: executorID,
-		TaskStatus: taskStatus,
-	}
-}
-func NewReportExecutorTaskStatusResp() *ReportExecutorTaskStatusResp {
-	return &ReportExecutorTaskStatusResp{}
-}
-func NewExecutorTaskStatus(taskID string, status exectsk.TaskStatus) ExecutorTaskStatus {
+// func NewReportExecutorTaskStatus(executorID schmod.ExecutorID, taskStatus []ExecutorTaskStatus) *ReportExecutorTaskStatus {
+// 	return &ReportExecutorTaskStatus{
+// 		ExecutorID: executorID,
+// 		TaskStatus: taskStatus,
+// 	}
+// }
+//
+// func NewReportExecutorTaskStatusResp() *ReportExecutorTaskStatusResp {
+// 	return &ReportExecutorTaskStatusResp{}
+// }
+func NewExecutorTaskStatus(executorID schmod.ExecutorID, taskID string, status exectsk.TaskStatus) ExecutorTaskStatus {
 	return ExecutorTaskStatus{
-		TaskID: taskID,
-		Status: status,
+		ExecutorID: executorID,
+		TaskID:     taskID,
+		Status:     status,
 	}
 }
-func (c *Client) ReportExecutorTaskStatus(msg *ReportExecutorTaskStatus, opts ...mq.RequestOption) (*ReportExecutorTaskStatusResp, error) {
-	return mq.Request(Service.ReportExecutorTaskStatus, c.roundTripper, msg, opts...)
-}
+
+//func (c *Client) ReportExecutorTaskStatus(msg *ReportExecutorTaskStatus, opts ...mq.RequestOption) (*ReportExecutorTaskStatusResp, error) {
+//	return mq.Request(Service.ReportExecutorTaskStatus, c.roundTripper, msg, opts...)
+//}
diff --git a/executor/internal/globals/globals.go b/executor/internal/globals/globals.go
index b3fecff..0c47d9d 100644
--- a/executor/internal/globals/globals.go
+++ b/executor/internal/globals/globals.go
@@ -2,7 +2,6 @@ package globals
 
 import (
 	schmod "gitlink.org.cn/cloudream/scheduler/common/models"
-	"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
 )
 
 var ExecutorID schmod.ExecutorID
@@ -12,5 +11,9 @@ func Init(id schmod.ExecutorID) {
 	ExecutorID = id
 }
 
-// 全局变量定义
-var EventChannel = make(chan manager.ReportExecutorTaskStatus)
+const (
+	UPDATE  = "update"
+	STOP    = "stop"
+	RESTART = "restart"
+	DESTROY = "destroy"
+)
diff --git a/executor/internal/http/server.go b/executor/internal/http/server.go
index 4f8b4c9..d52e0eb 100644
--- a/executor/internal/http/server.go
+++ b/executor/internal/http/server.go
@@ -41,5 +41,6 @@ func (s *Server) Serve() error {
 
 func (s *Server) initRouters() {
 	s.engine.POST("/submitTask", s.TaskSvc().SubmitTask)
+	s.engine.POST("/operateTask", s.TaskSvc().OperateTask)
 	s.engine.GET("/getReportInfo", s.TaskSvc().GetReportInfo)
 }
diff --git a/executor/internal/http/task.go b/executor/internal/http/task.go
index b256123..fe0e93c 100644
--- a/executor/internal/http/task.go
+++ b/executor/internal/http/task.go
@@ -1,13 +1,13 @@
 package http
 
 import (
-	"encoding/json"
 	"github.com/gin-gonic/gin"
 	"gitlink.org.cn/cloudream/common/consts/errorcode"
 	"gitlink.org.cn/cloudream/common/pkgs/logger"
 	"gitlink.org.cn/cloudream/common/utils/reflect2"
 	"gitlink.org.cn/cloudream/common/utils/serder"
 	execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
+	mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
 	myglbs "gitlink.org.cn/cloudream/scheduler/executor/internal/globals"
 	"io"
 	"net/http"
@@ -31,8 +31,6 @@ func (s *TaskService) SubmitTask(ctx *gin.Context) {
 		return
 	}
 
-	println(string(bodyData))
-
 	req, err := serder.JSONToObjectEx[execmq.StartTask](bodyData)
 	if err != nil {
 		log.Warnf("parsing request body: %s", err.Error())
@@ -48,7 +46,7 @@ func (s *TaskService) SubmitTask(ctx *gin.Context) {
 		return
 	}
 
-	ctx.JSON(http.StatusOK, OK(execmq.NewStartTaskResp(myglbs.ExecutorID, tsk.ID())))
+	ctx.JSON(http.StatusOK, OK(execmq.NewStartTaskResp(myglbs.ExecutorID, string(tsk.ID()))))
 }
 
 func (s *TaskService) GetReportInfo(ctx *gin.Context) {
@@ -56,16 +54,70 @@ func (s *TaskService) GetReportInfo(ctx *gin.Context) {
 	ctx.Header("Cache-Control", "no-cache")
 	ctx.Header("Connection", "keep-alive")
 
-	for report := range myglbs.EventChannel {
-		data, err := json.Marshal(report)
+	taskChan := s.svc.TaskManager.GetTaskChan()
+	defer taskChan.Chan.Close()
+
+	status := mgrmq.ExecutorTaskStatus{
+		ExecutorID: myglbs.ExecutorID,
+	}
+	initData, err := serder.ObjectToJSONEx(status)
+	if err != nil {
+		logger.Errorf("marshal status: %s", err.Error())
+		return
+	}
+	_, err = ctx.Writer.Write([]byte("data: " + string(initData) + "\n\n"))
+	if err != nil {
+		logger.Errorf("write data: %s", err.Error())
+		return
+	}
+	ctx.Writer.Flush()
+
+	for {
+		receive, err := taskChan.Chan.Receive()
 		if err != nil {
-			return
+			continue
 		}
 
+		data, err := serder.ObjectToJSONEx(receive)
+		if err != nil {
+			logger.Errorf("marshal task: %s", err.Error())
+			continue
+		}
+
+		logger.Infof("send task status: %s", string(data))
+
+		_, err = ctx.Writer.Write([]byte("data: " + string(data) + "\n\n"))
 		if err != nil {
-			return
+			logger.Errorf("write data: %s", err.Error())
+			continue
 		}
 		ctx.Writer.Flush() // 确保数据立即发送到客户端
-		println("report: " + string(data))
 	}
 }
+
+func (s *TaskService) OperateTask(ctx *gin.Context) {
+	log := logger.WithField("HTTP", "TaskOperate")
+
+	bodyData, err := io.ReadAll(ctx.Request.Body)
+	if err != nil {
+		log.Warnf("reading request body: %s", err.Error())
+		ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "read request body failed"))
+		return
+	}
+
+	req, err := serder.JSONToObjectEx[execmq.TaskOperateInfo](bodyData)
+	if err != nil {
+		log.Warnf("parsing request body: %s", err.Error())
+		ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed"))
+		return
+	}
+
+	task, ok := s.svc.TaskManager.Tasks[req.TaskID]
+	if !ok {
+		ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "task not found"))
+		return
+	}
+
+	task.SendTaskOperate(req)
+	ctx.JSON(http.StatusOK, OK(execmq.NewTaskOperateResp(nil)))
+}
diff --git a/executor/internal/manager/manager.go b/executor/internal/manager/manager.go
new file mode 100644
index 0000000..bf66cfe
--- /dev/null
+++ b/executor/internal/manager/manager.go
@@ -0,0 +1,78 @@
+package manager
+
+import (
+	"fmt"
+	"gitlink.org.cn/cloudream/common/pkgs/logger"
+	"gitlink.org.cn/cloudream/common/utils/reflect2"
+	exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
+	"gitlink.org.cn/cloudream/scheduler/executor/internal/task"
+	"sync"
+)
+
+type Manager struct {
+	statusChans []*task.TaskChan[any]
+	Tasks       map[string]*task.Task
+	ctx         task.TaskContext
+	lock        sync.Mutex
+}
+
+func NewManager() Manager {
+	return Manager{
+		statusChans: make([]*task.TaskChan[any], 0),
+		Tasks:       make(map[string]*task.Task),
+		ctx:         task.TaskContext{},
+	}
+}
+
+// GetTaskChan registers and returns a new channel that will receive every task status update.
+func (m *Manager) GetTaskChan() *task.TaskChan[any] {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+
+	taskChan := task.NewTaskChan[any]()
+	m.statusChans = append(m.statusChans, taskChan)
+
+	return taskChan
+}
+
+// sendTaskChan fans out every status received from tskChan to all registered channels.
+func (m *Manager) sendTaskChan(tskChan task.TaskChan[any]) {
+	for {
+		receive, err := tskChan.Chan.Receive()
+		if err != nil {
+			logger.Error(err.Error())
+			continue
+		}
+
+		m.lock.Lock()
+		for i := 0; i < len(m.statusChans); i++ {
+			err := m.statusChans[i].Chan.Send(receive)
+			if err != nil {
+				logger.Error(err.Error())
+				continue
+			}
+		}
+		m.lock.Unlock()
+	}
+}
+
+func (m *Manager) StartByInfo(taskID string, info exectsk.TaskInfo) (*task.Task, error) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+
+	infoType := reflect2.TypeOfValue(info)
+	ctor, ok := task.TaskFromInfoCtors[infoType]
+	if !ok {
+		return nil, fmt.Errorf("unknown info type: %v", infoType)
+	}
+
+	newTask := task.NewTask(taskID)
+	m.Tasks[taskID] = newTask
+
+	go ctor(info).Execute(newTask, m.ctx)
+
+	// fan the new task's status updates out to all subscribed channels
+	go m.sendTaskChan(newTask.TaskStatusChan)
+
+	return newTask, nil
+}
diff --git a/executor/internal/reporter/reporter.go b/executor/internal/reporter/reporter.go
index 8f75dea..ce7b9ce 100644
--- a/executor/internal/reporter/reporter.go
+++ b/executor/internal/reporter/reporter.go
@@ -1,13 +1,11 @@
 package reporter
 
 import (
-	"gitlink.org.cn/cloudream/scheduler/executor/internal/globals"
 	"sync"
 	"time"
 
 	schmod "gitlink.org.cn/cloudream/scheduler/common/models"
 	exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
-	mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
 )
 
 type Reporter struct {
@@ -58,17 +56,17 @@ func (r *Reporter) Serve() error {
 			ticker.Reset(r.reportInterval)
 		}
 
-		r.taskStatusLock.Lock()
-		var taskStatus []mgrmq.ExecutorTaskStatus
-		for taskID, status := range r.taskStatus {
-			taskStatus = append(taskStatus, mgrmq.NewExecutorTaskStatus(taskID, status))
-		}
-		r.taskStatus = make(map[string]exectsk.TaskStatus)
-		r.taskStatusLock.Unlock()
+		//r.taskStatusLock.Lock()
+		//var taskStatus []mgrmq.ExecutorTaskStatus
+		//for taskID, status := range r.taskStatus {
+		//	taskStatus = append(taskStatus, mgrmq.NewExecutorTaskStatus(taskID, status))
+		//}
+		//r.taskStatus = make(map[string]exectsk.TaskStatus)
+		//r.taskStatusLock.Unlock()
 
-		status := mgrmq.NewReportExecutorTaskStatus(r.executorID, taskStatus)
-		// 将数据发送到管道中
-		globals.EventChannel <- *status
+		//status := mgrmq.NewReportExecutorTaskStatus(r.executorID, taskStatus)
+		//// 将数据发送到管道中
+		//globals.EventChannel <- *status
 
 		//_, err := magCli.ReportExecutorTaskStatus(mgrmq.NewReportExecutorTaskStatus(r.executorID, taskStatus))
 
diff --git a/executor/internal/services/service.go b/executor/internal/services/service.go
index 7076d74..05f13f2 100644
--- a/executor/internal/services/service.go
+++ b/executor/internal/services/service.go
@@ -1,15 +1,15 @@
 package services
 
 import (
-	"gitlink.org.cn/cloudream/scheduler/executor/internal/task"
+	"gitlink.org.cn/cloudream/scheduler/executor/internal/manager"
 )
 
 type Service struct {
-	taskManager *task.Manager
+	TaskManager *manager.Manager
 }
 
-func NewService(tskmgr *task.Manager) *Service {
+func NewService(tskmgr *manager.Manager) *Service {
 	return &Service{
-		taskManager: tskmgr,
+		TaskManager: tskmgr,
 	}
 }
diff --git a/executor/internal/services/task.go b/executor/internal/services/task.go
index e4696dd..2391a00 100644
--- a/executor/internal/services/task.go
+++ b/executor/internal/services/task.go
@@ -1,7 +1,6 @@
 package services
 
 import (
-	"gitlink.org.cn/cloudream/common/consts/errorcode"
 	"gitlink.org.cn/cloudream/common/pkgs/logger"
 	"gitlink.org.cn/cloudream/common/pkgs/mq"
 	"gitlink.org.cn/cloudream/common/utils/reflect2"
@@ -11,18 +10,18 @@ import (
 )
 
 func (svc *Service) StartTask(msg *execmq.StartTask) (*execmq.StartTaskResp, *mq.CodeMessage) {
-	tsk, err := svc.taskManager.StartByInfo(msg.Info)
-	if err != nil {
-		logger.WithField("Info", reflect2.TypeOfValue(msg.Info).Name()).
-			Warnf("starting task by info: %s", err.Error())
-		return nil, mq.Failed(errorcode.OperationFailed, "start task by info failed")
-	}
+	//tsk, err := svc.TaskManager.StartByInfo(msg.Info)
+	//if err != nil {
+	//	logger.WithField("Info", reflect2.TypeOfValue(msg.Info).Name()).
+	//		Warnf("starting task by info: %s", err.Error())
+	//	return nil, mq.Failed(errorcode.OperationFailed, "start task by info failed")
+	//}
 
-	return mq.ReplyOK(execmq.NewStartTaskResp(myglbs.ExecutorID, tsk.ID()))
+	return mq.ReplyOK(execmq.NewStartTaskResp(myglbs.ExecutorID, ""))
 }
 
 func (svc *Service) SubmitTask(msg *execmq.StartTask) (*task.Task, error) {
-	tsk, err := svc.taskManager.StartByInfo(msg.Info)
+	tsk, err := svc.TaskManager.StartByInfo(msg.TaskID, msg.Info)
 	if err != nil {
 		logger.WithField("Info", reflect2.TypeOfValue(msg.Info).Name()).
 			Warnf("starting task by info: %s", err.Error())
diff --git a/executor/internal/task/cache_move_package.go b/executor/internal/task/cache_move_package.go
index b7d4c70..78b1953 100644
--- a/executor/internal/task/cache_move_package.go
+++ b/executor/internal/task/cache_move_package.go
@@ -2,10 +2,7 @@ package task
 
 import (
 	"fmt"
-	"time"
-
 	"gitlink.org.cn/cloudream/common/pkgs/logger"
-	"gitlink.org.cn/cloudream/common/pkgs/task"
 	cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
 	schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
 	exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
@@ -21,22 +18,16 @@ func NewCacheMovePackage(info *exectsk.CacheMovePackage) *CacheMovePackage {
 	}
 }
 
-func (t *CacheMovePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
+func (t *CacheMovePackage) Execute(task *Task, ctx TaskContext) {
 	log := logger.WithType[CacheMovePackage]("Task")
 	log.Debugf("begin with %v", logger.FormatStruct(t.CacheMovePackage))
 	defer log.Debugf("end")
 
 	err := t.do(ctx)
 	if err != nil {
-		ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageStatus(err.Error()))
+		task.SendStatus(exectsk.NewCacheMovePackageStatus(err.Error()))
 	} else {
-		ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageStatus(""))
+		task.SendStatus(exectsk.NewCacheMovePackageStatus(""))
 	}
-
-	ctx.reporter.ReportNow()
-
-	complete(err, CompleteOption{
-		RemovingDelay: time.Minute,
-	})
 }
 
diff --git a/executor/internal/task/create_ecs/alicloud.go b/executor/internal/task/create_ecs/alicloud.go
index cc6bac9..246f833 100644
--- a/executor/internal/task/create_ecs/alicloud.go
+++ b/executor/internal/task/create_ecs/alicloud.go
@@ -9,7 +9,6 @@ import (
 	util "github.com/alibabacloud-go/tea-utils/v2/service"
 	"github.com/alibabacloud-go/tea/tea"
 	log "gitlink.org.cn/cloudream/common/pkgs/logger"
-	schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
 	"time"
 )
 
@@ -49,10 +48,9 @@ func AliConfig(configMap map[string]interface{}) {
 	aliclient, _ = ecs.NewClient(config)
 }
 
-func (a *AliCloud) CreateServer(commands []string) (string, error) {
+// CreateServer creates an ECS instance and returns its ID.
+func (a *AliCloud) CreateServer() (string, error) {
 	var instanceID string
-	var instanceIDArr string
-	var result string
 
 	tryErr := func() (_e error) {
 		defer func() {
@@ -68,7 +66,7 @@ func (a *AliCloud) CreateServer(commands []string) (string, error) {
 			return _err
 		}
 		instanceID = tea.StringValue(util.ToJSONString(responces.Body.InstanceIdSets.InstanceIdSet[0]))
-		instanceIDArr = tea.StringValue(util.ToJSONString(responces.Body.InstanceIdSets.InstanceIdSet))
+		//instanceIDArr := tea.StringValue(util.ToJSONString(responces.Body.InstanceIdSets.InstanceIdSet))
 
 		log.Info(tea.String("--------------------创建实例成功,实例ID:" + tea.StringValue(util.ToJSONString(responces.Body.InstanceIdSets.InstanceIdSet)) + "--------------------"))
 		return nil
@@ -86,35 +84,16 @@ func (a *AliCloud) CreateServer(commands []string) (string, error) {
 		return "", tryErr
 	}
 
-	println("instance: " + instanceID)
-	println("instanceArr: " + instanceIDArr)
 	// 获取实例IP
-	ip, _ := getInstanceIP(instanceIDArr, *aliclient.RegionId)
-	println("ip: " + ip)
+	//ip, _ := getInstanceIP(instanceIDArr, *aliclient.RegionId)
+	//println("ip: " + ip)
 
-	CDSRcloneID := schglb.CloudreamStorageConfig.URL + "/object/download?userID=1&objectID=" + schglb.CDSRclone.CDSRcloneID
-	CDSRcloneConfigID := schglb.CloudreamStorageConfig.URL + "/object/download?userID=1&objectID=" + schglb.CDSRclone.CDSRcloneConfigID
-	println("CDSRcloneID: " + CDSRcloneID)
-	println("CDSRcloneConfigID: " + CDSRcloneConfigID)
-
-	//commands := []string{}
-	//commandContent := "yum install -y fuse3"
-	//commands = append(commands, commandContent)
-	//commandContent = "mkdir -p /opt/rclone/ \n mkdir -p /mnt/cds/"
-	//commands = append(commands, commandContent)
-	//commandContent = "cd /opt/rclone \n python3 -c 'import requests;response=requests.get(\"" + CDSRcloneID + "\",stream=True);response.raise_for_status();boundary=response.headers.get(\"Content-Type\").split(\"boundary=\")[-1].encode();content=response.content;body=[part.split(b\"\\r\\n\\r\\n\",1)[1].rsplit(b\"\\r\\n--\",1)[0] for part in content.split(b\"--\"+boundary+b\"\\r\\n\") if b\"filename=\" in part][0];open(\"rclone\",\"wb\").write(body);print(\"success\")'\n"
-	//println(commandContent)
-	//commands = append(commands, commandContent)
-	//commandContent = "cd /opt/rclone \n python3 -c 'import requests;response=requests.get(\"" + CDSRcloneConfigID + "\",stream=True);response.raise_for_status();boundary=response.headers.get(\"Content-Type\").split(\"boundary=\")[-1].encode();content=response.content;body=[part.split(b\"\\r\\n\\r\\n\",1)[1].rsplit(b\"\\r\\n--\",1)[0] for part in content.split(b\"--\"+boundary+b\"\\r\\n\") if b\"filename=\" in part][0];open(\"rclone.conf\",\"wb\").write(body);print(\"success\")'\n"
-	//println(commandContent)
-	//commands = append(commands, commandContent)
-	//commandContent = "cd /opt/rclone \n chmod +x rclone"
-	//commands = append(commands, commandContent)
-	//commandContent = "cd /opt/rclone \n nohup ./rclone mount cds: /mnt/cds --vfs-cache-mode full --vfs-read-wait 0 --vfs-read-chunk-size 128M --cache-db-purge -vv > rclone.log 2>&1 &"
-	//commands = append(commands, commandContent)
-	//commandContent = "cd /mnt/cds/bkt1/tiny_model/ \n sh execute.sh"
-	//commands = append(commands, commandContent)
+	return instanceID, nil
+}
 
+// RunCommand runs the given shell commands on the instance and returns the last result.
+func (a *AliCloud) RunCommand(commands []string, instanceID string) (string, error) {
+	var result string
 	for i := 0; i < len(commands); i++ {
 		log.Info("start execute command")
 		commandId, err := runShellCommand(commands[i], instanceID, *aliclient.RegionId)
@@ -123,16 +102,27 @@ func (a *AliCloud) CreateServer(commands []string) (string, error) {
 		}
 		// 判断是否执行成功
 		log.Info("describe result")
-		_, _, err = describeInvocationResults(aliclient, instanceID, commandId, tea.String("utf-8"), 500)
+		_, result, err = describeInvocationResults(aliclient, instanceID, commandId, tea.String("utf-8"), 500)
 		if err != nil {
 			log.Error("describeInvocationResults: " + err.Error())
 			return "", err
 		}
 	}
-
 	return result, nil
 }
 
+// DestroyServer force-deletes the instance.
+func (a *AliCloud) DestroyServer(instanceID string) (string, error) {
+	result, err := aliclient.DeleteInstance(&ecs.DeleteInstanceRequest{
+		InstanceId: &instanceID,
+		Force:      tea.Bool(true),
+	})
+	if err != nil {
+		return "", err
+	}
+	return tea.StringValue(result.Body.RequestId), nil
+}
+
 func runShellCommand(commandContent string, instanceID string, regionId string) (*string, error) {
 	// 从CDS下载文件
 	commandRequest := ecs.RunCommandRequest{
diff --git a/executor/internal/task/create_ecs/factory.go b/executor/internal/task/create_ecs/factory.go
index 91baad3..7445817 100644
--- a/executor/internal/task/create_ecs/factory.go
+++ b/executor/internal/task/create_ecs/factory.go
@@ -2,14 +2,11 @@ package create_ecs
 
 // CloudProvider 是一个接口,定义了创建服务器的方法
 type CloudProvider interface {
-	CreateServer(commands []string) (string, error)
+	CreateServer() (string, error)
+	RunCommand(commands []string, instanceID string) (string, error)
+	DestroyServer(instanceID string) (string, error)
 }
 
-// CloudFactory 是工厂接口
-// 工厂模式中使用 CreateProvider 的设计原则是:
-// 单一职责:Factory 只负责创建 CloudProvider 实例,CloudProvider 负责实际的服务器创建任务。
-// 开闭原则:Factory 可以扩展以支持新的 CloudProvider 实现,而无需修改现有代码。
-// 依赖倒置原则:客户端代码依赖于 CloudProvider 接口而不是具体实现,从而减少了耦合。
 type CloudFactory interface {
 	CreateProvider() CloudProvider
 }
diff --git a/executor/internal/task/create_ecs/huaweicloud.go b/executor/internal/task/create_ecs/huaweicloud.go
index 735b8d9..7e1bbbf 100644
--- a/executor/internal/task/create_ecs/huaweicloud.go
+++ b/executor/internal/task/create_ecs/huaweicloud.go
@@ -12,7 +12,7 @@ import (
 // HuaweiCloud实现了CloudProvider接口
 type HuaweiCloud struct{}
 
-var req model.PostPaidServer
+var serverbody model.PrePaidServer
 var hwConfigMap map[string]interface{}
 
 var hwclient ecs.EcsClient
@@ -24,7 +24,7 @@ func HWCloudConfig(configMap map[string]interface{}) {
 		return
 	}
 
-	err = json.Unmarshal(jsonData, &req)
+	err = json.Unmarshal(jsonData, &serverbody)
 	if err != nil {
 		return
 	}
@@ -45,12 +45,12 @@ func HWCloudConfig(configMap map[string]interface{}) {
 }
 
-func (a *HuaweiCloud) CreateServer(commands []string) (string, error) {
+func (a *HuaweiCloud) CreateServer() (string, error) {
 
-	request := &model.CreatePostPaidServersRequest{}
-	request.Body = &model.CreatePostPaidServersRequestBody{
-		Server: &req,
+	request := &model.CreateServersRequest{}
+	request.Body = &model.CreateServersRequestBody{
+		Server: &serverbody,
 	}
-	response, err := hwclient.CreatePostPaidServers(request)
+	response, err := hwclient.CreateServers(request)
 	if err == nil {
 		fmt.Printf("%+v\n", response)
 	} else {
@@ -60,3 +60,26 @@ func (a *HuaweiCloud) CreateServer(commands []string) (string, error) {
 	//ids := response.ServerIds
 	return "", nil
 }
+
+func (a *HuaweiCloud) RunCommand(commands []string, instanceID string) (string, error) {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (a *HuaweiCloud) DestroyServer(instanceID string) (string, error) {
+	request := &model.DeleteServersRequest{}
+	var listServersbody = []model.ServerId{
+		{
+			Id: instanceID,
+		},
+	}
+	request.Body = &model.DeleteServersRequestBody{
+		Servers: listServersbody,
+	}
+	response, err := hwclient.DeleteServers(request)
+	if err != nil {
+		return "", err
+	}
+
+	return response.String(), nil
+}
diff --git a/executor/internal/task/pcm_schedule_task.go b/executor/internal/task/pcm_schedule_task.go
index b11f0e6..d41e25c 100644
--- a/executor/internal/task/pcm_schedule_task.go
+++ b/executor/internal/task/pcm_schedule_task.go
@@ -2,12 +2,9 @@ package task
 
 import (
 	"fmt"
-	"time"
-
-	pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
-
 	"gitlink.org.cn/cloudream/common/pkgs/logger"
+	pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
 	schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
 	exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
 )
 
@@ -22,24 +18,21 @@ func NewPCMSubmitTask(info *exectsk.SubmitTask) *PCMSubmitTask {
 	}
 }
 
-func (t *PCMSubmitTask) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
+func (t *PCMSubmitTask) Execute(task *Task, ctx TaskContext) {
 	log := logger.WithType[PCMSubmitTask]("Task")
 	log.Debugf("begin with %v", logger.FormatStruct(t.SubmitTask))
 	defer log.Debugf("end")
 
-	err := t.do(task.ID(), ctx)
+	//err := t.do(task, ctx)
+	err := error(nil)
 	if err != nil {
 		//TODO 若任务失败,上报的状态failed字段根据情况修改
-		ctx.reporter.Report(task.ID(), exectsk.NewSubmitTaskStatus("failed", err.Error()))
+		task.SendStatus(exectsk.NewSubmitTaskStatus("failed", err.Error()))
+	} else {
+		task.SendStatus(exectsk.NewSubmitTaskStatus("succeeded", ""))
 	}
-
-	ctx.reporter.ReportNow()
-
-	complete(err, CompleteOption{
-		RemovingDelay: time.Minute,
-	})
 }
 
-func (t *PCMSubmitTask) do(taskID string, ctx TaskContext) error {
+func (t *PCMSubmitTask) do(task *Task, ctx TaskContext) error {
 	log := logger.WithType[PCMSubmitTask]("Task")
 
@@ -79,7 +73,7 @@ func (t *PCMSubmitTask) do(taskID string, ctx TaskContext) error {
 		}
 
 		if tsResp.TaskStatus != prevStatus {
-			ctx.reporter.Report(taskID, exectsk.NewSubmitTaskStatus(tsResp.TaskStatus, ""))
+			task.SendStatus(exectsk.NewSubmitTaskStatus(tsResp.TaskStatus, ""))
 		}
 
 		prevStatus = tsResp.TaskStatus
diff --git a/executor/internal/task/pcm_upload_img.go b/executor/internal/task/pcm_upload_img.go
index 6a17506..7991465 100644
--- a/executor/internal/task/pcm_upload_img.go
+++ b/executor/internal/task/pcm_upload_img.go
@@ -2,10 +2,7 @@ package task
 
 import (
 	"fmt"
-	"time"
-
 	"gitlink.org.cn/cloudream/common/pkgs/logger"
-	"gitlink.org.cn/cloudream/common/pkgs/task"
 	pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
 	schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
 	exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
@@ -21,24 +18,18 @@ func NewPCMUploadImage(info *exectsk.UploadImage) *PCMUploadImage {
 	}
 }
 
-func (t *PCMUploadImage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
+func (t *PCMUploadImage) Execute(task *Task, ctx TaskContext) {
 	log := logger.WithType[PCMUploadImage]("Task")
 	log.Debugf("begin")
 	defer log.Debugf("end")
 
-	err := t.do(task.ID(), ctx)
+	err := t.do(task, ctx)
 	if err != nil {
 		//TODO 若任务失败,上报的状态failed字段根据情况修改
-		ctx.reporter.Report(task.ID(), exectsk.NewUploadImageStatus("failed", err.Error(), pcmsdk.ImageID(""), ""))
+		task.SendStatus(exectsk.NewUploadImageStatus("failed", err.Error(), pcmsdk.ImageID(""), ""))
 	}
-
-	ctx.reporter.ReportNow()
-
-	complete(err, CompleteOption{
-		RemovingDelay: time.Minute,
-	})
 }
 
-func (t *PCMUploadImage) do(taskID string, ctx TaskContext) error {
+func (t *PCMUploadImage) do(task *Task, ctx TaskContext) error {
 	pcmCli, err := schglb.PCMPool.Acquire()
 	if err != nil {
@@ -53,7 +45,7 @@ func (t *PCMUploadImage) do(taskID string, ctx TaskContext) error {
 		return err
 	}
 
-	ctx.reporter.Report(taskID, exectsk.NewUploadImageStatus(resp.Result, "", resp.ImageID, resp.Name))
+	task.SendStatus(exectsk.NewUploadImageStatus(resp.Result, "", resp.ImageID, resp.Name))
 
 	return nil
 }
diff --git a/executor/internal/task/scheduler_create_ecs.go b/executor/internal/task/scheduler_create_ecs.go
index ae4d3d6..5ad6090 100644
--- a/executor/internal/task/scheduler_create_ecs.go
+++ b/executor/internal/task/scheduler_create_ecs.go
@@ -3,11 +3,12 @@ package task
 import (
 	"fmt"
 	"gitlink.org.cn/cloudream/common/pkgs/logger"
-	"gitlink.org.cn/cloudream/common/pkgs/task"
 	cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
 	schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
+	"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
 	exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
 	"gitlink.org.cn/cloudream/scheduler/executor/internal/config"
+	"gitlink.org.cn/cloudream/scheduler/executor/internal/globals"
 	"gitlink.org.cn/cloudream/scheduler/executor/internal/task/create_ecs"
 )
 
@@ -21,22 +22,21 @@ func NewScheduleCreateECS(info *exectsk.ScheduleCreateECS) *ScheduleCreateECS {
 	}
 }
 
-func (t *ScheduleCreateECS) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
+func (t *ScheduleCreateECS) Execute(task *Task, ctx TaskContext) {
 	log := logger.WithType[ScheduleCreateECS]("Task")
 	log.Debugf("begin")
 	defer log.Debugf("end")
 
-	err := t.do(task.ID(), ctx)
+	err := t.do(task, ctx)
 	if err != nil {
 		log.Error(err)
 		return
 	}
 
-	ctx.reporter.ReportNow()
 	log.Info("ScheduleCreateECS...")
 }
 
-func (t *ScheduleCreateECS) do(taskID string, ctx TaskContext) error {
+func (t *ScheduleCreateECS) do(task *Task, ctx TaskContext) error {
 	stgCli, err := schglb.CloudreamStoragePool.Acquire()
 	if err != nil {
 		return fmt.Errorf("new cloudream storage client: %w", err)
@@ -52,51 +52,84 @@ func (t *ScheduleCreateECS) do(taskID string, ctx TaskContext) error {
 	}
 
 	println(resp.Name)
-	//factory := create_ecs.GetFactory(config.CloudName)
-	//provider := factory.CreateProvider()
-	//address, err := provider.CreateServer(resp.Name)
-	//if err != nil {
-	//	ctx.reporter.Report(taskID, exectsk.NewScheduleCreateECSStatus("", err.Error()))
-	//	return
err - //} - // - //ctx.reporter.Report(taskID, exectsk.NewScheduleCreateECSStatus("http://"+address+":5001", "")) CDSRcloneID := schglb.CloudreamStorageConfig.URL + "/object/download?userID=1&objectID=" + schglb.CDSRclone.CDSRcloneID CDSRcloneConfigID := schglb.CloudreamStorageConfig.URL + "/object/download?userID=1&objectID=" + schglb.CDSRclone.CDSRcloneConfigID println("CDSRcloneID: " + CDSRcloneID) println("CDSRcloneConfigID: " + CDSRcloneConfigID) - commands := []string{} + var commands []string commandContent := "yum install -y fuse3" commands = append(commands, commandContent) commandContent = "mkdir -p /opt/rclone/ \n mkdir -p /mnt/cds/" commands = append(commands, commandContent) commandContent = "cd /opt/rclone \n python3 -c 'import requests;response=requests.get(\"" + CDSRcloneID + "\",stream=True);response.raise_for_status();boundary=response.headers.get(\"Content-Type\").split(\"boundary=\")[-1].encode();content=response.content;body=[part.split(b\"\\r\\n\\r\\n\",1)[1].rsplit(b\"\\r\\n--\",1)[0] for part in content.split(b\"--\"+boundary+b\"\\r\\n\") if b\"filename=\" in part][0];open(\"rclone\",\"wb\").write(body);print(\"success\")'\n" - println(commandContent) commands = append(commands, commandContent) commandContent = "cd /opt/rclone \n python3 -c 'import requests;response=requests.get(\"" + CDSRcloneConfigID + "\",stream=True);response.raise_for_status();boundary=response.headers.get(\"Content-Type\").split(\"boundary=\")[-1].encode();content=response.content;body=[part.split(b\"\\r\\n\\r\\n\",1)[1].rsplit(b\"\\r\\n--\",1)[0] for part in content.split(b\"--\"+boundary+b\"\\r\\n\") if b\"filename=\" in part][0];open(\"rclone.conf\",\"wb\").write(body);print(\"success\")'\n" - println(commandContent) commands = append(commands, commandContent) commandContent = "cd /opt/rclone \n chmod +x rclone" commands = append(commands, commandContent) commandContent = "cd /opt/rclone \n nohup ./rclone mount cds: /mnt/cds --vfs-cache-mode full --vfs-read-wait 0 
--vfs-read-chunk-size 128M --cache-db-purge -vv > rclone.log 2>&1 &" commands = append(commands, commandContent) - //commandContent = "cd /mnt/cds/bkt1/tiny_model/ \n pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple \n python3 -m pip install --upgrade pip setuptools \n python3 -m pip install transformers --ignore-installed pyyaml \n python3 -m pip install -r requirement.txt \n sh start.sh" commandContent = "cd /mnt/cds/bkt1/tiny_model/ \n sh execute.sh" commands = append(commands, commandContent) + // 创建云主机 factory := create_ecs.GetFactory(config.CloudName) provider := factory.CreateProvider() - address, err := provider.CreateServer(commands) + + instanceID, err := provider.CreateServer() if err != nil { - ctx.reporter.Report(taskID, exectsk.NewScheduleCreateECSStatus("", err.Error())) + task.SendStatus(exectsk.NewScheduleCreateECSStatus("", t.ModelID, err.Error())) return err } - ctx.reporter.Report(taskID, exectsk.NewScheduleCreateECSStatus(address, "")) + address, err := provider.RunCommand(commands, instanceID) + if err != nil { + task.SendStatus(exectsk.NewScheduleCreateECSStatus("", t.ModelID, err.Error())) + return err + } - return nil + // 返回执行结果 + task.SendStatus(exectsk.NewScheduleCreateECSStatus(address, t.ModelID, "")) + println("create ECS success, waiting msg...") + + // 监听更新操作 + for { + taskOperate, err := task.taskChan.Chan.Receive() + if err != nil { + task.SendStatus(exectsk.NewScheduleCreateECSStatus("", t.ModelID, err.Error())) + return err + } + + info, ok := taskOperate.(executor.TaskOperateInfo) + if !ok { + task.SendStatus(exectsk.NewScheduleCreateECSStatus("", t.ModelID, "invalid task operate info")) + return fmt.Errorf("invalid task operate info") + } + + switch info.Command { + case globals.RESTART: + var commands []string + commandContent := "yum install -y fuse3" + commands = append(commands, commandContent) + result, err := provider.RunCommand(commands, instanceID) + if err != nil { + 
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", t.ModelID, err.Error())) + return err + } + task.SendStatus(exectsk.NewScheduleCreateECSStatus(result, t.ModelID, "")) + case globals.STOP: + println("STOP") + case globals.DESTROY: + result, err := provider.DestroyServer(instanceID) + if err != nil { + task.SendStatus(exectsk.NewScheduleCreateECSStatus("", t.ModelID, err.Error())) + return err + } + task.SendStatus(exectsk.NewScheduleCreateECSStatus(result, t.ModelID, "")) + } + } } func init() { diff --git a/executor/internal/task/storage_create_package.go b/executor/internal/task/storage_create_package.go index 9281f0b..e14eae6 100644 --- a/executor/internal/task/storage_create_package.go +++ b/executor/internal/task/storage_create_package.go @@ -2,10 +2,7 @@ package task import ( "fmt" - "time" - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/task" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task" @@ -21,24 +18,19 @@ func NewStorageCreatePackage(info *exectsk.StorageCreatePackage) *StorageCreateP } } -func (t *StorageCreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { +func (t *StorageCreatePackage) Execute(task *Task, ctx TaskContext) { log := logger.WithType[StorageCreatePackage]("Task") log.Debugf("begin") defer log.Debugf("end") - err := t.do(task.ID(), ctx) + err := t.do(task, ctx) if err != nil { //TODO 若任务失败,上报的状态failed字段根据情况修改 - ctx.reporter.Report(task.ID(), exectsk.NewStorageCreatePackageStatus("failed", err.Error(), 0)) + task.SendStatus(exectsk.NewStorageCreatePackageStatus("failed", err.Error(), 0)) } - ctx.reporter.ReportNow() - - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) } -func (t *StorageCreatePackage) do(taskID string, ctx TaskContext) error { +func (t *StorageCreatePackage) do(task *Task, ctx 
TaskContext) error { stgCli, err := schglb.CloudreamStoragePool.Acquire() if err != nil { return fmt.Errorf("new cloudream storage client: %w", err) @@ -57,7 +49,7 @@ func (t *StorageCreatePackage) do(taskID string, ctx TaskContext) error { } // TODO 根据接口result返回情况修改 - ctx.reporter.Report(taskID, exectsk.NewStorageCreatePackageStatus("completed", "", resp.PackageID)) + task.SendStatus(exectsk.NewStorageCreatePackageStatus("completed", "", resp.PackageID)) return nil } diff --git a/executor/internal/task/storage_load_package.go b/executor/internal/task/storage_load_package.go index cf21c59..ff4df89 100644 --- a/executor/internal/task/storage_load_package.go +++ b/executor/internal/task/storage_load_package.go @@ -2,10 +2,7 @@ package task import ( "fmt" - "time" - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/task" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task" @@ -21,7 +18,7 @@ func NewStorageLoadPackage(info *exectsk.StorageLoadPackage) *StorageLoadPackage } } -func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { +func (t *StorageLoadPackage) Execute(task *Task, ctx TaskContext) { log := logger.WithType[StorageLoadPackage]("Task") log.Debugf("begin with %v", logger.FormatStruct(t.StorageLoadPackage)) defer log.Debugf("end") @@ -29,15 +26,10 @@ func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskConte packagePath, err := t.do(ctx) if err != nil { //TODO 若任务失败,上报的状态failed字段根据情况修改 - ctx.reporter.Report(task.ID(), exectsk.NewStorageLoadPackageStatus(err.Error(), "")) + task.SendStatus(exectsk.NewStorageLoadPackageStatus(err.Error(), "")) } else { - ctx.reporter.Report(task.ID(), exectsk.NewStorageLoadPackageStatus("", packagePath)) + task.SendStatus(exectsk.NewStorageLoadPackageStatus("", packagePath)) } - 
ctx.reporter.ReportNow() - - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) } func (t *StorageLoadPackage) do(ctx TaskContext) (string, error) { diff --git a/executor/internal/task/storage_move_object.go b/executor/internal/task/storage_move_object.go new file mode 100644 index 0000000..0feda24 --- /dev/null +++ b/executor/internal/task/storage_move_object.go @@ -0,0 +1,53 @@ +package task + +import ( + "fmt" + "gitlink.org.cn/cloudream/common/pkgs/logger" + schglb "gitlink.org.cn/cloudream/scheduler/common/globals" + exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task" +) + +type StorageMoveObject struct { + *exectsk.StorageMoveObject +} + +func NewStorageMoveObject(info *exectsk.StorageMoveObject) *StorageMoveObject { + return &StorageMoveObject{StorageMoveObject: info} +} + +func (t *StorageMoveObject) Execute(task *Task, ctx TaskContext) { + log := logger.WithType[StorageMoveObject]("Task") + log.Debugf("begin with %v", logger.FormatStruct(t.StorageMoveObject)) + defer log.Debugf("end") + + err := t.do() + if err != nil { + task.SendStatus(exectsk.NewStorageMoveObjectStatus(err.Error())) + } else { + task.SendStatus(exectsk.NewStorageMoveObjectStatus("")) + } +} + +func (t *StorageMoveObject) do() error { + stgCli, err := schglb.CloudreamStoragePool.Acquire() + if err != nil { + return fmt.Errorf("new cloudream storage client: %w", err) + } + defer schglb.CloudreamStoragePool.Release(stgCli) + + move, err := stgCli.Object().Move(t.ObjectMove) + if err != nil { + return fmt.Errorf("move object: %w", err) + } + + // 判断全部object是否都移动成功 + if len(move.Successes) != len(t.ObjectMove.Movings) { + return fmt.Errorf("move object: %d objects failed", len(t.ObjectMove.Movings)-len(move.Successes)) + } + + return nil +} + +func init() { + Register(NewStorageMoveObject) +} diff --git a/executor/internal/task/task.go b/executor/internal/task/task.go index 664cc46..3ca984f 100644 --- a/executor/internal/task/task.go +++ 
b/executor/internal/task/task.go @@ -1,56 +1,80 @@ package task import ( - "fmt" - "reflect" - - "gitlink.org.cn/cloudream/common/pkgs/task" + "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/reflect2" + "gitlink.org.cn/cloudream/common/utils/sync2" + "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor" exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task" - reporter "gitlink.org.cn/cloudream/scheduler/executor/internal/reporter" + mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager" + "gitlink.org.cn/cloudream/scheduler/executor/internal/globals" + "reflect" ) -type TaskContext struct { - reporter *reporter.Reporter +type TaskChan[T any] struct { + Chan sync2.UnboundChannel[T] } -// 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用, -// 因此适合进行执行结果的设置 -type CompleteFn = task.CompleteFn - -type Manager struct { - task.Manager[TaskContext] +func NewTaskChan[T any]() *TaskChan[T] { + return &TaskChan[T]{Chan: *sync2.NewUnboundChannel[T]()} } -type TaskBody = task.TaskBody[TaskContext] +type Task struct { + id string + taskChan TaskChan[any] + TaskStatusChan TaskChan[any] +} -type Task = task.Task[TaskContext] +type TaskContext struct{} -type CompleteOption = task.CompleteOption - -func NewManager(reporter *reporter.Reporter) Manager { - return Manager{ - Manager: task.NewManager(TaskContext{ - reporter: reporter, - }), +func NewTask(id string) *Task { + return &Task{ + taskChan: *NewTaskChan[any](), + TaskStatusChan: *NewTaskChan[any](), + id: id, + //body: body, } } -func (m *Manager) StartByInfo(info exectsk.TaskInfo) (*Task, error) { - infoType := reflect2.TypeOfValue(info) - - ctor, ok := taskFromInfoCtors[infoType] - if !ok { - return nil, fmt.Errorf("unknow info type") - } - - return m.StartNew(ctor(info)), nil +type TaskBody interface { + Execute(task *Task, ctx TaskContext) } -var taskFromInfoCtors map[reflect.Type]func(exectsk.TaskInfo) TaskBody = 
make(map[reflect.Type]func(exectsk.TaskInfo) task.TaskBody[TaskContext]) +func (c *Task) SendStatus(status exectsk.TaskStatus) { + + taskStatus := mgrmq.NewExecutorTaskStatus(globals.ExecutorID, c.ID(), status) + err := c.TaskStatusChan.Chan.Send(taskStatus) + if err != nil { + logger.Error("send task status error: ", err.Error()) + } +} + +func (c *Task) SendTaskOperate(info executor.TaskOperateInfo) { + + err := c.taskChan.Chan.Send(info) + if err != nil { + logger.Error(err.Error()) + } +} + +func (c *Task) WaitTaskOperate() *any { + + receive, err := c.taskChan.Chan.Receive() + if err != nil { + logger.Error(err.Error()) + return nil + } + return &receive +} + +func (t *Task) ID() string { + return t.id +} + +var TaskFromInfoCtors map[reflect.Type]func(exectsk.TaskInfo) TaskBody = make(map[reflect.Type]func(exectsk.TaskInfo) TaskBody) func Register[TInfo exectsk.TaskInfo, TTaskBody TaskBody](ctor func(info TInfo) TTaskBody) { - taskFromInfoCtors[reflect2.TypeOf[TInfo]()] = func(info exectsk.TaskInfo) TaskBody { + TaskFromInfoCtors[reflect2.TypeOf[TInfo]()] = func(info exectsk.TaskInfo) TaskBody { return ctor(info.(TInfo)) } } diff --git a/executor/main.go b/executor/main.go index d871b52..a54d5fb 100644 --- a/executor/main.go +++ b/executor/main.go @@ -8,11 +8,9 @@ import ( "gitlink.org.cn/cloudream/scheduler/executor/internal/config" myglbs "gitlink.org.cn/cloudream/scheduler/executor/internal/globals" "gitlink.org.cn/cloudream/scheduler/executor/internal/http" - "gitlink.org.cn/cloudream/scheduler/executor/internal/reporter" + "gitlink.org.cn/cloudream/scheduler/executor/internal/manager" "gitlink.org.cn/cloudream/scheduler/executor/internal/services" - "gitlink.org.cn/cloudream/scheduler/executor/internal/task" "os" - "time" ) func main() { @@ -36,9 +34,9 @@ func main() { myglbs.Init(config.Cfg().Application.ExecutorID) schglb.InitRcloneConfig(config.Cfg().Rclone.CDSRcloneID, config.Cfg().Rclone.CDSRcloneConfigID) - rpter := 
reporter.NewReporter(myglbs.ExecutorID, time.Second*time.Duration(config.Cfg().ReportIntervalSec)) + //rpter := reporter.NewReporter(myglbs.ExecutorID, time.Second*time.Duration(config.Cfg().ReportIntervalSec)) // - taskMgr := task.NewManager(&rpter) + taskMgr := manager.NewManager() // //mqSvr, err := execmq.NewServer(services.NewService(&taskMgr), &config.Cfg().RabbitMQ) //if err != nil { @@ -52,7 +50,7 @@ func main() { // 启动服务 //go serveMQServer(mqSvr) - go serveReporter(&rpter) + //go serveReporter(&rpter) svc := services.NewService(&taskMgr) server, err := http.NewServer(config.Cfg().Application.Address, svc) @@ -82,13 +80,13 @@ func serveMQServer(server *execmq.Server) { logger.Info("mq server stopped") } -func serveReporter(rpt *reporter.Reporter) { - logger.Info("start serving reporter") - - err := rpt.Serve() - if err != nil { - logger.Errorf("rpt stopped with error: %s", err.Error()) - } - - logger.Info("rpt stopped") -} +//func serveReporter(rpt *reporter.Reporter) { +// logger.Info("start serving reporter") +// +// err := rpt.Serve() +// if err != nil { +// logger.Errorf("rpt stopped with error: %s", err.Error()) +// } +// +// logger.Info("rpt stopped") +//} diff --git a/manager/internal/executormgr/executormgr.go b/manager/internal/executormgr/executormgr.go index 74102e7..a1589ea 100644 --- a/manager/internal/executormgr/executormgr.go +++ b/manager/internal/executormgr/executormgr.go @@ -2,16 +2,16 @@ package executormgr import ( "bufio" - "encoding/json" "fmt" + "gitlink.org.cn/cloudream/common/pkgs/async" log "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/utils/serder" + jobTask "gitlink.org.cn/cloudream/scheduler/manager/internal/task" "io" "strings" "sync" "time" - "gitlink.org.cn/cloudream/common/utils/sync2" - schglb "gitlink.org.cn/cloudream/scheduler/common/globals" schmod "gitlink.org.cn/cloudream/scheduler/common/models" exemq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor" @@ -20,12 +20,11 @@ 
import ( ) type task struct { - statusChan *sync2.Channel[exetsk.TaskStatus] + statusChan *async.UnboundChannel[mgrmq.ExecutorTaskStatus] } type ExecutorStatus struct { - executorID schmod.ExecutorID - tasks map[string]task // key 为 TaskID - lastReportTime time.Time + executorID schmod.ExecutorID + tasks map[string]task // key 为 TaskID } var ErrWaitReportTimeout = fmt.Errorf("wait report timeout") @@ -57,149 +56,139 @@ func NewManager(reportTimeout time.Duration) (*Manager, error) { }, nil } -func (m *Manager) ReceiveExecutorTaskStatus(url string) { +func (m *Manager) ReceiveExecutorTaskStatus(url string) (*mgrmq.ExecutorTaskStatus, error) { + client, err := ExecutorPool.AcquireByUrl(url) if err != nil { log.Error(err) - return + return &mgrmq.ExecutorTaskStatus{}, err } resp, err := client.GetReportInfo() if err != nil { log.Error(err) - return + return &mgrmq.ExecutorTaskStatus{}, err } - defer resp.Body.Close() reader := bufio.NewReader(resp.Body) - for { - line, err := reader.ReadString('\n') - if err != nil && err != io.EOF { - log.Error("Error reading from response body:", err) - return - } - if line == "" { - continue // Skip empty lines - } - line = strings.TrimPrefix(line, "data: ") - line = strings.TrimSpace(line) - if len(line) > 0 { - var msg mgrmq.ReportExecutorTaskStatus - if err := json.Unmarshal([]byte(line), &msg); err != nil { - fmt.Println("Error unmarshalling JSON:", err) + line, err := reader.ReadString('\n') + if err != nil && err != io.EOF { + log.Error("Error reading from response body:", err) + return &mgrmq.ExecutorTaskStatus{}, err + } + // TODO 第一次获取的值包含执行器所有任务,用于失败重试 + executorInfo := convertLine(line) + // 将第一次的executor放入到池子中 + exec := &ExecutorStatus{ + executorID: executorInfo.ExecutorID, + tasks: make(map[string]task), + } + + m.executors[executorInfo.ExecutorID] = exec + + go func() { + for { + line, err = reader.ReadString('\n') + if err != nil { + if err != io.EOF { + log.Error("Error reading from response body:", err) + } + 
return + } + + status := convertLine(line) + if status == nil { continue } - log.Info("Received: %s", msg) - m.Report(msg.ExecutorID, msg.TaskStatus) + + m.Report(*status) } - if err == io.EOF { - break - } - } + }() + + return executorInfo, nil } -func (m *Manager) Report(execID schmod.ExecutorID, taskStatus []mgrmq.ExecutorTaskStatus) { +func convertLine(line string) *mgrmq.ExecutorTaskStatus { + if line == "" { + return nil + } + + line = strings.TrimPrefix(line, "data: ") + line = strings.TrimSpace(line) + if len(line) == 0 { + return nil + } + + readResp, err := serder.JSONToObjectEx[mgrmq.ExecutorTaskStatus]([]byte(line)) + if err != nil { + log.Error(err) + return nil + } + + return &readResp +} + +func (m *Manager) Report(status mgrmq.ExecutorTaskStatus) { m.lock.Lock() defer m.lock.Unlock() - exec, ok := m.executors[execID] - if !ok { - exec = &ExecutorStatus{ - executorID: execID, - tasks: make(map[string]task), - } - m.executors[execID] = exec + exec := m.executors[status.ExecutorID] + if exec == nil { + log.Error("Executor not found: ", status.ExecutorID) + return } + // 由于先将task chan放入到池子中再执行的task,所以这里的task必存在 + tsk := exec.tasks[status.TaskID] - exec.lastReportTime = time.Now() + // TODO 考虑主动检测channel是否关闭,然后取消task + if tsk.statusChan.Send(status) != nil { + delete(exec.tasks, status.TaskID) - for _, s := range taskStatus { - tsk, ok := exec.tasks[s.TaskID] - if !ok { - continue - } - - // TODO 考虑主动检测channel是否关闭,然后取消task - if tsk.statusChan.Send(s.Status) != nil { - delete(exec.tasks, s.TaskID) - - if len(exec.tasks) == 0 { - delete(m.executors, execID) - } + if len(exec.tasks) == 0 { + delete(m.executors, exec.executorID) } } } // 启动一个Task -func (m *Manager) StartTask(info exetsk.TaskInfo, ccInfo schmod.ComputingCenter) *sync2.Channel[exetsk.TaskStatus] { +func (m *Manager) StartTask(info exetsk.TaskInfo, ccInfo schmod.ComputingCenter) (*jobTask.JobTask[mgrmq.ExecutorTaskStatus], error) { m.lock.Lock() defer m.lock.Unlock() - ch := 
sync2.NewChannel[exetsk.TaskStatus]() + newJobTask := jobTask.NewJobTask[mgrmq.ExecutorTaskStatus]() + ch := newJobTask.Chan() client, err := ExecutorPool.AcquireByUrl(ccInfo.ExecutorURL) - //resp, err := m.exeCli.StartTask(exemq.NewStartTask(info)) if err != nil { ch.CloseWithError(fmt.Errorf("start task: %w", err)) - return ch + return newJobTask, err } + executorID := schmod.ExecutorID(ccInfo.ExecutorID) // 检测是否连接过这个Executor,如果第一次连,则发送请求监听上报信息 - _, ok := m.executors[schmod.ExecutorID(ccInfo.ExecutorID)] + _, ok := m.executors[executorID] if !ok { - go m.ReceiveExecutorTaskStatus(ccInfo.ExecutorURL) - } - - resp, err := client.SubmitTask(exemq.NewStartTask(info)) - if err != nil { - ch.CloseWithError(fmt.Errorf("start task: %w", err)) - return ch - } - - exeInfo, ok := m.executors[resp.ExecutorID] - if !ok { - exeInfo = &ExecutorStatus{ - executorID: resp.ExecutorID, - tasks: make(map[string]task), - lastReportTime: time.Now(), + _, err = m.ReceiveExecutorTaskStatus(ccInfo.ExecutorURL) + if err != nil { + ch.CloseWithError(fmt.Errorf("start task: %w", err)) + return newJobTask, err } - m.executors[resp.ExecutorID] = exeInfo } - exeInfo.tasks[resp.TaskID] = task{ + // 上面已经将executor放入到池子中了,这里的executor必存在 + exeInfo := m.executors[executorID] + exeInfo.tasks[newJobTask.ID()] = task{ statusChan: ch, } - return ch -} - -func (m *Manager) Serve() error { - InitExecutorPool() - - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - func() { - m.lock.Lock() - defer m.lock.Unlock() - - now := time.Now() - for exeID, exeInfo := range m.executors { - dt := now.Sub(exeInfo.lastReportTime) - - if dt < m.reportTimeout { - continue - } - - for _, tsk := range exeInfo.tasks { - tsk.statusChan.CloseWithError(ErrWaitReportTimeout) - } - - delete(m.executors, exeID) - } - }() - } + _, err = client.SubmitTask(exemq.NewStartTask(newJobTask.ID(), info)) + if err != nil { + ch.CloseWithError(fmt.Errorf("start task: %w", err)) + return 
newJobTask, err } + + return newJobTask, nil +} + +func (m *Manager) Serve() { + InitExecutorPool() } diff --git a/manager/internal/jobmgr/event/instance_create.go b/manager/internal/jobmgr/event/instance_create.go index c7e05e3..a08b4b8 100644 --- a/manager/internal/jobmgr/event/instance_create.go +++ b/manager/internal/jobmgr/event/instance_create.go @@ -2,27 +2,61 @@ package event import ( "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/types" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/serder" ) -type CreateInstanceFuture = *future.SetValueFuture[CreateInstanceResult] +type OperateInstanceFuture = *future.SetValueFuture[OperateInstanceResult] -type InstanceCreate struct { - DataSet schsdk.JobFileInfo - Result CreateInstanceFuture +type InstanceOperate struct { + Info InstanceOperateInfo + Result OperateInstanceFuture } -type CreateInstanceResult struct { +type OperateInstanceResult struct { + OperateResult string + Err error JobID schsdk.JobID FilesUploadScheme schsdk.JobFilesUploadScheme } -func NewInstanceCreate(dataSet schsdk.JobFileInfo, future CreateInstanceFuture) *InstanceCreate { - return &InstanceCreate{ - DataSet: dataSet, - Result: future, +type InstanceOperateInfo interface { + Instance() +} + +type InstanceInfoBase struct{} + +func (i *InstanceInfoBase) Instance() {} + +var InstanceOperateInfoTypeUnion = types.NewTypeUnion[InstanceOperateInfo]( + (*InstanceCreateInfo)(nil), + (*InstanceUpdateInfo)(nil), +) + +var _ = serder.UseTypeUnionInternallyTagged(&InstanceOperateInfoTypeUnion, "type") + +type InstanceCreateInfo struct { + serder.Metadata `union:"Create"` + InstanceInfoBase + DataSet schsdk.JobFileInfo +} + +type InstanceUpdateInfo struct { + serder.Metadata `union:"Update"` + InstanceInfoBase + Type string `json:"type"` + Info schsdk.UpdateMultiInstanceJobInfo `json:"info"` + PackageID 
cdssdk.PackageID `json:"packageID"` +} + +func NewInstanceOperate(info InstanceOperateInfo, future OperateInstanceFuture) *InstanceOperate { + return &InstanceOperate{ + Info: info, + Result: future, } } -func (s *InstanceCreate) Noop() { +func (s *InstanceOperate) Noop() { } diff --git a/manager/internal/jobmgr/event/update.go b/manager/internal/jobmgr/event/update.go new file mode 100644 index 0000000..aa7f6d2 --- /dev/null +++ b/manager/internal/jobmgr/event/update.go @@ -0,0 +1,23 @@ +package event + +import "gitlink.org.cn/cloudream/common/pkgs/future" + +type JobUpdateFuture = *future.SetValueFuture[UpdateResult] + +type Update struct { + Command string + Result JobUpdateFuture +} + +func (s *Update) Noop() {} + +type UpdateResult struct { + Err error +} + +func NewUpdate(command string, jobUpdateFuture JobUpdateFuture) *Update { + return &Update{ + Command: command, + Result: jobUpdateFuture, + } +} diff --git a/manager/internal/jobmgr/event/utils.go b/manager/internal/jobmgr/event/utils.go index 496ef1f..d032f3a 100644 --- a/manager/internal/jobmgr/event/utils.go +++ b/manager/internal/jobmgr/event/utils.go @@ -2,6 +2,7 @@ package event import ( "context" + "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr" ) @@ -51,3 +52,25 @@ func WaitTypeAnd[T jobmgr.Event](ctx context.Context, set *jobmgr.EventSet, cond // 断言返回的事件为类型T,并返回该事件和操作成功标志。 return ret.(T), ok } + +func BeginWaitType[T jobmgr.Event](set *jobmgr.EventSet) future.Future1[jobmgr.Event] { + // 等待一个满足特定类型和条件的事件。 + return set.BeginWait(func(evt jobmgr.Event) bool { + _, ok := evt.(T) + return ok + }) +} + +func BeginWaitTypeAnd[T jobmgr.Event](set *jobmgr.EventSet, cond func(val T) bool) future.Future1[jobmgr.Event] { + // 等待一个满足特定类型和条件的事件。 + return set.BeginWait(func(evt jobmgr.Event) bool { + // 尝试将事件断言为特定类型T,并检查断言是否成功。 + e, ok := evt.(T) + if !ok { + return false // 如果事件不是期望的类型T,则返回false。 + } + + // 如果事件是类型T且满足给定条件,则返回true。 + return cond(e) + 
}) +} diff --git a/manager/internal/jobmgr/event_set.go b/manager/internal/jobmgr/event_set.go index 346dcbe..4557aa0 100644 --- a/manager/internal/jobmgr/event_set.go +++ b/manager/internal/jobmgr/event_set.go @@ -3,6 +3,7 @@ package jobmgr import ( "context" "errors" + "gitlink.org.cn/cloudream/common/pkgs/logger" "sync" "gitlink.org.cn/cloudream/common/pkgs/future" @@ -69,10 +70,11 @@ func (s *EventSet) Wait(ctx context.Context, cond EventWaitCondition) (Event, bo future: fut, } s.waiters = append(s.waiters, waiter) + logger.Info("append waiter: %p", &waiter) s.lock.Unlock() - val, err := fut.WaitValue(ctx) + val, err := fut.Wait(ctx) if err != nil { return nil, false @@ -80,3 +82,26 @@ func (s *EventSet) Wait(ctx context.Context, cond EventWaitCondition) (Event, bo return val, true } + +func (s *EventSet) BeginWait(cond EventWaitCondition) future.Future1[Event] { + s.lock.Lock() + + for i, evt := range s.events { + if cond(evt) { + s.events = lo2.RemoveAt(s.events, i) + s.lock.Unlock() + return future.NewReadyValue1(evt) + } + } + + fut := future.NewSetValue[Event]() + waiter := EventWaiter{ + condition: cond, + future: fut, + } + s.waiters = append(s.waiters, waiter) + + s.lock.Unlock() + + return fut +} diff --git a/manager/internal/jobmgr/job/multiInstance_update_job.go b/manager/internal/jobmgr/job/multiInstance_update_job.go new file mode 100644 index 0000000..0e30207 --- /dev/null +++ b/manager/internal/jobmgr/job/multiInstance_update_job.go @@ -0,0 +1,30 @@ +package job + +import ( + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" +) + +type UpdateMultiInstanceJob struct { + Info schsdk.UpdateMultiInstanceJobInfo + Files jobmod.JobFiles + + //InstanceIDs []schsdk.JobID + //UpdateStrategy string +} + +func NewUpdateMultiInstanceJob(info schsdk.UpdateMultiInstanceJobInfo) *UpdateMultiInstanceJob { + return &UpdateMultiInstanceJob{ + Info: info, + } +} + +func (j 
*UpdateMultiInstanceJob) GetInfo() schsdk.JobInfo { + return &j.Info +} + +func (j *UpdateMultiInstanceJob) Dump() jobmod.JobBodyDump { + return &jobmod.UpdateMultiInstanceJobDump{ + Files: j.Files, + } +} diff --git a/manager/internal/jobmgr/job/state/adjusting.go b/manager/internal/jobmgr/job/state/adjusting.go index 5360d82..514223c 100644 --- a/manager/internal/jobmgr/job/state/adjusting.go +++ b/manager/internal/jobmgr/job/state/adjusting.go @@ -128,15 +128,14 @@ func (s *Adjusting) doPackageScheduling(ctx context.Context, rtx jobmgr.JobState if scheme.Action == jobmod.ActionMove { logger.Debugf("begin move pacakge %v to %v", file.PackageID, s.targetCCInfo.CDSStorageID) - wt := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, file.PackageID, s.targetCCInfo.CDSNodeID), s.targetCCInfo) - defer wt.Close() - - status, err := wt.Receive(ctx) + taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, file.PackageID, s.targetCCInfo.CDSNodeID), s.targetCCInfo) if err != nil { return fmt.Errorf("moving package: %w", err) } - moveStatus := status.(*exectsk.CacheMovePackageStatus) + fut := taskStatus.Receive() + status := <-fut.Chan() + moveStatus := status.Value.Status.(*exectsk.CacheMovePackageStatus) if moveStatus.Error != "" { return fmt.Errorf("moving package: %s", moveStatus.Error) } @@ -147,15 +146,15 @@ func (s *Adjusting) doPackageScheduling(ctx context.Context, rtx jobmgr.JobState if scheme.Action == jobmod.ActionLoad { logger.Debugf("begin load pacakge %v to %v", file.PackageID, s.targetCCInfo.CDSStorageID) - wt := rtx.Mgr.ExecMgr.StartTask(exectsk.NewStorageLoadPackage(1, file.PackageID, s.targetCCInfo.CDSStorageID), s.targetCCInfo) - defer wt.Close() - - status, err := wt.Receive(ctx) + taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewStorageLoadPackage(1, file.PackageID, s.targetCCInfo.CDSStorageID), s.targetCCInfo) if err != nil { - return fmt.Errorf("loading package: %w", err) + return fmt.Errorf("moving package: %w", 
err) } - loadStatus := status.(*exectsk.StorageLoadPackageStatus) + fut := taskStatus.Receive() + status := <-fut.Chan() + + loadStatus := status.Value.Status.(*exectsk.StorageLoadPackageStatus) if loadStatus.Error != "" { return fmt.Errorf("loading package: %s", loadStatus.Error) } @@ -175,15 +174,15 @@ } // TODO UserID - wt := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, *file.PackageID, s.targetCCInfo.CDSNodeID), s.targetCCInfo) - defer wt.Close() - - status, err := wt.Receive(ctx) + taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, *file.PackageID, s.targetCCInfo.CDSNodeID), s.targetCCInfo) if err != nil { return fmt.Errorf("moving package: %w", err) } - moveStatus := status.(*exectsk.CacheMovePackageStatus) + fut := taskStatus.Receive() + status := <-fut.Chan() + + moveStatus := status.Value.Status.(*exectsk.CacheMovePackageStatus) if moveStatus.Error != "" { return fmt.Errorf("moving package: %s", moveStatus.Error) } @@ -208,15 +207,18 @@ return fmt.Errorf("there must be only 1 object in the package which will be imported") } - wt2 := rtx.Mgr.ExecMgr.StartTask(exectsk.NewUploadImage(s.targetCCInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash)), s.targetCCInfo) - defer wt2.Close() + taskStatus2, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewUploadImage(s.targetCCInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash)), s.targetCCInfo) + if err != nil { + return fmt.Errorf("uploading image: %w", err) + } - status2, err := wt2.Receive(ctx) + fut2 := taskStatus2.Receive() + status2 := <-fut2.Chan() if err != nil { return fmt.Errorf("uploading image: %w", err) } - uploadStatus := status2.(*exectsk.UploadImageStatus) + uploadStatus := status2.Value.Status.(*exectsk.UploadImageStatus) if uploadStatus.Error != "" { return
fmt.Errorf("uploading image: %s", uploadStatus.Error) } diff --git a/manager/internal/jobmgr/job/state/executing.go b/manager/internal/jobmgr/job/state/executing.go index 0e36a4c..393d20e 100644 --- a/manager/internal/jobmgr/job/state/executing.go +++ b/manager/internal/jobmgr/job/state/executing.go @@ -6,6 +6,11 @@ import ( "github.com/samber/lo" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + schmod "gitlink.org.cn/cloudream/scheduler/common/models" + "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor" + mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager" + "gitlink.org.cn/cloudream/scheduler/manager/internal/executormgr" + jobTask "gitlink.org.cn/cloudream/scheduler/manager/internal/task" "path/filepath" "gitlink.org.cn/cloudream/common/pkgs/logger" @@ -73,9 +78,6 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e packageID = runningJob.Files.Dataset.PackageID } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - pcmImgInfo, err := rtx.Mgr.DB.PCMImage().GetByImageIDAndCCID(rtx.Mgr.DB.SQLCtx(), jobFiles.Image.ImageID, targetCCID) if err != nil { return fmt.Errorf("getting pcm image info: %w", err) @@ -96,37 +98,20 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e // TODO 判断是否是模型推理任务,如果是,则进行扩缩容管理 if modelJobInfo != nil { - //address := nodeExpansion(jobFiles) - // - //node := schsdk.NodeInfo{ - // InstanceID: jo.JobID, - // Address: address, - //} - //jobmgr.SetNodeData(schsdk.JobID(jo.JobSetID), schsdk.ModelID(modelJobInfo.ModelID), node) - // 发送扩容任务 - wt := rtx.Mgr.ExecMgr.StartTask(exetsk.NewScheduleCreateECS( + ecs := exetsk.NewScheduleCreateECS( userID, packageID, - ), ccInfo) + schsdk.ModelID(modelJobInfo.ModelID), + ) + task, err := rtx.Mgr.ExecMgr.StartTask(ecs, ccInfo) - for { - status, err := wt.Receive(ctx) - if err != nil { - return err - } - taskStatus := 
status.(*exetsk.ScheduleCreateECSStatus) - if taskStatus.Error != "" { - log.Error(taskStatus.Error) - return nil - } - node := schsdk.NodeInfo{ - InstanceID: jo.JobID, - Address: schsdk.Address(taskStatus.Address), - } - jobmgr.SetNodeData(schsdk.JobID(jo.JobSetID), schsdk.ModelID(modelJobInfo.ModelID), node) - log.Infof("node expansion: %v", taskStatus.Address) + if err != nil { + log.Error(err.Error()) + return err } + + return s.listen(rtx, jo, task, ccInfo) } stgCli, err := schglb.CloudreamStoragePool.Acquire() @@ -166,7 +151,7 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e cmd = runtime.Command } - wt := rtx.Mgr.ExecMgr.StartTask(exetsk.NewSubmitTask( + task, err := rtx.Mgr.ExecMgr.StartTask(exetsk.NewSubmitTask( ccInfo.PCMParticipantID, pcmImgInfo.PCMImageID, // TODO 选择资源的算法 @@ -176,17 +161,16 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e // params, TODO params不应该是kv数组,而应该是字符串数组 []schsdk.KVPair{}, ), ccInfo) - defer wt.Close() + if err != nil { + log.Error(err.Error()) + return err + } + + taskFut := task.Receive() for { - status, err := wt.Receive(ctx) - if err != nil { - return err - } - tskStatus := status.(*exetsk.SubmitTaskStatus) - if tskStatus.Error != "" { - return fmt.Errorf("submitting task: %s", tskStatus.Error) - } + msg := <-taskFut.Chan() + tskStatus := msg.Value.Status.(*exetsk.SubmitTaskStatus) if tskStatus.Status != s.lastStatus { log.Infof("task %s -> %s", s.lastStatus, tskStatus.Status) @@ -203,37 +187,63 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e case pcmsdk.TaskStatusFailed: return fmt.Errorf("task failed") } + + taskFut = task.Receive() } } -// 模拟 -var nodesAddress = []string{"120.46.183.86:22", "121.36.5.116:22"} -var count = 0 +func (s *NormalJobExecuting) listen(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job, task *jobTask.JobTask[mgrmq.ExecutorTaskStatus], ccInfo schmod.ComputingCenter) error { + log := 
logger.WithType[NormalJobExecuting]("State").WithField("TaskID", task.ID()) -func nodeExpansion(files *jobmod.JobFiles) schsdk.Address { - if count >= 2 { - logger.Info("There is no available node") - return "" + waitFut := event.BeginWaitType[*event.Update](rtx.EventSet) + taskFut := task.Receive() + + for { + select { + case v1 := <-waitFut.Chan(): + // 对任务进行更新操作 + client, err := executormgr.ExecutorPool.AcquireByUrl(ccInfo.ExecutorURL) + if err != nil { + return fmt.Errorf("getting executor client: %w", err) + } + evt := v1.Value.(*event.Update) + operateResp, err := client.OperateTask(executor.NewTaskOperateInfo(task.ID(), evt.Command)) + if err != nil { + return fmt.Errorf("operate task: %w", err) + } + + evt.Result.SetValue(event.UpdateResult{ + Err: operateResp.Err, + }) + + if operateResp.Err != nil { + return fmt.Errorf("operate task: %w", operateResp.Err) + } + + // 持续等待 + waitFut = event.BeginWaitType[*event.Update](rtx.EventSet) + case msg := <-taskFut.Chan(): + switch v2 := msg.Value.Status.(type) { + case *exetsk.ScheduleCreateECSStatus: + // 扩容任务,将结果放到池子中 + node := schsdk.NodeInfo{ + InstanceID: jo.JobID, + Address: schsdk.Address(v2.Address), + } + + jobmgr.SetNodeData(schsdk.JobID(jo.JobSetID), v2.ModelID, node) + log.Infof("node expansion: %v", v2.Address) + case error: + fmt.Println("Received error:", v2.Error()) + default: + fmt.Println("Received unexpected type") + } + + // 持续接收 + taskFut = task.Receive() + + } } - address := nodesAddress[count] - count++ - - client := utils.GetSSHClient("pcm", "", address) - defer client.Close() - // 创建SSH会话 - session, err := client.NewSession() - if err != nil { - logger.Warn("Failed to create session: %s", err) - } - defer session.Close() - - // 执行远程命令 - output, err := session.CombinedOutput("sh /home/pcm/modeltest/http/start.sh") - if err != nil { - logger.Warn("Failed to run command: %s", err) - } - - return schsdk.Address(output) } type DataReturnJobExecuting struct { @@ -277,20 +287,24 @@ func (s 
*DataReturnJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Jo } packageName := utils.MakeResourcePackageName(reJob.TargetJobID) - wt := rtx.Mgr.ExecMgr.StartTask(exetsk.NewStorageCreatePackage( + task, err := rtx.Mgr.ExecMgr.StartTask(exetsk.NewStorageCreatePackage( userID, // TOOD 用户ID ccInfo.CDSStorageID, reJob.TargetJobOutputPath, reJob.Info.BucketID, packageName, ), ccInfo) - defer wt.Close() + if err != nil { + log.Error(err.Error()) + return err + } - status, err := wt.Receive(ctx) + fut := task.Receive() if err != nil { return err } - tskStatus := status.(*exetsk.StorageCreatePackageStatus) + status := <-fut.Chan() + tskStatus := status.Value.Status.(*exetsk.StorageCreatePackageStatus) if tskStatus.Error != "" { return fmt.Errorf("creating package: %s", tskStatus.Error) } diff --git a/manager/internal/jobmgr/job/state/multiInstance_running.go b/manager/internal/jobmgr/job/state/multiInstance_running.go index 297ce8b..86a03b8 100644 --- a/manager/internal/jobmgr/job/state/multiInstance_running.go +++ b/manager/internal/jobmgr/job/state/multiInstance_running.go @@ -3,6 +3,7 @@ package state import ( "context" "fmt" + "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/logger" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" @@ -11,6 +12,8 @@ import ( "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job" + "strings" + "sync" ) type MultiInstanceRunning struct { @@ -39,67 +42,120 @@ func (s *MultiInstanceRunning) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) multInstJob := jo.Body.(*job.MultiInstanceJob) + waitFut := event.BeginWaitType[*event.InstanceOperate](rtx.EventSet) for { - // 监听创建实例事件 - ic, ok := event.WaitType[*event.InstanceCreate](ctx, rtx.EventSet) - if !ok { - logger.Info("MultiInstanceRunning 
canceled") - break - } + chanValue := <-waitFut.Chan() + instanceInfo := chanValue.Value.(*event.InstanceOperate) + instanceFuture := instanceInfo.Result logger.Info("wait a event happened") + waitFut = event.BeginWaitType[*event.InstanceOperate](rtx.EventSet) - dataSet := ic.DataSet - //如果是模型扩容任务,直接使用父Job的资源文件 - if &multInstJob.Info.ModelJobInfo != nil { - dataSet = multInstJob.Info.Files.Dataset + switch info := instanceInfo.Info.(type) { + case *event.InstanceCreateInfo: + createInstance(rtx, info, s.preScheduler, jo, multInstJob, instanceFuture) + case *event.InstanceUpdateInfo: + updateInstance(rtx, info, multInstJob, instanceFuture) } - // 构建InstanceJobInfo - infoFiles := schsdk.JobFilesInfo{ - Dataset: dataSet, - Code: multInstJob.Info.Files.Code, - Image: multInstJob.Info.Files.Image, - } - - newLocalJobID := fmt.Sprintf("%s_%s", multInstJob.Info.LocalJobID, utils.GenerateRandomID()) - - instJobInfo := &schsdk.InstanceJobInfo{ - Type: schsdk.JobTypeInstance, - LocalJobID: newLocalJobID, - Files: infoFiles, - Runtime: multInstJob.Info.Runtime, - Resources: multInstJob.Info.Resources, - ModelJobInfo: multInstJob.Info.ModelJobInfo, - } - - files := jobmod.JobFiles{ - Code: multInstJob.Files.Code, - Image: multInstJob.Files.Image, - } - - // 生成预调度方案和文件上传方案 - jobSchedule, filesUploadScheme, err := s.preScheduler.ScheduleJob(instJobInfo) - if err != nil { - ic.Result.SetError(err) - continue - } - - // 创建实例并运行 - instanceJob := job.NewInstanceJob(*instJobInfo, files) - jobID := rtx.Mgr.AddJob(jo.JobSetID, instanceJob, NewPreSchuduling(*jobSchedule)) - - // 在多实例任务中新增这个实例的任务ID - multInstJob.SubJobs = append(multInstJob.SubJobs, jobID) - - // 将实例ID和文件上传方案返回 - ic.Result.SetValue(event.CreateInstanceResult{ - JobID: jobID, - FilesUploadScheme: *filesUploadScheme, - }) - } } +func updateInstance(rtx jobmgr.JobStateRunContext, updateInfo *event.InstanceUpdateInfo, parentJob *job.MultiInstanceJob, updateInstanceFuture event.OperateInstanceFuture) { + + // 更新策略 + strategy 
:= updateInfo.Info.UpdateStrategy + println("update strategy: " + strategy) + + var failJobs []string + var wg sync.WaitGroup + + for i := 0; i < len(parentJob.SubJobs); i++ { + // 发送请求进行任务更新 + instanceID := parentJob.SubJobs[i] + wg.Add(1) + go func() { + defer wg.Done() + fut := future.NewSetValue[event.UpdateResult]() + rtx.Mgr.PostEvent(instanceID, event.NewUpdate("update", fut)) + _, err := fut.Wait(context.TODO()) + + if err != nil { + logger.Error(err.Error()) + failJobs = append(failJobs, string(instanceID)) + } + println() + }() + } + + wg.Wait() + + if len(failJobs) == 0 { + updateInstanceFuture.SetValue(event.OperateInstanceResult{ + Err: nil, + }) + return + } + + // 返回更新失败的instance + result := strings.Join(failJobs, ",") + updateInstanceFuture.SetValue(event.OperateInstanceResult{ + OperateResult: result, + Err: fmt.Errorf("error"), + }) +} + +func createInstance(rtx jobmgr.JobStateRunContext, info *event.InstanceCreateInfo, preScheduler prescheduler.PreScheduler, jo *jobmgr.Job, multInstJob *job.MultiInstanceJob, future event.OperateInstanceFuture) { + dataSet := info.DataSet + + //如果是模型扩容任务,直接使用父Job的资源文件 + if &multInstJob.Info.ModelJobInfo != nil { + dataSet = multInstJob.Info.Files.Dataset + } + + // 构建InstanceJobInfo + infoFiles := schsdk.JobFilesInfo{ + Dataset: dataSet, + Code: multInstJob.Info.Files.Code, + Image: multInstJob.Info.Files.Image, + } + + newLocalJobID := fmt.Sprintf("%s_%s", multInstJob.Info.LocalJobID, utils.GenerateRandomID()) + + instJobInfo := &schsdk.InstanceJobInfo{ + Type: schsdk.JobTypeInstance, + LocalJobID: newLocalJobID, + Files: infoFiles, + Runtime: multInstJob.Info.Runtime, + Resources: multInstJob.Info.Resources, + ModelJobInfo: multInstJob.Info.ModelJobInfo, + } + + files := jobmod.JobFiles{ + Code: multInstJob.Files.Code, + Image: multInstJob.Files.Image, + } + + // 生成预调度方案和文件上传方案 + jobSchedule, filesUploadScheme, err := preScheduler.ScheduleJob(instJobInfo) + if err != nil { + future.SetError(err) + return + } + + 
// 创建实例并运行 + instanceJob := job.NewInstanceJob(*instJobInfo, files) + jobID := rtx.Mgr.AddJob(jo.JobSetID, instanceJob, NewPreSchuduling(*jobSchedule)) + + // 在多实例任务中新增这个实例的任务ID + multInstJob.SubJobs = append(multInstJob.SubJobs, jobID) + + // 将实例ID和文件上传方案返回 + future.SetValue(event.OperateInstanceResult{ + JobID: jobID, + FilesUploadScheme: *filesUploadScheme, + }) +} + func (s *MultiInstanceRunning) Dump(ctx jobmgr.JobStateRunContext, job *jobmgr.Job) jobmod.JobStateDump { return &jobmod.MultiInstCreateRunningDump{} } diff --git a/manager/internal/jobmgr/job/state/multiInstance_update.go b/manager/internal/jobmgr/job/state/multiInstance_update.go new file mode 100644 index 0000000..b44031b --- /dev/null +++ b/manager/internal/jobmgr/job/state/multiInstance_update.go @@ -0,0 +1,134 @@ +package state + +import ( + "context" + "fmt" + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/logger" + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + schglb "gitlink.org.cn/cloudream/scheduler/common/globals" + jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" + exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task" + "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr" + "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event" + "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job" +) + +type MultiInstanceUpdate struct { + originalJob jobmod.JobDump +} + +func NewMultiInstanceUpdate(originalJob jobmod.JobDump) *MultiInstanceUpdate { + return &MultiInstanceUpdate{ + originalJob: originalJob, + } +} + +func (s *MultiInstanceUpdate) Run(rtx jobmgr.JobStateRunContext, job *jobmgr.Job) { + s.do(rtx, job) +} + +func (s *MultiInstanceUpdate) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error { + updateJob := jo.Body.(*job.UpdateMultiInstanceJob) + + ctx, cancel := context.WithCancel(context.Background()) + defer 
cancel() + + // 监听取消事件 + go func() { + event.WaitType[*event.Cancel](ctx, rtx.EventSet) + cancel() + }() + + var pkgID cdssdk.PackageID + // 等待回源任务完成 + if rt, ok := updateJob.Info.Files.Code.(*schsdk.DataReturnJobFileInfo); ok { + evt, ok := event.WaitTypeAnd[*event.JobCompleted](ctx, rtx.EventSet, func(val *event.JobCompleted) bool { + return val.Job.GetInfo().GetLocalJobID() == rt.DataReturnLocalJobID + }) + if !ok { + return jobmgr.ErrJobCancelled + } + if evt.Err != nil { + return fmt.Errorf("depended job %s was failed", evt.Job.JobID) + } + rtJob, ok := evt.Job.Body.(*job.DataReturnJob) + if !ok { + return fmt.Errorf("job %s is not a DataReturn job(which is %T)", evt.Job.JobID, evt.Job) + } + pkgID = rtJob.DataReturnPackageID + } + + // 获取包对象列表 + stgCli, err := schglb.CloudreamStoragePool.Acquire() + if err != nil { + return fmt.Errorf("new cloudream storage client: %w", err) + } + defer schglb.CloudreamStoragePool.Release(stgCli) + // TODO UserID + pkgObjs, err := stgCli.Object().GetPackageObjects(cdssdk.ObjectGetPackageObjects{UserID: 1, PackageID: pkgID}) + if err != nil { + return fmt.Errorf("getting package objects: %w", err) + } + + // 获取原始任务信息 + originalMultiInstanceJobBody := s.originalJob.Body.(*jobmod.MultiInstanceJobDump) + originalPackageID := originalMultiInstanceJobBody.Files.Code.PackageID + var objArr []cdssdk.MovingObject + for _, obj := range pkgObjs.Objects { + objArr = append(objArr, cdssdk.MovingObject{ + ObjectID: obj.ObjectID, + PackageID: originalPackageID, + Path: obj.Path, + }) + } + // TODO UserID + objMoveParam := cdssdk.ObjectMove{ + UserID: 1, + Movings: objArr, + } + + ccInfo, err := rtx.Mgr.DB.ComputingCenter().GetByID(rtx.Mgr.DB.SQLCtx(), originalMultiInstanceJobBody.TargetCCID) + if err != nil { + return fmt.Errorf("getting computing center info: %w", err) + } + + // 将增量包合并到原有包中 + taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewStorageMoveObject(objMoveParam), ccInfo) + if err != nil { + return fmt.Errorf("moving 
package: %w", err) + } + + statusFut := taskStatus.Receive() + status := <-statusFut.Chan() + moveStatus := status.Value.Status.(*exectsk.StorageMoveObjectStatus) + if moveStatus.Error != "" { + return fmt.Errorf("moving package: %s", moveStatus.Error) + } + + // 发送事件,更新各个instance + updateInfo := event.InstanceUpdateInfo{ + Info: updateJob.Info, + } + fut := future.NewSetValue[event.OperateInstanceResult]() + rtx.Mgr.PostEvent(s.originalJob.JobID, event.NewInstanceOperate(&updateInfo, fut)) + + result, err := fut.Wait(context.TODO()) + + if err != nil { + return err + } + println(result.JobID) + + if result.Err != nil { + return fmt.Errorf("update instance failed: %s", result.OperateResult) + } + + logger.Info("update instance success!") + return nil +} + +func (s *MultiInstanceUpdate) Dump(ctx jobmgr.JobStateRunContext, job *jobmgr.Job) jobmod.JobStateDump { + return &jobmod.MultiInstanceUpdateDump{} +} diff --git a/manager/internal/jobmgr/job/state/prescheduling.go b/manager/internal/jobmgr/job/state/prescheduling.go index 8d6fe7d..3a8dd12 100644 --- a/manager/internal/jobmgr/job/state/prescheduling.go +++ b/manager/internal/jobmgr/job/state/prescheduling.go @@ -150,15 +150,15 @@ func (s *PreScheduling) doPackageScheduling(ctx context.Context, rtx jobmgr.JobS if scheme.Action == jobmod.ActionMove { logger.Debugf("begin move pacakge %v to %v", file.PackageID, s.targetCCInfo.CDSNodeID) - wt := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, file.PackageID, s.targetCCInfo.CDSNodeID), s.targetCCInfo) - defer wt.Close() - - status, err := wt.Receive(ctx) + taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, file.PackageID, s.targetCCInfo.CDSNodeID), s.targetCCInfo) if err != nil { return fmt.Errorf("moving package: %w", err) } - moveStatus := status.(*exectsk.CacheMovePackageStatus) + fut := taskStatus.Receive() + status := <-fut.Chan() + + moveStatus := status.Value.Status.(*exectsk.CacheMovePackageStatus) if moveStatus.Error != "" { 
return fmt.Errorf("moving package: %s", moveStatus.Error) } @@ -169,15 +169,15 @@ func (s *PreScheduling) doPackageScheduling(ctx context.Context, rtx jobmgr.JobS if scheme.Action == jobmod.ActionLoad { logger.Debugf("begin load pacakge %v to %v", file.PackageID, s.targetCCInfo.CDSStorageID) - wt := rtx.Mgr.ExecMgr.StartTask(exectsk.NewStorageLoadPackage(1, file.PackageID, s.targetCCInfo.CDSStorageID), s.targetCCInfo) - defer wt.Close() - - status, err := wt.Receive(ctx) + taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewStorageLoadPackage(1, file.PackageID, s.targetCCInfo.CDSStorageID), s.targetCCInfo) if err != nil { return fmt.Errorf("moving package: %w", err) } - loadStatus := status.(*exectsk.StorageLoadPackageStatus) + fut := taskStatus.Receive() + status := <-fut.Chan() + + loadStatus := status.Value.Status.(*exectsk.StorageLoadPackageStatus) if loadStatus.Error != "" { return fmt.Errorf("moving package: %s", loadStatus.Error) } @@ -228,15 +228,15 @@ func (s *PreScheduling) doImageScheduling(ctx context.Context, rtx jobmgr.JobSta } // TODO UserID - wt := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, *file.PackageID, s.targetCCInfo.CDSNodeID), s.targetCCInfo) - defer wt.Close() - - status, err := wt.Receive(ctx) + taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, *file.PackageID, s.targetCCInfo.CDSNodeID), s.targetCCInfo) if err != nil { return fmt.Errorf("moving package: %w", err) } - moveStatus := status.(*exectsk.CacheMovePackageStatus) + fut := taskStatus.Receive() + status := <-fut.Chan() + + moveStatus := status.Value.Status.(*exectsk.CacheMovePackageStatus) if moveStatus.Error != "" { return fmt.Errorf("moving package: %s", moveStatus.Error) } @@ -261,15 +261,15 @@ func (s *PreScheduling) doImageScheduling(ctx context.Context, rtx jobmgr.JobSta return fmt.Errorf("there must be only 1 object in the package which will be imported") } - wt2 := 
rtx.Mgr.ExecMgr.StartTask(exectsk.NewUploadImage(s.targetCCInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash)), s.targetCCInfo) - defer wt2.Close() - - status2, err := wt2.Receive(ctx) + taskStatus2, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewUploadImage(s.targetCCInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash)), s.targetCCInfo) if err != nil { - return fmt.Errorf("uploading image: %w", err) + return fmt.Errorf("moving package: %w", err) } - uploadStatus := status2.(*exectsk.UploadImageStatus) + fut2 := taskStatus2.Receive() + status2 := <-fut2.Chan() + + uploadStatus := status2.Value.Status.(*exectsk.UploadImageStatus) if uploadStatus.Error != "" { return fmt.Errorf("uploading image: %s", uploadStatus.Error) } diff --git a/manager/internal/jobmgr/jobmgr.go b/manager/internal/jobmgr/jobmgr.go index 355833f..b6ee2e0 100644 --- a/manager/internal/jobmgr/jobmgr.go +++ b/manager/internal/jobmgr/jobmgr.go @@ -232,7 +232,7 @@ func (m *Manager) AddJob(jobSetID schsdk.JobSetID, jobBody JobBody, jobState Job m.pubLock.Lock() defer m.pubLock.Unlock() - jobID := schsdk.JobID(fmt.Sprintf("%d", m.jobIDIndex+1)) + jobID := schsdk.JobID(fmt.Sprintf("%d", m.jobIDIndex)) m.jobIDIndex += 1 job := &mgrJob{ diff --git a/manager/internal/mq/executor.go b/manager/internal/mq/executor.go index f5235c1..bf2f6e3 100644 --- a/manager/internal/mq/executor.go +++ b/manager/internal/mq/executor.go @@ -1,11 +1,6 @@ package mq -import ( - "gitlink.org.cn/cloudream/common/pkgs/mq" - mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager" -) - -func (svc *Service) ReportExecutorTaskStatus(msg *mgrmq.ReportExecutorTaskStatus) (*mgrmq.ReportExecutorTaskStatusResp, *mq.CodeMessage) { - //svc.exeMgr.Report(msg.ExecutorID, msg.TaskStatus) - return mq.ReplyOK(mgrmq.NewReportExecutorTaskStatusResp()) -} +//func (svc *Service) ReportExecutorTaskStatus(msg *mgrmq.ReportExecutorTaskStatus) (*mgrmq.ReportExecutorTaskStatusResp, *mq.CodeMessage) 
{ +// svc.exeMgr.Report(msg.ExecutorID, msg.TaskStatus) +// return mq.ReplyOK(mgrmq.NewReportExecutorTaskStatusResp()) +//} diff --git a/manager/internal/mq/job.go b/manager/internal/mq/job.go index 428e0e8..4213566 100644 --- a/manager/internal/mq/job.go +++ b/manager/internal/mq/job.go @@ -28,7 +28,7 @@ func (svc *Service) SubmitJobSet(msg *mgrmq.SubmitJobSet) (*mgrmq.SubmitJobSetRe for _, jobInfo := range msg.JobSet.Jobs { switch info := jobInfo.(type) { case *schsdk.NormalJobInfo: - job := job.NewNormalJob(*info) + jo := job.NewNormalJob(*info) preSch, ok := msg.PreScheduleScheme.JobSchemes[info.LocalJobID] if !ok { @@ -36,31 +36,52 @@ func (svc *Service) SubmitJobSet(msg *mgrmq.SubmitJobSet) (*mgrmq.SubmitJobSetRe } jobs = append(jobs, jobmgr.SubmittingJob{ - Body: job, + Body: jo, InitState: state.NewPreSchuduling(preSch), }) case *schsdk.DataReturnJobInfo: - job := job.NewDataReturnJob(*info) + jo := job.NewDataReturnJob(*info) jobs = append(jobs, jobmgr.SubmittingJob{ - Body: job, + Body: jo, InitState: state.NewWaitTargetComplete(), }) case *schsdk.MultiInstanceJobInfo: preSch, ok := msg.PreScheduleScheme.JobSchemes[info.LocalJobID] - job := job.NewMultiInstanceJob(*info, preSch) + jo := job.NewMultiInstanceJob(*info, preSch) if !ok { return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("pre schedule scheme for job %s is not found", info.LocalJobID)) } jobs = append(jobs, jobmgr.SubmittingJob{ - Body: job, + Body: jo, InitState: state.NewMultiInstanceInit(), }) + case *schsdk.UpdateMultiInstanceJobInfo: + modelJob := job.NewUpdateMultiInstanceJob(*info) + instanceJobSets := svc.jobMgr.DumpJobSet(modelJob.Info.MultiInstanceJobSetID) + if len(instanceJobSets) == 0 { + return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("job set %s is not found", modelJob.Info.MultiInstanceJobSetID)) + } + + // 找到多实例任务本身 + var multiInstanceJobDump jobmod.JobDump + for i := 0; i < len(instanceJobSets); i++ { + jobDump := instanceJobSets[i] + if _, ok := 
jobDump.Body.(*jobmod.MultiInstanceJobDump); ok { + multiInstanceJobDump = jobDump + break + } + } + + jobs = append(jobs, jobmgr.SubmittingJob{ + Body: modelJob, + InitState: state.NewMultiInstanceUpdate(multiInstanceJobDump), + }) } } @@ -70,10 +91,13 @@ func (svc *Service) SubmitJobSet(msg *mgrmq.SubmitJobSet) (*mgrmq.SubmitJobSetRe func (svc *Service) CreateInstance(instInfo *mgrmq.CreateInstance) (*mgrmq.CreateInstanceResp, *mq.CodeMessage) { logger.Debugf("start create instance") - fut := future.NewSetValue[event.CreateInstanceResult]() - svc.jobMgr.PostEvent(instInfo.JobID, event.NewInstanceCreate(instInfo.DataSet, fut)) + fut := future.NewSetValue[event.OperateInstanceResult]() + info := event.InstanceCreateInfo{ + DataSet: instInfo.DataSet, + } + svc.jobMgr.PostEvent(instInfo.JobID, event.NewInstanceOperate(&info, fut)) - result, err := fut.WaitValue(context.TODO()) + result, err := fut.Wait(context.TODO()) if err != nil { return nil, mq.Failed(errorcode.OperationFailed, err.Error()) diff --git a/manager/internal/task/task.go b/manager/internal/task/task.go new file mode 100644 index 0000000..a002685 --- /dev/null +++ b/manager/internal/task/task.go @@ -0,0 +1,51 @@ +package jobTask + +import ( + "fmt" + "gitlink.org.cn/cloudream/common/pkgs/async" + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "math/rand" + "time" +) + +type JobTask[T any] struct { + id string + taskChan async.UnboundChannel[T] +} + +func NewJobTask[T any]() *JobTask[T] { + return &JobTask[T]{ + id: getTaskID(), + taskChan: *async.NewUnboundChannel[T](), + } +} + +func getTaskID() string { + now := time.Now() + nano := now.UnixNano() + rand.Seed(time.Now().UnixNano()) + randomNumber := rand.Intn(9000) + 1000 // 生成1000到9999之间的随机数 + + taskID := fmt.Sprintf("id_%d_%d", nano, randomNumber) + + return taskID +} + +func (c *JobTask[T]) Receive() future.Future1[T] { + + return c.taskChan.Receive() +} + +func (c *JobTask[T]) Send(info any) { + + 
logger.Info("send http") +} + +func (c *JobTask[T]) Chan() *async.UnboundChannel[T] { + return &c.taskChan +} + +func (c *JobTask[T]) ID() string { + return c.id +} diff --git a/manager/main.go b/manager/main.go index c5ca7d0..99c28d1 100644 --- a/manager/main.go +++ b/manager/main.go @@ -76,7 +76,8 @@ func main() { // 启动服务 go serveJobManager(jobMgr) - go serveExecutorManager(exeMgr) + //go serveExecutorManager(exeMgr) + go exeMgr.Serve() go serveAdvisorManager(advMgr) @@ -108,17 +109,6 @@ func serveMQServer(server *mgrmq.Server) { logger.Info("mq server stopped") } -func serveExecutorManager(mgr *executormgr.Manager) { - logger.Info("start serving executor manager") - - err := mgr.Serve() - if err != nil { - logger.Errorf("executor manager stopped with error: %s", err.Error()) - } - - logger.Info("executor manager stopped") -} - func serveAdvisorManager(mgr *advisormgr.Manager) { logger.Info("start serving advisor manager")