// Path: JCC-CSScheduler/manager/internal/jobmgr/job/state/adjusting.go
//
// (Source-viewer metadata: 293 lines, 8.2 KiB, Go.)

package state
import (
"context"
"errors"
"fmt"
"sync"
"time"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
"gitlink.org.cn/cloudream/scheduler/common/utils"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job"
)
// Adjusting is the job state that prepares the job's files (dataset, code and
// image) on the computing center chosen by a scheduling scheme. When it
// succeeds, the job moves to NormalJobReadyToExecute; otherwise it completes
// with a failure.
type Adjusting struct {
	// scheme is the scheduling decision: the target computing center ID and
	// the per-file schedule (Dataset, Code, Image) with the action to apply.
	scheme jobmod.JobScheduleScheme
	// targetCCInfo is the target computing center's record, loaded in do()
	// before the file-scheduling goroutines start and read by them.
	targetCCInfo schmod.ComputingCenter
}
// NewAdjusting creates an Adjusting state for the given scheduling scheme.
// The target computing center record is not resolved here; it is loaded
// when the state runs.
func NewAdjusting(scheme jobmod.JobScheduleScheme) *Adjusting {
	adj := new(Adjusting)
	adj.scheme = scheme
	return adj
}
// Run performs the adjusting work and then moves the job to its next state:
// a failed completion carrying the error if anything went wrong, otherwise
// NormalJobReadyToExecute.
func (s *Adjusting) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
	if runErr := s.do(rtx, jo); runErr != nil {
		rtx.Mgr.ChangeState(jo, FailureComplete(runErr))
		return
	}
	rtx.Mgr.ChangeState(jo, NewNormalJobReadyToExecute())
}
// Dump returns a snapshot of this state that carries the scheduling scheme.
// The run context and job arguments are not used.
func (s *Adjusting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateDump {
	snapshot := jobmod.AdjustingDump{Scheme: s.scheme}
	return &snapshot
}
// do prepares the job's files on the computing center selected by s.scheme.
//
// Steps:
//   - Record the target computing center ID on the job body.
//   - Load the computing center record into s.targetCCInfo, for use by the
//     helpers below, and fetch its CDS storage info.
//   - Schedule the dataset, code and image files concurrently.
//
// A Cancel event on the job's event set, or the first failing file-scheduling
// goroutine, cancels the shared context. All goroutine errors are joined into
// the returned error.
//
// Only NormalJob, MultiInstanceJob and InstanceJob bodies are supported. Any
// other body type returns an error instead of panicking.
func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
	var jobFilesInfo schsdk.JobFilesInfo
	var jobFiles *jobmod.JobFiles
	switch runningJob := jo.Body.(type) {
	case *job.NormalJob:
		jobFilesInfo = runningJob.Info.Files
		jobFiles = &runningJob.Files
		runningJob.TargetCCID = s.scheme.TargetCCID
	case *job.MultiInstanceJob:
		jobFilesInfo = runningJob.Info.Files
		jobFiles = &runningJob.Files
		runningJob.TargetCCID = s.scheme.TargetCCID
	case *job.InstanceJob:
		jobFilesInfo = runningJob.Info.Files
		jobFiles = &runningJob.Files
		runningJob.TargetCCID = s.scheme.TargetCCID
	default:
		// Without this guard jobFiles stays nil, and evaluating
		// &jobFiles.Dataset in the goroutines below panics with a nil pointer
		// dereference.
		return fmt.Errorf("unsupported job body type for adjusting: %T", jo.Body)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Listen for a Cancel event and abort all in-flight work when it arrives.
	// NOTE(review): this goroutine relies on event.WaitType returning once ctx
	// is done (the deferred cancel above triggers that) — confirm.
	go func() {
		event.WaitType[*event.Cancel](ctx, rtx.EventSet)
		cancel()
	}()

	ccInfo, err := rtx.Mgr.DB.ComputingCenter().GetByID(rtx.Mgr.DB.SQLCtx(), s.scheme.TargetCCID)
	if err != nil {
		return fmt.Errorf("getting computing center info: %w", err)
	}
	// Must be set before the goroutines below start: they read it.
	s.targetCCInfo = ccInfo

	stgCli, err := schglb.CloudreamStoragePool.Acquire()
	if err != nil {
		return fmt.Errorf("new cds client: %w", err)
	}
	defer schglb.CloudreamStoragePool.Release(stgCli)

	// The target computing center is now fixed, so the result output path can
	// be generated from its storage directory.
	stgInfo, err := stgCli.StorageGetInfo(cdssdk.StorageGetInfoReq{
		StorageID: ccInfo.CDSStorageID,
	})
	if err != nil {
		return fmt.Errorf("getting cds storage info: %w", err)
	}
	// TODO UserID
	// NOTE(review): the computed output path is discarded. It was previously
	// stored in NormalJob.OutputFullPath (see history); confirm where it belongs.
	utils.MakeJobOutputFullPath(stgInfo.Directory, 1, jo.JobID)

	var wg sync.WaitGroup
	wg.Add(3)
	var e1, e2, e3 error

	// Each goroutine cancels ctx on failure so its siblings stop waiting early.
	go func() {
		defer wg.Done()
		e1 = s.doPackageScheduling(ctx, rtx, jobFilesInfo.Dataset, &jobFiles.Dataset, &s.scheme.Dataset)
		if e1 != nil {
			cancel()
		}
	}()
	go func() {
		defer wg.Done()
		e2 = s.doPackageScheduling(ctx, rtx, jobFilesInfo.Code, &jobFiles.Code, &s.scheme.Code)
		if e2 != nil {
			cancel()
		}
	}()
	go func() {
		defer wg.Done()
		e3 = s.doImageScheduling(ctx, rtx, s.scheme.TargetCCID, jobFilesInfo.Image, &jobFiles.Image, &s.scheme.Image)
		if e3 != nil {
			cancel()
		}
	}()

	wg.Wait()
	return errors.Join(e1, e2, e3)
}
// doPackageScheduling resolves the package behind one job file (dataset or
// code), stores its ID in file.PackageID, and then applies scheme.Action on
// the target computing center (s.targetCCInfo).
//
// Where the package ID comes from depends on fileInfo:
//   - *schsdk.LocalJobFileInfo: waits for the matching LocalFileUploaded event
//     and uses its PackageID. It fails if the wait ends without the event
//     (e.g. ctx cancelled) or if the upload reported an error.
//   - *schsdk.PackageJobFileInfo: uses the PackageID given in the job info.
//   - *schsdk.DataReturnJobFileInfo: nothing to schedule; returns nil.
//   - any other type: returns an error.
//
// Actions:
//   - ActionMove: runs a CacheMovePackage task onto the target CDS node.
//   - ActionLoad: runs a StorageLoadPackage task into the target CDS storage.
//   - any other action: does nothing further.
func (s *Adjusting) doPackageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, fileInfo schsdk.JobFileInfo, file *jobmod.PackageJobFile, scheme *jobmod.FileScheduleScheme) error {
	switch info := fileInfo.(type) {
	case *schsdk.LocalJobFileInfo:
		evt, ok := event.WaitTypeAnd[*event.LocalFileUploaded](ctx, rtx.EventSet, func(e *event.LocalFileUploaded) bool {
			return e.LocalPath == info.LocalPath
		})
		if !ok {
			return fmt.Errorf("local file %s not uploaded", info.LocalPath)
		}
		if evt.Error != nil {
			return evt.Error
		}
		file.PackageID = evt.PackageID
	case *schsdk.PackageJobFileInfo:
		file.PackageID = info.PackageID
	case *schsdk.DataReturnJobFileInfo:
		return nil
	default:
		// This helper serves both the dataset and the code file, so the
		// message does not name either one.
		return fmt.Errorf("unknown job file type: %T", info)
	}

	if scheme.Action == jobmod.ActionMove {
		// TODO UserID
		wt := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, file.PackageID, s.targetCCInfo.CDSNodeID))
		defer wt.Close()

		status, err := wt.Receive(ctx)
		if err != nil {
			return fmt.Errorf("moving package: %w", err)
		}
		// Comma-ok so an unexpected status type becomes an error, not a panic.
		moveStatus, ok := status.(*exectsk.CacheMovePackageStatus)
		if !ok {
			return fmt.Errorf("moving package: unexpected task status type %T", status)
		}
		if moveStatus.Error != "" {
			return fmt.Errorf("moving package: %s", moveStatus.Error)
		}
		return nil
	}

	if scheme.Action == jobmod.ActionLoad {
		// TODO UserID
		wt := rtx.Mgr.ExecMgr.StartTask(exectsk.NewStorageLoadPackage(1, file.PackageID, s.targetCCInfo.CDSStorageID))
		defer wt.Close()

		status, err := wt.Receive(ctx)
		if err != nil {
			return fmt.Errorf("loading package: %w", err)
		}
		// A StorageLoadPackage task reports a StorageLoadPackageStatus. The old
		// assertion to CacheMovePackageStatus panicked on this path.
		loadStatus, ok := status.(*exectsk.StorageLoadPackageStatus)
		if !ok {
			return fmt.Errorf("loading package: unexpected task status type %T", status)
		}
		if loadStatus.Error != "" {
			return fmt.Errorf("loading package: %s", loadStatus.Error)
		}
		return nil
	}

	return nil
}
// doImageScheduling resolves the image a job uses, fills in file.ImageID and
// file.PackageID, and, for ActionImportImage, imports the image into the
// target computing center's PCM participant.
//
// Image resolution depends on fileInfo:
//   - *schsdk.LocalJobFileInfo: waits for the matching LocalFileUploaded event,
//     then creates a new image record that points at the uploaded package.
//   - *schsdk.ImageJobFileInfo: loads the existing image record by ID.
//
// NOTE(review): other fileInfo types fall through with file left unchanged
// (unlike doPackageScheduling, which rejects them) — confirm this is intended.
//
// Import steps for ActionImportImage:
//  1. Move the image's package onto the target CC's CDS node.
//  2. Check that the package contains exactly one object.
//  3. Upload that object's IPFS file as a PCM image.
//  4. Record the resulting PCM image for targetCCID.
//
// Any other action returns nil after resolution.
func (s *Adjusting) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, fileInfo schsdk.JobFileInfo, file *jobmod.ImageJobFile, scheme *jobmod.FileScheduleScheme) error {
	switch info := fileInfo.(type) {
	case *schsdk.LocalJobFileInfo:
		evt, ok := event.WaitTypeAnd[*event.LocalFileUploaded](ctx, rtx.EventSet, func(e *event.LocalFileUploaded) bool {
			return e.LocalPath == info.LocalPath
		})
		if !ok {
			return fmt.Errorf("local file %s not uploaded", info.LocalPath)
		}
		if evt.Error != nil {
			return evt.Error
		}

		// The upload is done, so a new image record can be created for it.
		// TODO image name
		// Use one timestamp so the generated name and the creation time agree.
		// %d, not %s: Unix() is an int64, and %s would render
		// "UPLOAD@%!s(int64=...)".
		now := time.Now()
		imgID, err := rtx.Mgr.DB.Image().Create(rtx.Mgr.DB.SQLCtx(), &evt.PackageID, fmt.Sprintf("UPLOAD@%d", now.Unix()), now)
		if err != nil {
			return fmt.Errorf("creating image info: %w", err)
		}

		// Fill in ImageID and PackageID.
		file.ImageID = imgID
		file.PackageID = &evt.PackageID
	case *schsdk.ImageJobFileInfo:
		imageInfo, err := rtx.Mgr.DB.Image().GetByID(rtx.Mgr.DB.SQLCtx(), info.ImageID)
		if err != nil {
			return fmt.Errorf("getting image info: %w", err)
		}
		file.ImageID = imageInfo.ImageID
		file.PackageID = imageInfo.CDSPackageID
	}

	if scheme.Action != jobmod.ActionImportImage {
		return nil
	}

	if file.PackageID == nil {
		return fmt.Errorf("image %v has no associated package, which cannot be uploaded to %v", file.ImageID, s.targetCCInfo.CCID)
	}

	// TODO UserID
	wt := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, *file.PackageID, s.targetCCInfo.CDSNodeID))
	defer wt.Close()

	status, err := wt.Receive(ctx)
	if err != nil {
		return fmt.Errorf("moving package: %w", err)
	}
	// Comma-ok so an unexpected status type becomes an error, not a panic.
	moveStatus, ok := status.(*exectsk.CacheMovePackageStatus)
	if !ok {
		return fmt.Errorf("moving package: unexpected task status type %T", status)
	}
	if moveStatus.Error != "" {
		return fmt.Errorf("moving package: %s", moveStatus.Error)
	}

	stgCli, err := schglb.CloudreamStoragePool.Acquire()
	if err != nil {
		return fmt.Errorf("new cloudream storage client: %w", err)
	}
	defer schglb.CloudreamStoragePool.Release(stgCli)

	// TODO UserID
	pkgObjs, err := stgCli.Object().GetPackageObjects(cdssdk.ObjectGetPackageObjects{UserID: 1, PackageID: *file.PackageID})
	if err != nil {
		return fmt.Errorf("getting package objects: %w", err)
	}
	if len(pkgObjs.Objects) == 0 {
		return fmt.Errorf("no object in the package which will be imported")
	}
	if len(pkgObjs.Objects) > 1 {
		return fmt.Errorf("there must be only 1 object in the package which will be imported")
	}

	wt2 := rtx.Mgr.ExecMgr.StartTask(exectsk.NewUploadImage(s.targetCCInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash)))
	defer wt2.Close()

	status2, err := wt2.Receive(ctx)
	if err != nil {
		return fmt.Errorf("uploading image: %w", err)
	}
	uploadStatus, ok := status2.(*exectsk.UploadImageStatus)
	if !ok {
		return fmt.Errorf("uploading image: unexpected task status type %T", status2)
	}
	if uploadStatus.Error != "" {
		return fmt.Errorf("uploading image: %s", uploadStatus.Error)
	}

	// TODO image name
	err = rtx.Mgr.DB.PCMImage().Create(rtx.Mgr.DB.SQLCtx(), file.ImageID, targetCCID, uploadStatus.PCMImageID, uploadStatus.Name, time.Now())
	if err != nil {
		return fmt.Errorf("creating image info: %w", err)
	}
	return nil
}