From cca645287984531ca6b24da53db0bae876ee09d9 Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Mon, 30 Sep 2024 16:41:30 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/assets/confs/executor.config.json | 14 +++--- common/models/job/job.go | 6 +-- common/utils/utils.go | 45 ++++++++++--------- executor/internal/task/create_ecs/alicloud.go | 5 --- executor/internal/task/create_ecs/factory.go | 3 -- .../internal/task/scheduler_create_ecs.go | 28 ++++++------ .../task/scheduler_data_preprocess.go | 29 +++--------- .../task/scheduler_model_finetuning.go | 44 +++++++++--------- .../jobmgr/job/state/multiInstance_running.go | 2 +- .../jobmgr/job/state/multiInstance_update.go | 1 - manager/internal/jobmgr/node_info.go | 20 ++++----- manager/internal/mq/job.go | 2 + 12 files changed, 86 insertions(+), 113 deletions(-) diff --git a/common/assets/confs/executor.config.json b/common/assets/confs/executor.config.json index 064524d..7ba8615 100644 --- a/common/assets/confs/executor.config.json +++ b/common/assets/confs/executor.config.json @@ -28,8 +28,8 @@ "createECS": { "cloud": "AliCloud", "auth_config": { - "AccessKeyId": "LTAI5tJBqN3uRnzXeiiXTxkT", - "AccessKeySecret": "dilS4SJ0I3SMWtY7h1ByHe3MOULuGA" + "AccessKeyId": "xxx", + "AccessKeySecret": "xxx" }, "ecs_config": { "DryRun": false, @@ -52,8 +52,8 @@ "createECS-hw": { "cloud": "HuaweiCloud", "auth_config": { - "AccessKeyId": "LTAI5tMraAgfzhrF4PF79Js4", - "AccessKeySecret": "aWTkvrBWZt58kvpop7MNwTDMinJFtj" + "AccessKeyId": "xxx", + "AccessKeySecret": "xxx" }, "ecs_config": { "Region": "cn-hangzhou" @@ -62,9 +62,9 @@ "createECS-sugon": { "cloud": "SugonCloud", "auth_config": { - "user": "acgnnmfbwo", - "password": "Pcl@2020", - "orgid": "c8befbc1301665ba2dc5b2826f8dca1e", + "user": "xxx", + "password": "xxx", + "orgid": "xxx", "clusterName": "华东一区【昆山】" }, "ecs_config": { diff --git a/common/models/job/job.go b/common/models/job/job.go index 6321fd1..159e215 100644 --- a/common/models/job/job.go +++ b/common/models/job/job.go @@ -39,9 +39,9 @@ type JobFiles struct { } type PackageJobFile struct { - PackageID cdssdk.PackageID `json:"packageID"` - PackagePath string `json:"packagePath"` // Load之后的文件路径,一个相对路径,需要加上CDS数据库中的RemoteBase才是完整路径 - ECSInstanceID schsdk.ECSInstanceID + PackageID cdssdk.PackageID `json:"packageID"` + PackagePath string `json:"packagePath"` // Load之后的文件路径,一个相对路径,需要加上CDS数据库中的RemoteBase才是完整路径 + ECSInstanceID schsdk.ECSInstanceID // TODO 这个实例ID暂时放在这里,后续会修改 } type ImageJobFile struct { diff --git a/common/utils/utils.go b/common/utils/utils.go index 63d3653..8a51194 100644 --- a/common/utils/utils.go +++ b/common/utils/utils.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "fmt" "gitlink.org.cn/cloudream/common/pkgs/logger" + schglb "gitlink.org.cn/cloudream/scheduler/common/globals" schmod "gitlink.org.cn/cloudream/scheduler/common/models" "math/rand" "strconv" @@ -72,35 +73,35 @@ func GetRcloneCommands(storage schmod.ObjectStorage, userID cdssdk.UserID, mount var commands []string // 下载Rclone - //commandContent := "yum install -y fuse3" - //commands = append(commands, commandContent) - //commandContent = "cd /opt && downloadCode='import requests;response=requests.get(\"@url@\",stream=True);response.raise_for_status();boundary=response.headers.get(\"Content-Type\").split(\"boundary=\")[-1].encode();content=response.content;body=[part.split(b\"\\r\\n\\r\\n\",1)[1].rsplit(b\"\\r\\n--\",1)[0] for part in content.split(b\"--\"+boundary+b\"\\r\\n\") if b\"filename=\" in part][0];open(\"@filename@\",\"wb\").write(body);print(\"success\")' && rclone=\"$cds_url/object/download?userID=$userID&objectID=$rcloneID\" && python3 -c \"$(echo \"$downloadCode\" | sed -e \"s|@url@|$(printf '%s' \"$rclone\" | sed 's/[&/\\]/\\\\&/g')|\" -e \"s|@filename@|rclone|\")\" && chmod +x rclone" - //commandContent = strings.Replace(commandContent, "$cds_url", schglb.CloudreamStorageConfig.URL, -1) - //commandContent = strings.Replace(commandContent, "$rcloneID", schglb.CDSRclone.CDSRcloneID, -1) - //commandContent = strings.Replace(commandContent, "$userID", strconv.FormatInt(int64(userID), 10), -1) - //commands = append(commands, commandContent) - // - //// 生成Rclone配置文件 - //commandContent = "echo -e '[@tagName@] \n type = s3 \n provider = @provider@ \n access_key_id = @ak@ \n secret_access_key = @sk@ \n endpoint = @endpoint@ \n storage_class = STANDARD' > /opt/rclone.conf" - tagName := storage.Bucket + "_" + storage.AK - //commandContent = strings.Replace(commandContent, "@tagName@", tagName, -1) - //commandContent = strings.Replace(commandContent, "@provider@", storage.Manufacturer, -1) - //commandContent = strings.Replace(commandContent, "@ak@", storage.AK, -1) - //commandContent = strings.Replace(commandContent, "@sk@", storage.SK, -1) - //commandContent = strings.Replace(commandContent, "@endpoint@", storage.Endpoint, -1) - //commands = append(commands, commandContent) + commandContent := "yum install -y fuse3" + commands = append(commands, commandContent) + commandContent = "cd /opt && downloadCode='import requests;response=requests.get(\"@url@\",stream=True);response.raise_for_status();boundary=response.headers.get(\"Content-Type\").split(\"boundary=\")[-1].encode();content=response.content;body=[part.split(b\"\\r\\n\\r\\n\",1)[1].rsplit(b\"\\r\\n--\",1)[0] for part in content.split(b\"--\"+boundary+b\"\\r\\n\") if b\"filename=\" in part][0];open(\"@filename@\",\"wb\").write(body);print(\"success\")' && rclone=\"$cds_url/object/download?userID=$userID&objectID=$rcloneID\" && python3 -c \"$(echo \"$downloadCode\" | sed -e \"s|@url@|$(printf '%s' \"$rclone\" | sed 's/[&/\\]/\\\\&/g')|\" -e \"s|@filename@|rclone|\")\" && chmod +x rclone" + commandContent = strings.Replace(commandContent, "$cds_url", schglb.CloudreamStorageConfig.URL, -1) + commandContent = strings.Replace(commandContent, "$rcloneID", schglb.CDSRclone.CDSRcloneID, -1) + commandContent = strings.Replace(commandContent, "$userID", strconv.FormatInt(int64(userID), 10), -1) + commands = append(commands, commandContent) - umountCommand := "umount -l /mnt/oss" - commands = append(commands, umountCommand) + // 生成Rclone配置文件 + commandContent = "echo -e '[@tagName@] \n type = s3 \n provider = @provider@ \n access_key_id = @ak@ \n secret_access_key = @sk@ \n endpoint = @endpoint@ \n storage_class = STANDARD' > /opt/rclone.conf" + tagName := storage.Bucket + "_" + storage.AK + commandContent = strings.Replace(commandContent, "@tagName@", tagName, -1) + commandContent = strings.Replace(commandContent, "@provider@", storage.Manufacturer, -1) + commandContent = strings.Replace(commandContent, "@ak@", storage.AK, -1) + commandContent = strings.Replace(commandContent, "@sk@", storage.SK, -1) + commandContent = strings.Replace(commandContent, "@endpoint@", storage.Endpoint, -1) + commands = append(commands, commandContent) + + //umountCommand := "umount -l /mnt/oss" + //commands = append(commands, umountCommand) // 挂载Rclone - commandContent := "mkdir -p @mountDir@ && cd /opt && nohup ./rclone mount @tagName@:@bucket@ @mountDir@ --vfs-cache-mode full --vfs-read-wait 0 --vfs-read-chunk-size 128M --cache-db-purge -vv > rcloneMount.log 2>&1 &" + commandContent = "mkdir -p @mountDir@ && cd /opt && nohup ./rclone mount @tagName@:@bucket@ @mountDir@ --vfs-cache-mode full --vfs-read-wait 0 --vfs-read-chunk-size 128M --cache-db-purge -vv > rcloneMount.log 2>&1 &" commandContent = strings.Replace(commandContent, "@tagName@", tagName, -1) commandContent = strings.Replace(commandContent, "@bucket@", storage.Bucket, -1) commandContent = strings.Replace(commandContent, "@mountDir@", mountDir, -1) commands = append(commands, commandContent) - //commandContent = "cd /opt && wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh && bash Miniconda3-latest-Linux-x86_64.sh -b -p $HOME/miniconda3 && eval \"$($HOME/miniconda3/bin/conda shell.bash hook)\" && conda create -n myenv python=3.10 -y" - //commands = append(commands, commandContent) + commandContent = "cd /opt && wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh && bash Miniconda3-latest-Linux-x86_64.sh -b -p $HOME/miniconda3 && eval \"$($HOME/miniconda3/bin/conda shell.bash hook)\" && conda create -n myenv python=3.10 -y" + commands = append(commands, commandContent) return commands } diff --git a/executor/internal/task/create_ecs/alicloud.go b/executor/internal/task/create_ecs/alicloud.go index cc7af97..31827ad 100644 --- a/executor/internal/task/create_ecs/alicloud.go +++ b/executor/internal/task/create_ecs/alicloud.go @@ -12,7 +12,6 @@ import ( "time" ) -// AliCloud实现了CloudProvider接口 type AliCloud struct{} var aliclient = &ecs.Client{} @@ -36,11 +35,8 @@ func AliConfig(authConfigs map[string]interface{}, ecsConfigs map[string]interfa } config := &openapi.Config{} - // 您的AccessKey ID config.AccessKeyId = tea.String(authConfigs["AccessKeyId"].(string)) - // 您的AccessKey Secret config.AccessKeySecret = tea.String(authConfigs["AccessKeySecret"].(string)) - // 您的可用区ID config.RegionId = requestParam.RegionId aliclient, _ = ecs.NewClient(config) } @@ -159,7 +155,6 @@ func (a *AliCloud) StartInstances(instanceID string) (string, error) { } func runShellCommand(commandContent string, instanceID string, regionId string) (*string, error) { - // 从CDS下载文件 commandRequest := ecs.RunCommandRequest{ InstanceId: []*string{&instanceID}, CommandContent: tea.String(commandContent), diff --git a/executor/internal/task/create_ecs/factory.go b/executor/internal/task/create_ecs/factory.go index 4d89ea5..7748023 100644 --- a/executor/internal/task/create_ecs/factory.go +++ b/executor/internal/task/create_ecs/factory.go @@ -2,7 +2,6 @@ package create_ecs import schmod "gitlink.org.cn/cloudream/scheduler/common/models" -// CloudProvider 是一个接口,定义了创建服务器的方法 type CloudProvider interface { CreateServer() (string, string, error) RunCommand(commands []string, instanceID string, timeout int) (string, error) @@ -17,14 +16,12 @@ type CloudFactory interface { CreateProvider() CloudProvider } -// HuaweiCloudFactory 实现了CloudFactory接口 type HuaweiCloudFactory struct{} func (f *HuaweiCloudFactory) CreateProvider() CloudProvider { return &HuaweiCloud{} } -// AliCloudFactory 实现了CloudFactory接口 type AliCloudFactory struct{} func (f *AliCloudFactory) CreateProvider() CloudProvider { diff --git a/executor/internal/task/scheduler_create_ecs.go b/executor/internal/task/scheduler_create_ecs.go index 0fb60ef..bd1f0b1 100644 --- a/executor/internal/task/scheduler_create_ecs.go +++ b/executor/internal/task/scheduler_create_ecs.go @@ -42,8 +42,8 @@ func (t *ScheduleCreateECS) do(task *Task, ctx TaskContext) error { // 创建云主机 factory := create_ecs.GetFactory(config.CloudName) provider := factory.CreateProvider() - //instanceID, ecsIP, err := provider.CreateServer() - instanceID, ecsIP, err := "i-bp16imo8en907iy1oixd", "120.55.45.90", error(nil) + instanceID, ecsIP, err := provider.CreateServer() + //instanceID, ecsIP, err := "i-bp16imo8en907iy1oixd", "120.55.45.90", error(nil) if err != nil { task.SendStatus(exectsk.NewScheduleCreateECSStatus("", schsdk.CreateECS, err.Error())) return err @@ -74,18 +74,18 @@ func (t *ScheduleCreateECS) do(task *Task, ctx TaskContext) error { commands = append(commands, utils.HandleCommand(startScript)) // 安装依赖包,用于获取GPU信息 - //commandContent := getPipCommand() - //commands = append(commands, commandContent) + commandContent := getPipCommand() + commands = append(commands, commandContent) // 获取用户输入的命令 arr := utils.SplitCommands(t.Command) commands = append(commands, arr...) // 执行命令 - //_, err = provider.RunCommand(commands, instanceID, 2000) - //if err != nil { - // logger.Error("run command error: " + err.Error()) - //} + _, err = provider.RunCommand(commands, instanceID, 2000) + if err != nil { + logger.Error("run command error: " + err.Error()) + } address := "http://" + ecsIP + ":" + strconv.FormatInt(t.ModelResource.ServerPort, 10) + "/" + t.ModelResource.ServerUrlPath if config.CloudName == schmod.SugonCloud { @@ -128,15 +128,15 @@ func (t *ScheduleCreateECS) do(task *Task, ctx TaskContext) error { task.SendStatus(exectsk.NewScheduleCreateECSStatus("", schsdk.PauseECS, "")) case schsdk.DestroyECS: logger.Info("destroy ecs") - //_, err := provider.DeleteInstance(instanceID) - //if err != nil { - // task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", err.Error())) - // continue - //} + _, err := provider.DeleteInstance(instanceID) + if err != nil { + task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", err.Error())) + continue + } task.SendStatus(exectsk.NewScheduleCreateECSStatus("", schsdk.DestroyECS, "")) break case schsdk.RestartServer: - commandContent := utils.RemountRclone(t.ObjectStorage, t.UserID, schsdk.MountDir) + commandContent = utils.RemountRclone(t.ObjectStorage, t.UserID, schsdk.MountDir) info.Runtime.Command = info.Runtime.Command + "\n" + commandContent commandContent = schsdk.MountDir + "/" + t.ModelResource.StopShellPath info.Runtime.Command = info.Runtime.Command + "\n" + utils.HandleCommand(commandContent) diff --git a/executor/internal/task/scheduler_data_preprocess.go b/executor/internal/task/scheduler_data_preprocess.go index 4bcde32..a407572 100644 --- a/executor/internal/task/scheduler_data_preprocess.go +++ b/executor/internal/task/scheduler_data_preprocess.go @@ -62,8 +62,8 @@ func (t *SchedulerDataPreprocess) do(task *Task, ctx TaskContext) error { provider := factory.CreateProvider() // 创建服务器 - //instanceID, ecsIP, err := provider.CreateServer() - instanceID, ecsIP, err := "i-bp16imo8en907iy1oixd", "120.55.45.90", error(nil) + instanceID, ecsIP, err := provider.CreateServer() + //instanceID, ecsIP, err := "i-bp16imo8en907iy1oixd", "120.55.45.90", error(nil) if err != nil { task.SendStatus(exectsk.NewSchedulerDataPreprocessStatus("", err)) return err @@ -89,39 +89,20 @@ func getDataPreprocessCommands(envs []schsdk.KVPair, inferencePlatform schsdk.In var commands []string - // 获取当前工作目录 + // 读取预置的脚本 currentDir, err := filepath.Abs(".") if err != nil { fmt.Println("Error getting current directory:", err) } - parentDir := filepath.Dir(currentDir) - - // 指定要读取的文件名 - fileName := "example.txt" // 替换为你要读取的文件名 - - // 构造完整路径 + fileName := "./scripts/data_preprocess.py" filePath := filepath.Join(parentDir, fileName) - - // 读取文件 data, err := ioutil.ReadFile(filePath) if err != nil { fmt.Println("Error reading file:", err) } + fileContent := string(data) - // 输出文件内容 - fmt.Println("File content:") - fmt.Println(string(data)) - content := string(data) - - // 读取文件 - //content, err := ioutil.ReadFile("D:\\Work\\Codes\\new\\workspace\\workspace\\scheduler\\common\\assets\\scripts\\data_preprocess.py") - //if err != nil { - // logger.Error(err) - // return nil, err - //} - - fileContent := string(content) fileContent = strings.ReplaceAll(fileContent, "@base_url@", inferencePlatform.ApiBaseUrl) fileContent = strings.ReplaceAll(fileContent, "@api_key@", inferencePlatform.ApiKey) inputPath := schsdk.MountDir + "/" + envs[0].Value diff --git a/executor/internal/task/scheduler_model_finetuning.go b/executor/internal/task/scheduler_model_finetuning.go index 919ccb1..b86c42a 100644 --- a/executor/internal/task/scheduler_model_finetuning.go +++ b/executor/internal/task/scheduler_model_finetuning.go @@ -45,23 +45,23 @@ func (t *SchedulerModelFinetuning) do(task *Task, ctx TaskContext) error { provider := factory.CreateProvider() instanceID := t.InstanceID - // 如果没有指定实例ID,则创建一个 - //if t.InstanceID == "" { - // // 创建服务器 - // instID, ecsIP, err := provider.CreateServer() - // if err != nil { - // task.SendStatus(exectsk.NewSchedulerModelFinetuningStatus(err)) - // return err - // } - // instanceID = instID - // logger.Info("create ECS success, instance id: " + instanceID + ", ip: " + ecsIP) - // - // if t.ObjectStorage.MountType == schsdk.RcloneMount { - // // 获取Rclone挂载命令 - // mountCommands := utils.GetRcloneCommands(t.ObjectStorage, t.UserID, schsdk.MountDir) - // commands = append(commands, mountCommands...) - // } - //} + // 如果没有指定实例ID,则创建一个(即预处理服务器与微调服务器分块) + if t.InstanceID == "" { + // 创建服务器 + instID, ecsIP, err := provider.CreateServer() + if err != nil { + task.SendStatus(exectsk.NewSchedulerModelFinetuningStatus(err)) + return err + } + instanceID = instID + logger.Info("create ECS success, instance id: " + instanceID + ", ip: " + ecsIP) + + if t.ObjectStorage.MountType == schsdk.RcloneMount { + // 获取Rclone挂载命令 + mountCommands := utils.GetRcloneCommands(t.ObjectStorage, t.UserID, schsdk.MountDir) + commands = append(commands, mountCommands...) + } + } mountCommands := utils.GetRcloneCommands(t.ObjectStorage, t.UserID, schsdk.MountDir) commands = append(commands, mountCommands...) @@ -76,11 +76,11 @@ func (t *SchedulerModelFinetuning) do(task *Task, ctx TaskContext) error { // 执行微调任务 _, err := provider.RunCommand(commands, instanceID, 2000) // 执行结束后销毁服务器 - //_, err2 := provider.DeleteInstance(instanceID) - //if err2 != nil { - // task.SendStatus(exectsk.NewSchedulerModelFinetuningStatus(err)) - // return err2 - //} + _, err2 := provider.DeleteInstance(instanceID) + if err2 != nil { + task.SendStatus(exectsk.NewSchedulerModelFinetuningStatus(err)) + return err2 + } if err != nil { task.SendStatus(exectsk.NewSchedulerModelFinetuningStatus(err)) return err diff --git a/manager/internal/jobmgr/job/state/multiInstance_running.go b/manager/internal/jobmgr/job/state/multiInstance_running.go index 398bdb6..adfde0b 100644 --- a/manager/internal/jobmgr/job/state/multiInstance_running.go +++ b/manager/internal/jobmgr/job/state/multiInstance_running.go @@ -43,7 +43,7 @@ func (s *MultiInstanceRunning) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) multInstJob := jo.Body.(*job.MultiInstanceJob) - //go pollingInstance(rtx, multInstJob) + go pollingInstance(rtx, multInstJob) waitFut := event.BeginWaitType[*event.InstanceOperate](rtx.EventSet) for { diff --git a/manager/internal/jobmgr/job/state/multiInstance_update.go b/manager/internal/jobmgr/job/state/multiInstance_update.go index c552d2e..e06090e 100644 --- a/manager/internal/jobmgr/job/state/multiInstance_update.go +++ b/manager/internal/jobmgr/job/state/multiInstance_update.go @@ -95,7 +95,6 @@ func (s *MultiInstanceUpdate) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) } // 发送事件,更新各个instance - //updateJob.Info.Runtime.Command = strings.Replace(updateJob.Info.Runtime.Command, "$1", fullPath, -1) updateJob.Info.Runtime.Envs = append(updateJob.Info.Runtime.Envs, schsdk.KVPair{Key: schsdk.FinetuningOutEnv, Value: fullPath}) updateInfo := event.InstanceUpdateInfo{ Info: updateJob.Info, diff --git a/manager/internal/jobmgr/node_info.go b/manager/internal/jobmgr/node_info.go index 064984d..c6158b3 100644 --- a/manager/internal/jobmgr/node_info.go +++ b/manager/internal/jobmgr/node_info.go @@ -168,21 +168,19 @@ func getCacheData(c *cache.Cache) schsdk.NodeUsageRateInfo { } } - for _, v := range infoMap { + for k, v := range infoMap { // 对v 进行排序 sort.Slice(v, func(i, j int) bool { return v[i].Timestamp < v[j].Timestamp }) - //switch k { - //case "MemoryUtilization": - // nodeUsageRateInfo.MemoryUtilization = v - //case "GPUUtilization": - // nodeUsageRateInfo.GPUUtilization = v - //case "CPUUtilization": - // nodeUsageRateInfo.CPUUtilization = v - //} - nodeUsageRateInfo.MemoryUtilization = v - nodeUsageRateInfo.GPUUtilization = v + switch k { + case schsdk.MemoryUtilization: + nodeUsageRateInfo.MemoryUtilization = v + case schsdk.GPUUtilization: + nodeUsageRateInfo.GPUUtilization = v + case schsdk.CPUUtilization: + nodeUsageRateInfo.CPUUtilization = v + } } return nodeUsageRateInfo diff --git a/manager/internal/mq/job.go b/manager/internal/mq/job.go index 1f24ecd..9c557f7 100644 --- a/manager/internal/mq/job.go +++ b/manager/internal/mq/job.go @@ -85,6 +85,7 @@ func (svc *Service) SubmitJobSet(msg *mgrmq.SubmitJobSet) (*mgrmq.SubmitJobSetRe }) case *schsdk.DataPreprocessJobInfo: + // 后续的调度流程跟NormalJob是一致的 normalJobInfo := &schsdk.NormalJobInfo{ Type: schsdk.JobTypeNormal, JobInfoBase: info.JobInfoBase, @@ -107,6 +108,7 @@ func (svc *Service) SubmitJobSet(msg *mgrmq.SubmitJobSet) (*mgrmq.SubmitJobSetRe }) case *schsdk.FinetuningJobInfo: + // 后续的调度流程跟NormalJob是一致的 normalJobInfo := &schsdk.NormalJobInfo{ Type: schsdk.JobTypeNormal, Files: info.Files,