优化数据集路径、输出路径的产生逻辑

This commit is contained in:
Sydonian 2023-10-08 16:46:19 +08:00
parent 50b0bbef62
commit 85e7a58512
11 changed files with 69 additions and 37 deletions

View File

@ -11,5 +11,8 @@
"password": "123456",
"vhost": "/"
},
"cloudreamStorage": {
"url": "http://localhost:7890"
},
"reportTimeoutSecs": 20
}

View File

@ -10,6 +10,7 @@ type NormalJob struct {
Info schsdk.NormalJobInfo `json:"info"` // 提交任务时提供的任务描述信息
Files JobFiles `json:"files"` // 任务需要的文件
TargetSlwNodeID uopsdk.SlwNodeID `json:"targetSlwNodeID"` // 将要运行此任务的算力中心ID
OutputFullPath string `json:"outputFullPath"` // 程序结果的完整输出路径
}
func NewNormalJob(jobSetID schsdk.JobSetID, jobID schsdk.JobID, info schsdk.NormalJobInfo) *NormalJob {
@ -35,7 +36,8 @@ type JobFiles struct {
}
type PackageJobFile struct {
PackageID int64 `json:"packageID"`
PackageID int64 `json:"packageID"`
FullPath string `json:"fullPath"` // Load之后的完整文件路径
}
type ImageJobFile struct {

View File

@ -10,8 +10,8 @@ type StorageLoadPackage struct {
}
type StorageLoadPackageStatus struct {
TaskStatusBase
Status string `json:"status"`
Error string `json:"error"`
Error string `json:"error"`
FullPath string `json:"fullPath"`
}
func NewStorageLoadPackage(userID int64, packageID int64, storageID int64) *StorageLoadPackage {
@ -21,9 +21,9 @@ func NewStorageLoadPackage(userID int64, packageID int64, storageID int64) *Stor
StorageID: storageID,
}
}
func NewStorageLoadPackageStatus(status string, err string) *StorageLoadPackageStatus {
func NewStorageLoadPackageStatus(err string, fullPath string) *StorageLoadPackageStatus {
return &StorageLoadPackageStatus{
Status: status,
Error: err,
Error: err,
FullPath: fullPath,
}
}

View File

@ -1,22 +0,0 @@
package utils
import (
"fmt"
"path/filepath"
"time"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
)
// MakeJobDataInPath returns the input-data path for a job's data package by
// joining the job ID and the data package ID with filepath.Join.
func MakeJobDataInPath(jobID schsdk.JobID, dataPackageID string) string {
	// TODO: this needs to be the same as the path the storage service uses after Load.
	jobDir := string(jobID)
	return filepath.Join(jobDir, dataPackageID)
}
// MakeJobDataOutPath returns the output path for a job: the job ID joined
// with the fixed "output" element via filepath.Join.
func MakeJobDataOutPath(jobID schsdk.JobID) string {
	jobDir := string(jobID)
	return filepath.Join(jobDir, "output")
}
// MakeResourcePackageName builds a package name of the form "<jobID>@<timestamp>",
// where the timestamp is the current local time formatted as
// "2006-01-02 15:04:05". Successive calls therefore yield different names
// whenever the clock has advanced by at least one second.
func MakeResourcePackageName(jobID schsdk.JobID) string {
	const timeLayout = "2006-01-02 15:04:05"

	stamp := time.Now().Format(timeLayout)
	return fmt.Sprintf("%s@%s", string(jobID), stamp)
}

18
common/utils/utils.go Normal file
View File

@ -0,0 +1,18 @@
package utils
import (
"fmt"
"path/filepath"
"strconv"
"time"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
)
// MakeJobOutputFullPath returns the full output path of a job's results:
//
//	<stgDir>/<userID>/jobs/<jobID>/output
//
// stgDir is the base directory to place the path under, userID is rendered in
// base 10, and the elements are combined (and cleaned) by filepath.Join.
func MakeJobOutputFullPath(stgDir string, userID int64, jobID schsdk.JobID) string {
	userDir := strconv.FormatInt(userID, 10)
	jobDir := string(jobID)
	return filepath.Join(stgDir, userDir, "jobs", jobDir, "output")
}
// MakeResourcePackageName builds a package name of the form "<jobID>@<timestamp>",
// where the timestamp is the current local time formatted as
// "2006-01-02 15:04:05". Successive calls therefore yield different names
// whenever the clock has advanced by at least one second.
func MakeResourcePackageName(jobID schsdk.JobID) string {
	const timeLayout = "2006-01-02 15:04:05"

	stamp := time.Now().Format(timeLayout)
	return fmt.Sprintf("%s@%s", string(jobID), stamp)
}

View File

@ -26,12 +26,12 @@ func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskConte
log.Debugf("begin with %w", logger.FormatStruct(t))
defer log.Debugf("end")
err := t.do(ctx)
fullPath, err := t.do(ctx)
if err != nil {
//TODO 若任务失败上报的状态failed字段根据情况修改
ctx.reporter.Report(task.ID(), exectsk.NewStorageLoadPackageStatus("failed", err.Error()))
ctx.reporter.Report(task.ID(), exectsk.NewStorageLoadPackageStatus(err.Error(), ""))
} else {
ctx.reporter.Report(task.ID(), exectsk.NewStorageLoadPackageStatus("completed", ""))
ctx.reporter.Report(task.ID(), exectsk.NewStorageLoadPackageStatus("", fullPath))
}
ctx.reporter.ReportNow()
@ -40,18 +40,23 @@ func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskConte
})
}
func (t *StorageLoadPackage) do(ctx TaskContext) error {
func (t *StorageLoadPackage) do(ctx TaskContext) (string, error) {
stgCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
return fmt.Errorf("new cloudream storage client: %w", err)
return "", fmt.Errorf("new cloudream storage client: %w", err)
}
defer schglb.CloudreamStoragePool.Release(stgCli)
return stgCli.StorageLoadPackage(stgsdk.StorageLoadPackageReq{
resp, err := stgCli.StorageLoadPackage(stgsdk.StorageLoadPackageReq{
UserID: t.UserID,
PackageID: t.PackageID,
StorageID: t.StorageID,
})
if err != nil {
return "", err
}
return resp.FullPath, nil
}
func init() {

View File

@ -2,6 +2,7 @@ package config
import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/config"
scmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
)
@ -9,6 +10,7 @@ import (
type Config struct {
Logger logger.Config `json:"logger"`
RabbitMQ scmq.Config `json:"rabbitMQ"`
CloudreamStorage stgsdk.Config `json:"cloudreamStorage"`
ReportTimeoutSecs int `json:"reportTimeoutSecs"`
}

View File

@ -13,6 +13,7 @@ import (
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
colmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
"gitlink.org.cn/cloudream/scheduler/common/utils"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event"
)
@ -65,7 +66,25 @@ func (h *AdjustingHandler) Handle(job jobmod.Job) {
return
}
stgCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("new cloudream storage client: %s", err), job.GetState()))
return
}
defer schglb.CloudreamStoragePool.Release(stgCli)
stgInfo, err := stgCli.StorageGetInfo(stgsdk.StorageGetInfoReq{
StorageID: getNodeResp.StorageID,
})
if err != nil {
h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("getting cloudream storage info: %s", err), job.GetState()))
return
}
norJob.TargetSlwNodeID = adjustingState.Scheme.TargetSlwNodeID
// TODO UserID
norJob.OutputFullPath = utils.MakeJobOutputFullPath(stgInfo.Directory, 0, norJob.JobID)
adjJob := &adjustingJob{
job: norJob,
state: adjustingState,
@ -205,6 +224,8 @@ func (h *AdjustingHandler) doPackageScheduling(evt event.Event, job *adjustingJo
return fmt.Errorf("storage load package: %s", loadRet.Error)
}
file.FullPath = loadRet.FullPath
state.Step = jobmod.StepCompleted
return nil
}

View File

@ -11,7 +11,7 @@ import (
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
colmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
exetsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/utils"
"gitlink.org.cn/cloudream/scheduler/common/utils"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event"
)
@ -75,7 +75,7 @@ func (h *ExecutingHandler) onNormalJobEvent(evt event.Event, job *executingJob,
return
}
// TODO 需要添加DATA_IN、DATA_OUT等环境变量
// TODO 需要添加DATA_IN、DATA_OUT等环境变量这些数据从Job的信息中来获取
fullTaskID, err := h.mgr.execMgr.StartTask(job.job.GetJobID(),
exetsk.NewScheduleTask(
@ -161,7 +161,7 @@ func (h *ExecutingHandler) onResourceJobEvent(evt event.Event, job *executingJob
fullTaskID, err := h.mgr.execMgr.StartTask(job.job.GetJobID(), exetsk.NewStorageCreatePackage(
0, // TOOD 用户ID
getNodeResp.StorageID,
utils.MakeJobDataOutPath(tarNorJob.JobID),
tarNorJob.OutputFullPath,
resJob.Info.BucketID,
utils.MakeResourcePackageName(resJob.JobID),
resJob.Info.Redundancy,

View File

@ -243,6 +243,8 @@ func (h *PreSchedulingHandler) doPackageScheduling(evt event.Event, job *preSche
return fmt.Errorf("storage load package: %s", loadRet.Error)
}
file.FullPath = loadRet.FullPath
state.Step = jobmod.StepCompleted
return nil
}

View File

@ -32,6 +32,7 @@ func main() {
}
schglb.InitMQPool(&config.Cfg().RabbitMQ)
schglb.InitCloudreamStoragePool(&config.Cfg().CloudreamStorage)
exeMgr, err := executormgr.NewManager(time.Duration(config.Cfg().ReportTimeoutSecs) * time.Second)
if err != nil {