diff --git a/common/globals/pools.go b/common/globals/pools.go index 5597fd9..b350c50 100644 --- a/common/globals/pools.go +++ b/common/globals/pools.go @@ -55,3 +55,9 @@ func InitRcloneConfig(rcloneID string, rcloneConfigID string) { CDSRcloneConfigID: rcloneConfigID, } } + +var InferencePlatform schsdk.InferencePlatform + +func InitInferencePlatform(cfg schsdk.InferencePlatform) { + InferencePlatform = cfg +} diff --git a/common/models/models.go b/common/models/models.go index d92db10..0de6419 100644 --- a/common/models/models.go +++ b/common/models/models.go @@ -87,15 +87,14 @@ type Models struct { } type ModelResource struct { - ModelID int64 `json:"modelID" db:"modelID"` - CDSStorageID cdssdk.StorageID `json:"CDSStorageID" db:"CDSStorageID"` - OjbStgID int `json:"OjbStgID" db:"OjbStgID"` - ModelPath string `json:"modelPath" db:"modelPath"` - StartShellPath string `json:"startShellPath" db:"startShellPath"` - ServerPort int `json:"serverPort" db:"serverPort"` - ServerUrlPath string `json:"serverUrlPath" db:"serverUrlPath"` - StopShellPath string `json:"stopShellPath" db:"stopShellPath"` - FinetuningShellPath string `json:"finetuningShellPath" db:"finetuningShellPath"` + ModelID int64 `json:"modelID" db:"modelID"` + OjbStgID int64 `json:"OjbStgID" db:"OjbStgID"` + ModelPath string `json:"modelPath" db:"modelPath"` + StartShellPath string `json:"startShellPath" db:"startShellPath"` + ServerPort int64 `json:"serverPort" db:"serverPort"` + ServerUrlPath string `json:"serverUrlPath" db:"serverUrlPath"` + StopShellPath string `json:"stopShellPath" db:"stopShellPath"` + FinetuningShellPath string `json:"finetuningShellPath" db:"finetuningShellPath"` } type ObjectStorage struct { diff --git a/common/pkgs/db/models.go b/common/pkgs/db/models.go index ced74d0..707e5fa 100644 --- a/common/pkgs/db/models.go +++ b/common/pkgs/db/models.go @@ -3,7 +3,6 @@ package db import ( "github.com/jmoiron/sqlx" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" schmod "gitlink.org.cn/cloudream/scheduler/common/models" ) @@ -22,8 +21,8 @@ func (*ModelsDB) GetAll(ctx SQLContext) ([]schmod.Models, error) { return ret, err } -func (*ModelsDB) GetModelByID(ctx SQLContext, modelID schsdk.ModelID, CDSStorageID cdssdk.StorageID) (schmod.ModelResource, error) { +func (*ModelsDB) GetModelByID(ctx SQLContext, modelID schsdk.ModelID, OjbStgID int64) (schmod.ModelResource, error) { var ret schmod.ModelResource - err := sqlx.Get(ctx, &ret, "select * from ModelResource where modelID = ? and CDSStorageID = ?", modelID, CDSStorageID) + err := sqlx.Get(ctx, &ret, "select * from ModelResource where modelID = ? and OjbStgID = ?", modelID, OjbStgID) return ret, err } diff --git a/common/pkgs/db/object_storage.go b/common/pkgs/db/object_storage.go index de6137a..dffb201 100644 --- a/common/pkgs/db/object_storage.go +++ b/common/pkgs/db/object_storage.go @@ -16,6 +16,6 @@ func (db *DB) ObjectStorage() *ObjectStorageDB { func (*ObjectStorageDB) GetObjectStorageByStorageID(ctx SQLContext, CDSStorageID cdssdk.StorageID) (schmod.ObjectStorage, error) { var ret schmod.ObjectStorage - err := sqlx.Get(ctx, &ret, "select access_key_id, secret_access_key, endpoint, bucket, CDSStorageID, mountType from ObjectStorage where CDSStorageID = ?", CDSStorageID) + err := sqlx.Get(ctx, &ret, "select * from ObjectStorage where CDSStorageID = ?", CDSStorageID) return ret, err } diff --git a/common/pkgs/mq/executor/task/scheduler_model_finetuning.go b/common/pkgs/mq/executor/task/scheduler_model_finetuning.go index d3a869f..ef5a4a2 100644 --- a/common/pkgs/mq/executor/task/scheduler_model_finetuning.go +++ b/common/pkgs/mq/executor/task/scheduler_model_finetuning.go @@ -4,8 +4,10 @@ import schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" type SchedulerModelFinetuning struct { TaskInfoBase - CMD string `json:"cmd"` - Envs []schsdk.KVPair `json:"envs"` + Type string `json:"type"` + CMD string `json:"cmd"` + Envs []schsdk.KVPair `json:"envs"` + InferencePlatform schsdk.InferencePlatform `json:"inferencePlatform"` } type SchedulerModelFinetuningStatus struct { @@ -13,10 +15,11 @@ type SchedulerModelFinetuningStatus struct { Error error `json:"error"` } -func NewSchedulerModelFinetuning(cmd string, envs []schsdk.KVPair) *SchedulerModelFinetuning { +func NewSchedulerModelFinetuning(cmd string, envs []schsdk.KVPair, inferencePlatform schsdk.InferencePlatform) *SchedulerModelFinetuning { return &SchedulerModelFinetuning{ - CMD: cmd, - Envs: envs, + CMD: cmd, + Envs: envs, + InferencePlatform: inferencePlatform, } } diff --git a/executor/internal/config/config.go b/executor/internal/config/config.go index 5669272..92d306c 100644 --- a/executor/internal/config/config.go +++ b/executor/internal/config/config.go @@ -16,14 +16,15 @@ type Application struct { } type Config struct { - Logger log.Config `json:"logger"` - ReportIntervalSec int `json:"reportIntervalSec"` - RabbitMQ mymq.Config `json:"rabbitMQ"` - CloudreamStorage cdssdk.Config `json:"cloudreamStorage"` - PCM pcmsdk.Config `json:"pcm"` - Application Application `json:"application"` - Rclone schsdk.Rclone `json:"rclone"` - CloudECS map[string]interface{} `json:"createECS"` + Logger log.Config `json:"logger"` + ReportIntervalSec int `json:"reportIntervalSec"` + RabbitMQ mymq.Config `json:"rabbitMQ"` + CloudreamStorage cdssdk.Config `json:"cloudreamStorage"` + PCM pcmsdk.Config `json:"pcm"` + Application Application `json:"application"` + Rclone schsdk.Rclone `json:"rclone"` + CloudECS map[string]interface{} `json:"createECS"` + InferencePlatform schsdk.InferencePlatform `json:"inferencePlatform"` } var cfg Config diff --git a/executor/internal/task/create_ecs/alicloud.go b/executor/internal/task/create_ecs/alicloud.go index c3a75f3..cc7af97 100644 --- a/executor/internal/task/create_ecs/alicloud.go +++ b/executor/internal/task/create_ecs/alicloud.go @@ -275,3 +275,9 @@ func getInstanceIP(instanceID string, regionId string) (string, error) { return "", nil } + +func (a *AliCloud) AvailableCheck(instanceID string) bool { + // 待实现 + + return true +} diff --git a/executor/internal/task/create_ecs/factory.go b/executor/internal/task/create_ecs/factory.go index 8e7891c..4d89ea5 100644 --- a/executor/internal/task/create_ecs/factory.go +++ b/executor/internal/task/create_ecs/factory.go @@ -10,6 +10,7 @@ type CloudProvider interface { StopInstance(instanceID string) (string, error) RebootInstances(instanceID string) (string, error) StartInstances(instanceID string) (string, error) + AvailableCheck(instanceID string) bool } type CloudFactory interface { diff --git a/executor/internal/task/create_ecs/huaweicloud.go b/executor/internal/task/create_ecs/huaweicloud.go index f84a348..6b1a237 100644 --- a/executor/internal/task/create_ecs/huaweicloud.go +++ b/executor/internal/task/create_ecs/huaweicloud.go @@ -12,6 +12,11 @@ import ( // HuaweiCloud实现了CloudProvider接口 type HuaweiCloud struct{} +func (a *HuaweiCloud) AvailableCheck(instanceID string) bool { + //TODO implement me + panic("implement me") +} + func (a *HuaweiCloud) StartInstances(instanceID string) (string, error) { //TODO implement me panic("implement me") diff --git a/executor/internal/task/create_ecs/sugoncloud.go b/executor/internal/task/create_ecs/sugoncloud.go index 4ac76d1..cff120b 100644 --- a/executor/internal/task/create_ecs/sugoncloud.go +++ b/executor/internal/task/create_ecs/sugoncloud.go @@ -112,6 +112,7 @@ type SugonCloud struct { var ecsConfig map[string]interface{} var authConfig map[string]interface{} +var instanceKV map[string]string var sugonClient exemq.HttpClient var efileClient exemq.HttpClient @@ -119,6 +120,7 @@ func SugonCloudConfig(authConfigs map[string]interface{}, ecsConfigs map[string] authConfigs["get_token_url"] = "https://ac.sugon.com/ac/openapi/v2/tokens" authConfig = authConfigs ecsConfig = ecsConfigs + instanceKV = make(map[string]string) // 获取token token, err := getToken(authConfigs) @@ -144,9 +146,6 @@ func SugonCloudConfig(authConfigs map[string]interface{}, ecsConfigs map[string] efileClient = *ec } -func (s *SugonCloud) CreateServer111() (string, string, error) { - return "", "", fmt.Errorf("not support") -} func (s *SugonCloud) CreateServer() (string, string, error) { instanceServiceName := "auto_instance_" + time.Now().Format("20060102150405") @@ -192,6 +191,7 @@ func (s *SugonCloud) CreateServer() (string, string, error) { url, err := sugonClient.GetInstanceUrl(token, instanceID) logger.Info("create ecs success, instanceID: " + instanceID + " url: " + url) + instanceKV[instanceID] = instanceServiceName return instanceID, url, nil } @@ -220,8 +220,9 @@ func (s *SugonCloud) RunCommand(commands []string, instanceID string, timeout in time.Sleep(3 * time.Second) continue } - return content, err + break } + return content, err } _, err := sugonClient.RunCommand(token, instanceID, commands[i]) @@ -270,3 +271,17 @@ func (s *SugonCloud) StartInstances(instanceID string) (string, error) { instance, err := sugonClient.OperateSugonInstance(token, instanceID, schsdk.RunECS) return instance, err } + +func (s *SugonCloud) AvailableCheck(instanceID string) bool { + instanceName, ok := instanceKV[instanceID] + if !ok { + return false + } + token, _ := getToken(authConfig) + instanceID, _, err := sugonClient.GetInstanceID(token, instanceName) + if err != nil || instanceID == "" { + logger.Error(err.Error()) + return false + } + return true +} diff --git a/executor/internal/task/scheduler_create_ecs.go b/executor/internal/task/scheduler_create_ecs.go index ce21433..098cade 100644 --- a/executor/internal/task/scheduler_create_ecs.go +++ b/executor/internal/task/scheduler_create_ecs.go @@ -34,7 +34,6 @@ func (t *ScheduleCreateECS) Execute(task *Task, ctx TaskContext) { log.Debugf("begin") defer log.Debugf("end") - //task.SendStatus(exectsk.NewScheduleCreateECSStatus("address", "")) err := t.do(task, ctx) if err != nil { log.Error(err) @@ -49,7 +48,7 @@ func (t *ScheduleCreateECS) do(task *Task, ctx TaskContext) error { factory := create_ecs.GetFactory(config.CloudName) provider := factory.CreateProvider() instanceID, ecsIP, err := provider.CreateServer() - //instanceID, ecsIP, err := "i-bp19q01cjmr62vstszh3", "47.98.122.29", error(nil) + //instanceID, ecsIP, err := "i-bp18see6gypratlt3nhp", "47.96.28.209", error(nil) if err != nil { task.SendStatus(exectsk.NewScheduleCreateECSStatus("", schsdk.CreateECS, err.Error())) return err @@ -87,12 +86,12 @@ func (t *ScheduleCreateECS) do(task *Task, ctx TaskContext) error { arr := utils.SplitCommands(t.Command) commands = append(commands, arr...) - _, err = provider.RunCommand(commands, instanceID, 2000) - if err != nil { - logger.Error("run command error: " + err.Error()) - } + //_, err = provider.RunCommand(commands, instanceID, 2000) + //if err != nil { + // logger.Error("run command error: " + err.Error()) + //} - address := "http://" + ecsIP + ":" + strconv.Itoa(t.ModelResource.ServerPort) + "/" + t.ModelResource.ServerUrlPath + address := "http://" + ecsIP + ":" + strconv.FormatInt(t.ModelResource.ServerPort, 10) + "/" + t.ModelResource.ServerUrlPath if config.CloudName == schmod.SugonCloud { address = ecsIP + "/" + t.ModelResource.ServerUrlPath address = strings.Replace(address, "//", "/", -1) @@ -138,6 +137,7 @@ func (t *ScheduleCreateECS) do(task *Task, ctx TaskContext) error { continue } task.SendStatus(exectsk.NewScheduleCreateECSStatus("", schsdk.DestroyECS, "")) + break case schsdk.OperateServer: executeCommands(provider, instanceID, task, info.Runtime) case schsdk.GPUMonitor: @@ -151,12 +151,18 @@ func (t *ScheduleCreateECS) do(task *Task, ctx TaskContext) error { res, err = provider.RunCommand(commands, instanceID, 2000) } if err != nil { + // 如果命令执行失败,判断机器是否可用 + check := provider.AvailableCheck(instanceID) + if !check { + task.SendStatus(exectsk.NewScheduleCreateECSStatus("", schsdk.Invalid, err.Error())) + break + } task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", err.Error())) continue } task.SendStatus(exectsk.NewScheduleCreateECSStatus(res, schsdk.GPUMonitor, "")) default: - //executeCommands(provider, instanceID, task, info.Command) + executeCommands(provider, instanceID, task, info.Runtime) } } diff --git a/executor/internal/task/scheduler_model_finetuning.go b/executor/internal/task/scheduler_model_finetuning.go index d18a579..4401399 100644 --- a/executor/internal/task/scheduler_model_finetuning.go +++ b/executor/internal/task/scheduler_model_finetuning.go @@ -1,7 +1,9 @@ package task import ( + "errors" "gitlink.org.cn/cloudream/common/pkgs/logger" + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task" "gitlink.org.cn/cloudream/scheduler/common/utils" "gitlink.org.cn/cloudream/scheduler/executor/internal/config" @@ -22,7 +24,6 @@ func (t *SchedulerModelFinetuning) Execute(task *Task, ctx TaskContext) { log.Debugf("begin") defer log.Debugf("end") - //task.SendStatus(exectsk.NewScheduleCreateECSStatus("address", "")) err := t.do(task, ctx) if err != nil { log.Error(err) @@ -33,11 +34,16 @@ func (t *SchedulerModelFinetuning) Execute(task *Task, ctx TaskContext) { } func (t *SchedulerModelFinetuning) do(task *Task, ctx TaskContext) error { - var commands []string - commandContent := "sed -i '/SCH_DATA_IN/d' ~/.bashrc && echo 'export SCH_DATA_IN=" + strings.Replace(t.Envs[0].Value, "\\", "/", -1) + "' >> ~/.bashrc" - commands = append(commands, commandContent) - commandContent = "sed -i '/SCH_DATA_OUT/d' ~/.bashrc && echo 'export SCH_DATA_OUT=" + strings.Replace(t.Envs[1].Value, "\\", "/", -1) + "' >> ~/.bashrc" - commands = append(commands, commandContent) + // 设置环境变量 + commands := utils.ConvertEnvsToCommand(t.Envs) + + if t.Type == schsdk.DataPreprocess { + _, err := getDataPreprocessCommands(t.Envs, t.InferencePlatform) + if err != nil { + task.SendStatus(exectsk.NewSchedulerModelFinetuningStatus(err)) + return err + } + } arr := utils.SplitCommands(t.CMD) commands = append(commands, arr...) @@ -67,6 +73,32 @@ func (t *SchedulerModelFinetuning) do(task *Task, ctx TaskContext) error { return nil } +func getDataPreprocessCommands(envs []schsdk.KVPair, inferencePlatform schsdk.InferencePlatform) ([]string, error) { + + if inferencePlatform.PlatformName == "" { + return nil, errors.New("inferencePlatform.PlatformName is empty") + } + + var commands []string + + // 读取当前目录下的 data_preprocess.py 文件 + fileContent := "" + + fileContent = strings.ReplaceAll(fileContent, "@base_url@", inferencePlatform.ApiBaseUrl) + fileContent = strings.ReplaceAll(fileContent, "@api_key@", inferencePlatform.ApiKey) + fileContent = strings.ReplaceAll(fileContent, "@input_file_path@", "") + fileContent = strings.ReplaceAll(fileContent, "@output_file@", "") + fileContent = strings.ReplaceAll(fileContent, "@base_url@", inferencePlatform.ApiBaseUrl) + fileContent = strings.ReplaceAll(fileContent, "@base_url@", inferencePlatform.ApiBaseUrl) + + commandContent := "echo -e '" + fileContent + "' > /opt/generate_data.py" + commands = append(commands, commandContent) + commandContent = "echo -e '" + fileContent + "' > /opt/generate_data.py" + commands = append(commands, commandContent) + + return commands, nil +} + func init() { Register(NewSchedulerModelFinetuning) } diff --git a/executor/main.go b/executor/main.go index 2385958..3dfb0e9 100644 --- a/executor/main.go +++ b/executor/main.go @@ -32,6 +32,7 @@ func main() { myglbs.InitExecutorID(config.Cfg().Application.ExecutorID) schglb.InitRcloneConfig(config.Cfg().Rclone.CDSRcloneID, config.Cfg().Rclone.CDSRcloneConfigID) + schglb.InitInferencePlatform(config.Cfg().InferencePlatform) //rpter := reporter.NewReporter(myglbs.ExecutorID, time.Second*time.Duration(config.Cfg().ReportIntervalSec)) // diff --git a/manager/internal/jobmgr/job/state/executing.go b/manager/internal/jobmgr/job/state/executing.go index aba68ef..883612e 100644 --- a/manager/internal/jobmgr/job/state/executing.go +++ b/manager/internal/jobmgr/job/state/executing.go @@ -24,17 +24,17 @@ import ( "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job" ) -type NormalJobExecuting struct { +type JobExecuting struct { lastStatus pcmsdk.TaskStatus } -func NewNormalJobExecuting() *NormalJobExecuting { - return &NormalJobExecuting{ +func NewNormalJobExecuting() *JobExecuting { + return &JobExecuting{ lastStatus: "Begin", } } -func (s *NormalJobExecuting) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { +func (s *JobExecuting) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) { err := s.do(rtx, jo) if err != nil { rtx.Mgr.ChangeState(jo, FailureComplete(err)) @@ -43,13 +43,13 @@ func (s *NormalJobExecuting) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) } } -func (s *NormalJobExecuting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateDump { +func (s *JobExecuting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateDump { return &jobmod.NormalJobExecutingDump{ TaskStatus: s.lastStatus, } } -func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error { +func (s *JobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error { // TODO UserID userID := cdssdk.UserID(1) err := error(nil) @@ -74,6 +74,9 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e } cmd, envs := getRuntimeCommand(runningJob.Info.Runtime, runningJob.Files.Dataset.PackageID, runningJob.OutputPath, getStg.RemoteBase, *ccInfo) err = s.submitNormalTask(rtx, cmd, envs, *ccInfo, pcmImgInfo, ress[0].PCMResourceID) + if err != nil { + logger.Error(err.Error()) + } case *job.FinetuningJob: ccInfo, getStg, err := getCCInfoAndStgInfo(rtx, runningJob.TargetCCID, userID) @@ -82,6 +85,9 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e } cmd, envs := getRuntimeCommand(runningJob.Info.Runtime, runningJob.Files.Dataset.PackageID, runningJob.OutputPath, getStg.RemoteBase, *ccInfo) err = s.submitFinetuningTask(rtx, cmd, envs, *ccInfo) + if err != nil { + logger.Error(err.Error()) + } case *job.InstanceJob: ccInfo, getStg, err := getCCInfoAndStgInfo(rtx, runningJob.TargetCCID, userID) @@ -90,11 +96,16 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e } _, envs := getRuntimeCommand(runningJob.Info.Runtime, runningJob.Files.Dataset.PackageID, runningJob.OutputPath, getStg.RemoteBase, *ccInfo) err = s.submitInstanceTask(rtx, jo, runningJob, *ccInfo, getStg.StorageID, userID, envs) + if err != nil { + logger.Error(err.Error()) + // 创建失败,从多实例任务中删除 + postDeleteInstanceEvent(rtx, jo, runningJob) + } } return err } -func (s *NormalJobExecuting) submitNormalTask(rtx jobmgr.JobStateRunContext, cmd string, envs []schsdk.KVPair, ccInfo schmod.ComputingCenter, pcmImgInfo schmod.PCMImage, resourceID pcmsdk.ResourceID) error { +func (s *JobExecuting) submitNormalTask(rtx jobmgr.JobStateRunContext, cmd string, envs []schsdk.KVPair, ccInfo schmod.ComputingCenter, pcmImgInfo schmod.PCMImage, resourceID pcmsdk.ResourceID) error { task, err := rtx.Mgr.ExecMgr.StartTask(exetsk.NewSubmitTask( ccInfo.PCMParticipantID, pcmImgInfo.PCMImageID, @@ -136,10 +147,11 @@ func (s *NormalJobExecuting) submitNormalTask(rtx jobmgr.JobStateRunContext, cmd } } -func (s *NormalJobExecuting) submitFinetuningTask(rtx jobmgr.JobStateRunContext, cmd string, envs []schsdk.KVPair, ccInfo schmod.ComputingCenter) error { +func (s *JobExecuting) submitFinetuningTask(rtx jobmgr.JobStateRunContext, cmd string, envs []schsdk.KVPair, ccInfo schmod.ComputingCenter) error { task, err := rtx.Mgr.ExecMgr.StartTask(exetsk.NewSchedulerModelFinetuning( cmd, envs, + schglb.InferencePlatform, ), ccInfo) if err != nil { @@ -159,27 +171,28 @@ func (s *NormalJobExecuting) submitFinetuningTask(rtx jobmgr.JobStateRunContext, return nil } -func (s *NormalJobExecuting) submitInstanceTask(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job, runningJob *job.InstanceJob, ccInfo schmod.ComputingCenter, +func (s *JobExecuting) submitInstanceTask(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job, runningJob *job.InstanceJob, ccInfo schmod.ComputingCenter, storageID cdssdk.StorageID, userID cdssdk.UserID, envs []schsdk.KVPair) error { modelJobInfo := runningJob.Info.ModelJobInfo - // 先从数据库中查询是否已经预置了模型 - modelInfo, err := rtx.Mgr.DB.Models().GetModelByID(rtx.Mgr.DB.SQLCtx(), modelJobInfo.ModelID, storageID) - if &modelInfo == nil { - logger.Error(err.Error()) - return fmt.Errorf("the model is not exists: %w", err) - } - if err != nil { - return fmt.Errorf("getting computing center info: %w", err) - } - objectStorage, err := rtx.Mgr.DB.ObjectStorage().GetObjectStorageByStorageID(rtx.Mgr.DB.SQLCtx(), storageID) if err != nil { logger.Error(err.Error()) return fmt.Errorf("getting object storage info: %w", err) } - println(objectStorage.CDSStorageID) + + // 先从数据库中查询是否已经预置了模型 + modelInfo, err := rtx.Mgr.DB.Models().GetModelByID(rtx.Mgr.DB.SQLCtx(), modelJobInfo.ModelID, objectStorage.ID) + if &modelInfo == nil { + logger.Error(err.Error()) + return fmt.Errorf("the model is not exists: %w", err) + } + if err != nil { + logger.Error(err.Error()) + return fmt.Errorf("getting model info info: %w", err) + } + // 发送扩容任务 ecs := exetsk.NewScheduleCreateECS( userID, @@ -227,11 +240,12 @@ func (s *NormalJobExecuting) submitInstanceTask(rtx jobmgr.JobStateRunContext, j case *exetsk.ScheduleCreateECSStatus: if v2.Error != "" { logger.Error("update task fail, error: " + v2.Error) - if v2.Operate == schsdk.CreateECS { - // 创建失败,从多实例任务中删除 - postDeleteInstanceEvent(rtx, jo, runningJob) + if v2.Operate == schsdk.CreateECS || v2.Operate == schsdk.Invalid { + // 创建失败或者检测不可用,从多实例任务中删除 + v2.Operate = schsdk.DestroyECS + } else { + continue } - continue } switch v2.Operate { @@ -358,7 +372,7 @@ func (s *DataReturnJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Jo reJob := jo.Body.(*job.DataReturnJob) userID := cdssdk.UserID(1) - log := logger.WithType[NormalJobExecuting]("State").WithField("JobID", jo.JobID) + log := logger.WithType[JobExecuting]("State").WithField("JobID", jo.JobID) ctx, cancel := context.WithCancel(context.Background()) defer cancel()