修复pcm联调问题

This commit is contained in:
Sydonian 2023-11-03 11:18:01 +08:00
parent 3cb7095095
commit 66239ff8a8
11 changed files with 119 additions and 106 deletions

View File

@ -2,7 +2,6 @@ package config
import (
log "gitlink.org.cn/cloudream/common/pkgs/logger"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
c "gitlink.org.cn/cloudream/common/utils/config"
@ -10,12 +9,11 @@ import (
)
type Config struct {
Logger log.Config `json:"logger"`
RabbitMQ mymq.Config `json:"rabbitMQ"`
CloudreamStorage stgsdk.Config `json:"cloudreamStorage"`
UnifyOps uopsdk.Config `json:"unifyOps"`
SlwNodes []SlwNodeConfig `json:"slwNodes"`
// PCM cldstg.Config `json:"pcm"`
Logger log.Config `json:"logger"`
RabbitMQ mymq.Config `json:"rabbitMQ"`
CloudreamStorage stgsdk.Config `json:"cloudreamStorage"`
UnifyOps uopsdk.Config `json:"unifyOps"`
SlwNodes []uopsdk.SlwNode `json:"slwNodes"`
}
var cfg Config
@ -27,9 +25,3 @@ func Init() error {
func Cfg() *Config {
return &cfg
}
type SlwNodeConfig struct {
SlwNodeID schsdk.SlwNodeID `json:"slwNodeID"`
StgNodeID int64 `json:"stgNodeID"`
StorageID int64 `json:"StorageID"`
}

View File

@ -19,30 +19,23 @@ func (svc *Service) GetSlwNodeInfo(msg *colmq.GetSlwNodeInfo) (*colmq.GetSlwNode
}
defer schglb.UnifyOpsPool.Release(uniOpsCli)
resp, err := uniOpsCli.GetAllSlwNodeInfo()
if err != nil {
logger.Warnf("get slwNode info failed, err: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "get slwNode info failed")
}
// resp, err := uniOpsCli.GetAllSlwNodeInfo()
// if err != nil {
// logger.Warnf("get slwNode info failed, err: %s", err.Error())
// return nil, mq.Failed(errorcode.OperationFailed, "get slwNode info failed")
// }
node, ok := lo.Find(resp.Nodes, func(item uopsdk.SlwNode) bool { return item.ID == msg.SlwNodeID })
var resp []uopsdk.SlwNode
// TODO 目前计算中心对应的存储系统NodeID和StorageID是写在此服务的配置文件中的所以这里进行两个数据源的合并
resp = svc.mergeConfig(resp)
node, ok := lo.Find(resp, func(item uopsdk.SlwNode) bool { return item.ID == msg.SlwNodeID })
if !ok {
logger.WithField("SlwNodeID", msg.SlwNodeID).
Warnf("slw node not found")
return nil, mq.Failed(errorcode.OperationFailed, "slw node not found")
}
// TODO 目前计算中心对应的存储系统NodeID和StorageID是写在此服务的配置文件中的所以这里进行两个数据源的合并
nodeConfig, ok := lo.Find(config.Cfg().SlwNodes, func(item config.SlwNodeConfig) bool { return item.SlwNodeID == msg.SlwNodeID })
if !ok {
logger.WithField("SlwNodeID", msg.SlwNodeID).
Warnf("config not found for this slw node")
return nil, mq.Failed(errorcode.OperationFailed, "config not found for this slw node")
}
node.StgNodeID = nodeConfig.StgNodeID
node.StorageID = nodeConfig.StorageID
return mq.ReplyOK(colmq.NewGetSlwNodeInfoResp(node))
}
@ -54,23 +47,30 @@ func (svc *Service) GetAllSlwNodeInfo(msg *colmq.GetAllSlwNodeInfo) (*colmq.GetA
}
defer schglb.UnifyOpsPool.Release(uniOpsCli)
resp, err := uniOpsCli.GetAllSlwNodeInfo()
if err != nil {
logger.Warnf("get slwNode info failed, err: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "get slwNode info failed")
}
//resp, err := uniOpsCli.GetAllSlwNodeInfo()
//if err != nil {
// logger.Warnf("get slwNode info failed, err: %s", err.Error())
// return nil, mq.Failed(errorcode.OperationFailed, "get slwNode info failed")
//}
var resp []uopsdk.SlwNode
// TODO 目前计算中心对应的存储系统NodeID和StorageID是写在此服务的配置文件中的所以这里进行两个数据源的合并
for i, node := range resp.Nodes {
nodeConfig, ok := lo.Find(config.Cfg().SlwNodes, func(item config.SlwNodeConfig) bool { return item.SlwNodeID == node.ID })
resp = svc.mergeConfig(resp)
return mq.ReplyOK(colmq.NewGetAllSlwNodeInfoResp(resp))
}
// mergeConfig merges the locally-configured storage info (StgNodeID and
// StorageID from this service's config file) into the slw-node list and
// returns the merged slice. Config entries with no matching node are
// appended as-is.
//
// TODO: the storage-system NodeID and StorageID of each computing center are
// currently kept in this service's config file, so the two data sources are
// merged here.
func (svc *Service) mergeConfig(infos []uopsdk.SlwNode) []uopsdk.SlwNode {
	for _, configNode := range config.Cfg().SlwNodes {
		// Locate the matching node by index. NOTE: lo.Find returns a *copy*
		// of the slice element, so mutating its result would not update the
		// slice — mutate infos[idx] directly instead.
		idx := -1
		for i := range infos {
			if infos[i].ID == configNode.ID {
				idx = i
				break
			}
		}
		if idx < 0 {
			infos = append(infos, configNode)
			continue
		}
		infos[idx].StgNodeID = configNode.StgNodeID
		infos[idx].StorageID = configNode.StorageID
	}
	return infos
}

View File

@ -19,7 +19,8 @@
},
"slwNodes": [
{
"slwNodeID": 1,
"id": 1711652475901054976,
"name": "hwj",
"stgNodeID": 1,
"storageID": 1
}

View File

@ -1,36 +0,0 @@
package task
import (
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
)
var _ = Register[*ScheduleTask, *ScheduleTaskStatus]()
// ScheduleTask describes a task to be executed on a slw (computing-center)
// node: the target node, the image to run, the command line and the
// environment variables.
type ScheduleTask struct {
	TaskInfoBase
	// SlwNodeID identifies the target slw node.
	SlwNodeID schsdk.SlwNodeID `json:"slwNodeID"`
	// Envs are the environment variables passed to the task.
	Envs []schsdk.KVPair `json:"envs"`
	// SlwNodeImageID identifies the image the task runs in.
	SlwNodeImageID schsdk.SlwNodeImageID `json:"slwNodeImageID"`
	// CMDLine is the command line executed inside the image.
	CMDLine string `json:"cmdLine"`
}
// ScheduleTaskStatus reports the execution status of a ScheduleTask.
type ScheduleTaskStatus struct {
	TaskStatusBase
	// Status is the task's current state as a bare string (callers pass
	// e.g. "failed"); the exact value set comes from the executor.
	Status string `json:"status"`
	// Error carries the failure reason when the task did not succeed;
	// empty on success.
	Error string `json:"error"`
}
// NewScheduleTask constructs a ScheduleTask for the given node, environment
// variables, image and command line.
func NewScheduleTask(slwNodeID schsdk.SlwNodeID, envs []schsdk.KVPair, slwNodeImageID schsdk.SlwNodeImageID, cmdLine string) *ScheduleTask {
	task := ScheduleTask{
		SlwNodeID:      slwNodeID,
		Envs:           envs,
		SlwNodeImageID: slwNodeImageID,
		CMDLine:        cmdLine,
	}
	return &task
}
// NewScheduleTaskStatus constructs a status report with the given state and
// error message (empty err means no error).
func NewScheduleTaskStatus(status string, err string) *ScheduleTaskStatus {
	s := new(ScheduleTaskStatus)
	s.Status = status
	s.Error = err
	return s
}

View File

@ -0,0 +1,39 @@
package task
import (
pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
)
var _ = Register[*SubmitTask, *SubmitTaskStatus]()
// SubmitTask describes a task submitted to a slw (computing-center) node
// through the PCM SDK: target node, image, resource, command and environment.
type SubmitTask struct {
	TaskInfoBase
	// SlwNodeID identifies the target slw node.
	SlwNodeID schsdk.SlwNodeID `json:"slwNodeID"`
	// SlwNodeImageID identifies the image the task runs in.
	SlwNodeImageID schsdk.SlwNodeImageID `json:"slwNodeImageID"`
	// SlwNodeResourceID selects the PCM resource the task runs on.
	SlwNodeResourceID pcmsdk.ResourceID `json:"slwNodeResourceID"`
	// CMD is the command executed inside the image.
	CMD string `json:"cmd"`
	// Envs are the environment variables passed to the task.
	Envs []schsdk.KVPair `json:"envs"`
}
// SubmitTaskStatus reports the execution status of a SubmitTask.
type SubmitTaskStatus struct {
	TaskStatusBase
	// Status is the PCM-side task state.
	Status pcmsdk.TaskStatus `json:"status"`
	// Error carries the failure reason when the task did not succeed;
	// empty on success.
	Error string `json:"error"`
}
// NewSubmitTask constructs a SubmitTask for the given node, image, resource,
// command and environment variables.
func NewSubmitTask(slwNodeID schsdk.SlwNodeID, slwNodeImageID schsdk.SlwNodeImageID, slwNodeResourceID pcmsdk.ResourceID, cmd string, envs []schsdk.KVPair) *SubmitTask {
	var t SubmitTask
	t.SlwNodeID = slwNodeID
	t.SlwNodeImageID = slwNodeImageID
	t.SlwNodeResourceID = slwNodeResourceID
	t.CMD = cmd
	t.Envs = envs
	return &t
}
// NewSubmitTaskStatus constructs a status report with the given PCM state and
// error message (empty err means no error).
func NewSubmitTaskStatus(status pcmsdk.TaskStatus, err string) *SubmitTaskStatus {
	st := SubmitTaskStatus{
		Status: status,
		Error:  err,
	}
	return &st
}

View File

@ -23,7 +23,7 @@ func NewCacheMovePackage(info *exectsk.CacheMovePackage) *CacheMovePackage {
func (t *CacheMovePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
log := logger.WithType[CacheMovePackage]("Task")
log.Debugf("begin with %v", logger.FormatStruct(t))
log.Debugf("begin with %v", logger.FormatStruct(t.CacheMovePackage))
defer log.Debugf("end")
cacheInfos, err := t.do(ctx)

View File

@ -5,6 +5,7 @@ import (
"time"
pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
@ -12,25 +13,25 @@ import (
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)
type PCMScheduleTask struct {
*exectsk.ScheduleTask
type PCMSubmitTask struct {
*exectsk.SubmitTask
}
func NewPCMScheduleTask(info *exectsk.ScheduleTask) *PCMScheduleTask {
return &PCMScheduleTask{
ScheduleTask: info,
func NewPCMSubmitTask(info *exectsk.SubmitTask) *PCMSubmitTask {
return &PCMSubmitTask{
SubmitTask: info,
}
}
func (t *PCMScheduleTask) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
log := logger.WithType[PCMScheduleTask]("Task")
log.Debugf("begin")
func (t *PCMSubmitTask) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
log := logger.WithType[PCMSubmitTask]("Task")
log.Debugf("begin with %v", logger.FormatStruct(t.SubmitTask))
defer log.Debugf("end")
err := t.do(task.ID(), ctx)
if err != nil {
//TODO 若任务失败上报的状态failed字段根据情况修改
ctx.reporter.Report(task.ID(), exectsk.NewScheduleTaskStatus("failed", err.Error()))
ctx.reporter.Report(task.ID(), exectsk.NewSubmitTaskStatus("failed", err.Error()))
}
ctx.reporter.ReportNow()
@ -39,7 +40,7 @@ func (t *PCMScheduleTask) Execute(task *task.Task[TaskContext], ctx TaskContext,
})
}
func (t *PCMScheduleTask) do(taskID string, ctx TaskContext) error {
func (t *PCMSubmitTask) do(taskID string, ctx TaskContext) error {
pcmCli, err := schglb.PCMPool.Acquire()
if err != nil {
return fmt.Errorf("new pcm client: %w", err)
@ -47,17 +48,19 @@ func (t *PCMScheduleTask) do(taskID string, ctx TaskContext) error {
defer schglb.PCMPool.Release(pcmCli)
resp, err := pcmCli.SubmitTask(pcmsdk.SubmitTaskReq{
PartID: t.SlwNodeID,
Envs: t.Envs,
ImageID: t.SlwNodeImageID,
CMD: t.CMDLine,
PartID: t.SlwNodeID,
ImageID: t.SlwNodeImageID,
ResourceID: t.SlwNodeResourceID,
CMD: t.CMD,
Envs: t.Envs,
Params: []schsdk.KVPair{},
})
if err != nil {
return err
}
var prevStatus string
var prevStatus pcmsdk.TaskStatus
for {
tsResp, err := pcmCli.GetTask(pcmsdk.GetTaskReq{
PartID: t.SlwNodeID,
@ -68,7 +71,7 @@ func (t *PCMScheduleTask) do(taskID string, ctx TaskContext) error {
}
if tsResp.TaskStatus != prevStatus {
ctx.reporter.Report(taskID, exectsk.NewScheduleTaskStatus(tsResp.TaskStatus, ""))
ctx.reporter.Report(taskID, exectsk.NewSubmitTaskStatus(tsResp.TaskStatus, ""))
}
prevStatus = tsResp.TaskStatus
@ -77,9 +80,14 @@ func (t *PCMScheduleTask) do(taskID string, ctx TaskContext) error {
if tsResp.TaskStatus == pcmsdk.TaskStatusSuccess {
return nil
}
if tsResp.TaskStatus == pcmsdk.TaskStatuFailed {
// TODO 返回更详细的信息
return fmt.Errorf("task failed")
}
}
}
func init() {
Register(NewPCMScheduleTask)
Register(NewPCMSubmitTask)
}

View File

@ -23,7 +23,7 @@ func NewStorageLoadPackage(info *exectsk.StorageLoadPackage) *StorageLoadPackage
func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
log := logger.WithType[StorageLoadPackage]("Task")
log.Debugf("begin with %w", logger.FormatStruct(t))
log.Debugf("begin with %w", logger.FormatStruct(t.StorageLoadPackage))
defer log.Debugf("end")
fullPath, err := t.do(ctx)

View File

@ -6,6 +6,7 @@ import (
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
)
// 不是所关注的Task上报的进度
var ErrUnconcernedTask = errors.New("unconcerned task")
var ErrTaskTimeout = errors.New("task timeout")

View File

@ -5,6 +5,8 @@ import (
"reflect"
"gitlink.org.cn/cloudream/common/pkgs/actor"
"gitlink.org.cn/cloudream/common/pkgs/logger"
pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
@ -78,11 +80,13 @@ func (h *ExecutingHandler) onNormalJobEvent(evt event.Event, job *executingJob,
// TODO 需要添加DATA_IN、DATA_OUT等环境变量这些数据从Job的信息中来获取
fullTaskID, err := h.mgr.execMgr.StartTask(job.job.GetJobID(),
exetsk.NewScheduleTask(
exetsk.NewSubmitTask(
norJob.TargetSlwNodeID,
norJob.Info.Runtime.Envs,
info.SlwNodeImageID,
// TODO 资源ID
"6388d3c27f654fa5b11439a3d6098dbc",
norJob.Info.Runtime.Command,
norJob.Info.Runtime.Envs,
))
if err != nil {
h.changeJobState(job.job, jobmod.NewStateFailed(err.Error(), job.state))
@ -92,20 +96,24 @@ func (h *ExecutingHandler) onNormalJobEvent(evt event.Event, job *executingJob,
job.state.FullTaskID = fullTaskID
}
if execRet, err := event.AssertExecutorTaskStatus[*exetsk.ScheduleTaskStatus](evt, job.state.FullTaskID); err != event.ErrUnconcernedTask {
if execRet, err := event.AssertExecutorTaskStatus[*exetsk.SubmitTaskStatus](evt, job.state.FullTaskID); err != event.ErrUnconcernedTask {
if err == event.ErrTaskTimeout {
h.changeJobState(job.job, jobmod.NewStateFailed("schedule task timeout", job.state))
return
}
h.mgr.execMgr.ForgetTask(job.state.FullTaskID)
logger.WithField("JobID", job.job.GetJobID()).
WithField("State", reflect.TypeOf(job.state).String()).
Infof("pcm task state change to: %s", execRet.Status)
if execRet.Error != "" {
if execRet.Status == pcmsdk.TaskStatusSuccess {
h.mgr.execMgr.ForgetTask(job.state.FullTaskID)
h.changeJobState(job.job, jobmod.NewStateSuccess())
} else if execRet.Status == pcmsdk.TaskStatuFailed {
h.mgr.execMgr.ForgetTask(job.state.FullTaskID)
h.changeJobState(job.job, jobmod.NewStateFailed(execRet.Error, job.state))
return
}
h.changeJobState(job.job, jobmod.NewStateSuccess())
}
}

View File

@ -260,7 +260,7 @@ func (h *PreSchedulingHandler) doImageScheduling(evt event.Event, job *preSchedu
state.Step = jobmod.StepUploading
case *schsdk.ImageJobFileInfo:
imageInfo, err := h.mgr.imageMgr.GetImageInfo(file.ImageID)
imageInfo, err := h.mgr.imageMgr.GetImageInfo(info.ImageID)
if err != nil {
return fmt.Errorf("getting image info: %w", err)
}