完善任务的输入输出路径

This commit is contained in:
Sydonian 2024-06-05 15:43:17 +08:00
parent 71e4054d5e
commit fe5ee4f9f4
16 changed files with 140 additions and 57 deletions

View File

@ -10,7 +10,8 @@ create table ComputingCenter (
PCMParticipantID bigint not null comment '计算中心在PCM系统的ID',
CDSNodeID bigint null comment '计算中心在存储系统的ID',
CDSStorageID bigint null comment '此算力中心的存储服务对应在存储系统中的ID',
Name varchar(100) not null comment '计算中心名称'
Name varchar(100) not null comment '计算中心名称',
Bootstrap JSON comment '任务启动方式'
) comment = '计算中心';
create table Image (

View File

@ -39,8 +39,8 @@ type JobFiles struct {
}
type PackageJobFile struct {
PackageID cdssdk.PackageID `json:"packageID"`
FullPath string `json:"fullPath"` // Load之后的完整文件路径
PackageID cdssdk.PackageID `json:"packageID"`
PackagePath string `json:"packagePath"` // Load之后的文件路径一个相对路径需要加上CDS数据库中的RemoteBase才是完整路径
}
type ImageJobFile struct {

View File

@ -29,6 +29,8 @@ type ComputingCenter struct {
CDSStorageID cdssdk.StorageID `json:"cdsStorageID" db:"CDSStorageID"`
// 计算中心名称
Name string `json:"name" db:"Name"`
// 任务启动方式
Bootstrap schsdk.Bootstrap `json:"bootstrap" db:"Bootstrap"`
}
type Image struct {

View File

@ -15,13 +15,19 @@ func (db *DB) ComputingCenter() *ComputingCenterDB {
}
func (*ComputingCenterDB) GetByID(ctx SQLContext, id schsdk.CCID) (schmod.ComputingCenter, error) {
var ret schmod.ComputingCenter
var ret TempComputingCenter
err := sqlx.Get(ctx, &ret, "select * from ComputingCenter where CCID = ?", id)
return ret, err
return ret.ToComputingCenter(), err
}
func (*ComputingCenterDB) GetAll(ctx SQLContext) ([]schmod.ComputingCenter, error) {
var tmp []TempComputingCenter
err := sqlx.Select(ctx, &tmp, "select * from ComputingCenter")
var ret []schmod.ComputingCenter
err := sqlx.Select(ctx, &ret, "select * from ComputingCenter")
for _, t := range tmp {
ret = append(ret, t.ToComputingCenter())
}
return ret, err
}

40
common/pkgs/db/model.go Normal file
View File

@ -0,0 +1,40 @@
package db
import (
"fmt"
"reflect"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
"gitlink.org.cn/cloudream/common/utils/serder"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
)
type TempComputingCenter struct {
schmod.ComputingCenter
Bootstrap BootstrapWarpper `db:"Bootstrap"`
}
func (c *TempComputingCenter) ToComputingCenter() schmod.ComputingCenter {
cc := c.ComputingCenter
cc.Bootstrap = c.Bootstrap.Value
return cc
}
type BootstrapWarpper struct {
Value schsdk.Bootstrap
}
func (o *BootstrapWarpper) Scan(src interface{}) error {
data, ok := src.([]uint8)
if !ok {
return fmt.Errorf("unknow src type: %v", reflect.TypeOf(data))
}
boot, err := serder.JSONToObjectEx[schsdk.Bootstrap](data)
if err != nil {
return err
}
o.Value = boot
return nil
}

View File

@ -10,8 +10,8 @@ type StorageLoadPackage struct {
}
type StorageLoadPackageStatus struct {
TaskStatusBase
Error string `json:"error"`
FullPath string `json:"fullPath"`
Error string `json:"error"`
PackagePath string `json:"packagePath"`
}
func NewStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) *StorageLoadPackage {
@ -21,10 +21,10 @@ func NewStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, sto
StorageID: storageID,
}
}
func NewStorageLoadPackageStatus(err string, fullPath string) *StorageLoadPackageStatus {
func NewStorageLoadPackageStatus(err string, packagePath string) *StorageLoadPackageStatus {
return &StorageLoadPackageStatus{
Error: err,
FullPath: fullPath,
Error: err,
PackagePath: packagePath,
}
}

View File

@ -12,6 +12,7 @@ type SubmitTask struct {
PCMResourceID pcmsdk.ResourceID `json:"pcmResourceID"`
CMD string `json:"cmd"`
Envs []schsdk.KVPair `json:"envs"`
Params []schsdk.KVPair `json:"params"`
}
type SubmitTaskStatus struct {
TaskStatusBase
@ -19,13 +20,14 @@ type SubmitTaskStatus struct {
Error string `json:"error"`
}
func NewSubmitTask(pcmParticipantID pcmsdk.ParticipantID, pcmImageID pcmsdk.ImageID, pcmResourceID pcmsdk.ResourceID, cmd string, envs []schsdk.KVPair) *SubmitTask {
func NewSubmitTask(pcmParticipantID pcmsdk.ParticipantID, pcmImageID pcmsdk.ImageID, pcmResourceID pcmsdk.ResourceID, cmd string, envs []schsdk.KVPair, params []schsdk.KVPair) *SubmitTask {
return &SubmitTask{
PCMParticipantID: pcmParticipantID,
PCMImageID: pcmImageID,
PCMResourceID: pcmResourceID,
CMD: cmd,
Envs: envs,
Params: params,
}
}

View File

@ -13,8 +13,8 @@ import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)
func MakeJobOutputFullPath(stgDir string, userID cdssdk.UserID, jobID schsdk.JobID) string {
return filepath.Join(stgDir, strconv.FormatInt(int64(userID), 10), "jobs", string(jobID), "output")
func MakeJobOutputPath(userID cdssdk.UserID, jobID schsdk.JobID) string {
return filepath.Join("jobs", strconv.FormatInt(int64(userID), 10), string(jobID), "output")
}
func MakeResourcePackageName(jobID schsdk.JobID) string {

View File

@ -5,7 +5,6 @@ import (
"time"
pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
@ -53,7 +52,7 @@ func (t *PCMSubmitTask) do(taskID string, ctx TaskContext) error {
ResourceID: t.PCMResourceID,
CMD: t.CMD,
Envs: t.Envs,
Params: []schsdk.KVPair{},
Params: t.Params,
})
if err != nil {

View File

@ -26,12 +26,12 @@ func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskConte
log.Debugf("begin with %w", logger.FormatStruct(t.StorageLoadPackage))
defer log.Debugf("end")
fullPath, err := t.do(ctx)
packagePath, err := t.do(ctx)
if err != nil {
//TODO 若任务失败上报的状态failed字段根据情况修改
ctx.reporter.Report(task.ID(), exectsk.NewStorageLoadPackageStatus(err.Error(), ""))
} else {
ctx.reporter.Report(task.ID(), exectsk.NewStorageLoadPackageStatus("", fullPath))
ctx.reporter.Report(task.ID(), exectsk.NewStorageLoadPackageStatus("", packagePath))
}
ctx.reporter.ReportNow()
@ -56,7 +56,7 @@ func (t *StorageLoadPackage) do(ctx TaskContext) (string, error) {
return "", err
}
return resp.FullPath, nil
return resp.PackagePath, nil
}
func init() {

View File

@ -7,10 +7,10 @@ import (
)
type DataReturnJob struct {
Info schsdk.DataReturnJobInfo
TargetJobCCID schsdk.CCID // 目标任务所在计算中心的ID
TargetJobOutputFullPath string // 目标任务的结果输出路径
DataReturnPackageID cdssdk.PackageID // 回源之后得到的PackageID
Info schsdk.DataReturnJobInfo
TargetJobCCID schsdk.CCID // 目标任务所在计算中心的ID
TargetJobOutputPath string // 目标任务的结果输出路径,相对路径
DataReturnPackageID cdssdk.PackageID // 回源之后得到的PackageID
}
func NewResourceJob(info schsdk.DataReturnJobInfo) *DataReturnJob {

View File

@ -6,10 +6,10 @@ import (
)
type InstanceJob struct {
Info schsdk.InstanceJobInfo // 提交任务时提供的任务描述信息
Files jobmod.JobFiles // 任务需要的文件
TargetCCID schsdk.CCID // 将要运行此任务的算力中心ID
OutputFullPath string // 程序结果的完整输出路径
Info schsdk.InstanceJobInfo // 提交任务时提供的任务描述信息
Files jobmod.JobFiles // 任务需要的文件
TargetCCID schsdk.CCID // 将要运行此任务的算力中心ID
OutputPath string // 程序结果输出路径一个相对路径需要加上CDS数据库中记录RemoteBase才是完整路径
}
func NewInstanceJob(info schsdk.InstanceJobInfo, files jobmod.JobFiles) *InstanceJob {

View File

@ -6,10 +6,10 @@ import (
)
type NormalJob struct {
Info schsdk.NormalJobInfo // 提交任务时提供的任务描述信息
Files jobmod.JobFiles // 任务需要的文件
TargetCCID schsdk.CCID // 将要运行此任务的算力中心ID
OutputFullPath string // 程序结果的完整输出路径
Info schsdk.NormalJobInfo // 提交任务时提供的任务描述信息
Files jobmod.JobFiles // 任务需要的文件
TargetCCID schsdk.CCID // 将要运行此任务的算力中心ID
OutputPath string // 程序结果输出路径一个相对路径需要加上CDS数据库中记录RemoteBase才是完整路径
}
func NewNormalJob(info schsdk.NormalJobInfo) *NormalJob {

View File

@ -66,22 +66,9 @@ func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
logger.WithField("JobID", jo.JobID).Infof("job is scheduled to %v(%v)", ccInfo.Name, ccInfo.CCID)
stgCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
return fmt.Errorf("new cds client: %w", err)
}
defer schglb.CloudreamStoragePool.Release(stgCli)
// 已经确定最终执行的目标计算中心,则可以生成结果输出路径了
stgInfo, err := stgCli.StorageGetInfo(cdssdk.StorageGetInfoReq{
UserID: userID,
StorageID: ccInfo.CDSStorageID,
})
if err != nil {
return fmt.Errorf("getting cds storage info: %w", err)
}
// TODO UserID
outputFullPath := utils.MakeJobOutputFullPath(stgInfo.Directory, 1, jo.JobID)
outputPath := utils.MakeJobOutputPath(userID, jo.JobID)
var jobFilesInfo schsdk.JobFilesInfo
var jobFiles *jobmod.JobFiles
@ -91,7 +78,7 @@ func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files
runningJob.TargetCCID = s.scheme.TargetCCID
runningJob.OutputFullPath = outputFullPath
runningJob.OutputPath = outputPath
case *job.MultiInstanceJob:
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files
@ -100,7 +87,7 @@ func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files
runningJob.TargetCCID = s.scheme.TargetCCID
runningJob.OutputFullPath = outputFullPath
runningJob.OutputPath = outputPath
}
wg := sync.WaitGroup{}
@ -164,11 +151,13 @@ func (s *Adjusting) doPackageScheduling(ctx context.Context, rtx jobmgr.JobState
return fmt.Errorf("moving package: %w", err)
}
moveStatus := status.(*exectsk.StorageLoadPackageStatus)
if moveStatus.Error != "" {
return fmt.Errorf("moving package: %s", moveStatus.Error)
loadStatus := status.(*exectsk.StorageLoadPackageStatus)
if loadStatus.Error != "" {
return fmt.Errorf("moving package: %s", loadStatus.Error)
}
file.PackagePath = loadStatus.PackagePath
return nil
}

View File

@ -3,11 +3,15 @@ package state
import (
"context"
"fmt"
"path/filepath"
"github.com/samber/lo"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/pkgs/logger"
pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
exetsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
"gitlink.org.cn/cloudream/scheduler/common/utils"
@ -42,21 +46,25 @@ func (s *NormalJobExecuting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job)
}
func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
//norJob := jo.Body.(*job.NormalJob)
// TODO UserID
userID := cdssdk.UserID(1)
var runtime *schsdk.JobRuntimeInfo
var jobFiles *jobmod.JobFiles
var targetCCID schsdk.CCID
var outputPath string
switch runningJob := jo.Body.(type) {
case *job.NormalJob:
runtime = &runningJob.Info.Runtime
jobFiles = &runningJob.Files
targetCCID = runningJob.TargetCCID
outputPath = runningJob.OutputPath
case *job.InstanceJob:
runtime = &runningJob.Info.Runtime
jobFiles = &runningJob.Files
targetCCID = runningJob.TargetCCID
outputPath = runningJob.OutputPath
}
log := logger.WithType[NormalJobExecuting]("State").WithField("JobID", jo.JobID)
@ -74,7 +82,6 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e
return fmt.Errorf("getting computing center info: %w", err)
}
// TODO 需要添加DATA_IN、DATA_OUT等环境变量这些数据从Job的信息中来获取
ress, err := rtx.Mgr.DB.CCResource().GetByCCID(rtx.Mgr.DB.SQLCtx(), targetCCID)
if err != nil {
return fmt.Errorf("getting computing center resource: %w", err)
@ -83,13 +90,49 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e
return fmt.Errorf("no resource found at computing center %v", targetCCID)
}
stgCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
return fmt.Errorf("new cds client: %w", err)
}
defer schglb.CloudreamStoragePool.Release(stgCli)
getStg, err := stgCli.StorageGet(cdssdk.StorageGet{
UserID: userID,
})
if err != nil {
return fmt.Errorf("request to cds: %w", err)
}
// 判断算力中心是否支持环境变量配置如果不支持则读取脚本内容并拼接在Command参数后面
var envs []schsdk.KVPair
var params []string
var cmd string
envs = append(envs, schsdk.KVPair{Key: schsdk.JobDataInEnv, Value: filepath.Join(getStg.RemoteBase, jobFiles.Dataset.PackagePath)})
envs = append(envs, schsdk.KVPair{Key: schsdk.JobDataOutEnv, Value: filepath.Join(getStg.RemoteBase, outputPath)})
envs = append(envs, runtime.Envs...)
switch boot := ccInfo.Bootstrap.(type) {
case *schsdk.DirectBootstrap:
cmd = runtime.Command
case *schsdk.NoEnvBootstrap:
cmd = boot.ScriptFileName
params = append(params, runtime.Command)
envMap := lo.Map(envs, func(env schsdk.KVPair, _ int) string {
return fmt.Sprintf("%s=%s", env.Key, env.Value)
})
params = append(params, envMap...)
default:
cmd = runtime.Command
}
wt := rtx.Mgr.ExecMgr.StartTask(exetsk.NewSubmitTask(
ccInfo.PCMParticipantID,
pcmImgInfo.PCMImageID,
// TODO 选择资源的算法
ress[0].PCMResourceID,
runtime.Command,
runtime.Envs,
cmd,
envs,
// params, TODO params不应该是kv数组而应该是字符串数组
[]schsdk.KVPair{},
))
defer wt.Close()
@ -140,6 +183,7 @@ func (s *DataReturnJobExecuting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.
func (s *DataReturnJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
reJob := jo.Body.(*job.DataReturnJob)
userID := cdssdk.UserID(1)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -158,9 +202,9 @@ func (s *DataReturnJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Jo
logger.Infof("the outputs of job %v(local) will be scheduled to computing center %v(%v)", reJob.Info.TargetLocalJobID, ccInfo.Name, ccInfo.CCID)
wt := rtx.Mgr.ExecMgr.StartTask(exetsk.NewStorageCreatePackage(
1, // TOOD 用户ID
userID, // TOOD 用户ID
ccInfo.CDSStorageID,
reJob.TargetJobOutputFullPath,
reJob.TargetJobOutputPath,
reJob.Info.BucketID,
utils.MakeResourcePackageName(jo.JobID),
))

View File

@ -53,7 +53,7 @@ func (s *WaitTargetComplete) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e
}
reJob.TargetJobCCID = norJob.TargetCCID
reJob.TargetJobOutputFullPath = norJob.OutputFullPath
reJob.TargetJobOutputPath = norJob.OutputPath
return nil
}