新增finetuning任务类型

This commit is contained in:
JeshuaRen 2024-09-03 16:51:24 +08:00
parent 6d14924693
commit c730220bfc
10 changed files with 297 additions and 126 deletions

View File

@ -21,6 +21,7 @@ var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[JobBody
(*InstanceJobDump)(nil),
(*MultiInstanceJobDump)(nil),
(*UpdateMultiInstanceJobDump)(nil),
(*FinetuningJobDump)(nil),
)))
type NormalJobDump struct {
@ -34,6 +35,17 @@ func (d *NormalJobDump) getType() JobBodyDumpType {
return d.Type
}
// FinetuningJobDump is the dump representation of a finetuning job body.
// It is tagged "FinetuningJob" for the externally tagged type union it is
// registered in.
type FinetuningJobDump struct {
serder.Metadata `union:"FinetuningJob"`
// Type is the value reported by getType.
Type JobBodyDumpType `json:"type"`
// TargetCCID identifies the computing center associated with the job.
TargetCCID schsdk.CCID `json:"targetCCID"`
// Files holds the job's file information.
Files JobFiles `json:"files"`
}
// getType returns the dump type stored in the Type field.
func (d *FinetuningJobDump) getType() JobBodyDumpType {
return d.Type
}
type DataReturnJobDump struct {
serder.Metadata `union:"DataReturnJob"`
Type JobBodyDumpType `json:"type"`

View File

@ -0,0 +1,31 @@
package task
import schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
// SchedulerModelFinetuning is the task message for a model finetuning run.
// It carries the command to execute and the environment variables to apply.
type SchedulerModelFinetuning struct {
TaskInfoBase
// CMD is the command string to run for the finetuning job.
CMD string `json:"cmd"`
// Envs lists the key/value environment variables passed along with CMD.
Envs []schsdk.KVPair `json:"envs"`
}
// SchedulerModelFinetuningStatus is the status message reported for a
// SchedulerModelFinetuning task.
type SchedulerModelFinetuningStatus struct {
TaskStatusBase
// Error is nil when no failure is being reported.
// NOTE(review): values of the error interface generally do not round-trip
// through encoding/json (they have no exported fields and cannot be decoded
// back into an interface). Confirm the serializer in use handles this, or
// consider carrying the message as a string as other statuses appear to do.
Error error `json:"error"`
}
// NewSchedulerModelFinetuning builds a finetuning task message from the
// command to run and the environment variables that accompany it.
func NewSchedulerModelFinetuning(cmd string, envs []schsdk.KVPair) *SchedulerModelFinetuning {
	tsk := new(SchedulerModelFinetuning)
	tsk.CMD = cmd
	tsk.Envs = envs
	return tsk
}
// NewSchedulerModelFinetuningStatus builds a status message carrying err.
// Pass nil to report that no error occurred.
func NewSchedulerModelFinetuningStatus(err error) *SchedulerModelFinetuningStatus {
	status := new(SchedulerModelFinetuningStatus)
	status.Error = err
	return status
}
// init registers the finetuning task type together with its status type.
func init() {
Register[*SchedulerModelFinetuning, *SchedulerModelFinetuningStatus]()
}

View File

@ -6,8 +6,6 @@ import (
pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
"gitlink.org.cn/cloudream/scheduler/common/utils"
"strings"
)
type PCMSubmitTask struct {
@ -25,9 +23,7 @@ func (t *PCMSubmitTask) Execute(task *Task, ctx TaskContext) {
log.Debugf("begin with %v", logger.FormatStruct(t.SubmitTask))
defer log.Debugf("end")
//err := t.do(task, ctx)
err := t.finetuning_test(task, ctx)
//err := error(nil)
err := t.do(task, ctx)
if err != nil {
//TODO 若任务失败上报的状态failed字段根据情况修改
task.SendStatus(exectsk.NewSubmitTaskStatus("failed", err.Error()))
@ -96,24 +92,3 @@ func (t *PCMSubmitTask) do(task *Task, ctx TaskContext) error {
// init passes the PCMSubmitTask constructor to Register.
func init() {
Register(NewPCMSubmitTask)
}
// finetuning_test assembles the shell command list for a finetuning run but
// does not execute it: the provider call below is commented out, so the
// function always returns nil.
//
// NOTE(review): t.Envs[0] and t.Envs[1] are indexed without a length check;
// this presumably relies on the caller always placing the data-in and
// data-out entries first. Confirm against the caller.
func (t *PCMSubmitTask) finetuning_test(task *Task, ctx TaskContext) error {
var commands []string
// Replace any existing SCH_DATA_IN export in ~/.bashrc with the new value,
// converting backslashes in the path to forward slashes.
commandContent := "sed -i '/SCH_DATA_IN/d' ~/.bashrc && echo 'export SCH_DATA_IN=" + strings.Replace(t.Envs[0].Value, "\\", "/", -1) + "' >> ~/.bashrc"
commands = append(commands, commandContent)
// Same treatment for SCH_DATA_OUT.
commandContent = "sed -i '/SCH_DATA_OUT/d' ~/.bashrc && echo 'export SCH_DATA_OUT=" + strings.Replace(t.Envs[1].Value, "\\", "/", -1) + "' >> ~/.bashrc"
commands = append(commands, commandContent)
// Append the commands obtained by splitting t.CMD.
arr := utils.SplitCommands(t.CMD)
commands = append(commands, arr...)
//factory := create_ecs.GetFactory(config.CloudName)
//provider := factory.CreateProvider()
//_, err := provider.RunCommand(commands, "i-bp1ikwdsr5r9p5i9mggm", 2000)
//if err != nil {
// return err
//}
return nil
}

View File

@ -0,0 +1,72 @@
package task
import (
	"fmt"
	"strings"

	"gitlink.org.cn/cloudream/common/pkgs/logger"
	exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
	"gitlink.org.cn/cloudream/scheduler/common/utils"
	"gitlink.org.cn/cloudream/scheduler/executor/internal/config"
	"gitlink.org.cn/cloudream/scheduler/executor/internal/task/create_ecs"
)
// SchedulerModelFinetuning is the executor-side task wrapping the
// finetuning task message received over MQ. The message's fields (CMD, Envs)
// are reachable directly through the embedded pointer.
type SchedulerModelFinetuning struct {
*exectsk.SchedulerModelFinetuning
}
// NewSchedulerModelFinetuning wraps the received finetuning task message in
// an executor task.
func NewSchedulerModelFinetuning(info *exectsk.SchedulerModelFinetuning) *SchedulerModelFinetuning {
	return &SchedulerModelFinetuning{SchedulerModelFinetuning: info}
}
// Execute runs the finetuning task and reports the outcome through
// task.SendStatus.
//
// A status is always sent, on failure as well as on success: the submitting
// side waits for a SchedulerModelFinetuningStatus message after starting the
// task, so returning without sending one would leave it waiting.
//
// NOTE(review): the status carries the failure as an error value; confirm
// that the message serializer can transport it (see the Error field of
// SchedulerModelFinetuningStatus).
func (t *SchedulerModelFinetuning) Execute(task *Task, ctx TaskContext) {
	log := logger.WithType[SchedulerModelFinetuning]("Task")
	log.Debugf("begin")
	defer log.Debugf("end")

	if err := t.do(task, ctx); err != nil {
		log.Error(err)
		task.SendStatus(exectsk.NewSchedulerModelFinetuningStatus(err))
		return
	}

	// Report success explicitly with a nil error.
	task.SendStatus(exectsk.NewSchedulerModelFinetuningStatus(nil))
	log.Info("model finetuning finished")
}
// do builds the shell command list for the finetuning run and executes it on
// a cloud instance through the configured provider.
//
// The first two entries of t.Envs are used as the data-in and data-out paths
// (in that order); an error is returned when fewer than two entries are
// present instead of panicking on an out-of-range index.
//
// Returns the error from the provider's RunCommand call, or nil on success.
func (t *SchedulerModelFinetuning) do(task *Task, ctx TaskContext) error {
	if len(t.Envs) < 2 {
		return fmt.Errorf("finetuning task needs the data-in and data-out envs, got %d env(s)", len(t.Envs))
	}

	// Paths may arrive with backslash separators; the shell commands below
	// expect forward slashes.
	dataIn := strings.ReplaceAll(t.Envs[0].Value, "\\", "/")
	dataOut := strings.ReplaceAll(t.Envs[1].Value, "\\", "/")

	// Each command first drops any previous export of the variable from
	// ~/.bashrc and then appends the new one.
	// NOTE(review): the values are spliced into single-quoted shell text
	// unescaped; a value containing a single quote would break the command.
	commands := []string{
		"sed -i '/SCH_DATA_IN/d' ~/.bashrc && echo 'export SCH_DATA_IN=" + dataIn + "' >> ~/.bashrc",
		"sed -i '/SCH_DATA_OUT/d' ~/.bashrc && echo 'export SCH_DATA_OUT=" + dataOut + "' >> ~/.bashrc",
	}
	commands = append(commands, utils.SplitCommands(t.CMD)...)

	provider := create_ecs.GetFactory(config.CloudName).CreateProvider()

	// Create the server.
	//instanceID, ecsIP, err := provider.CreateServer()
	//if err != nil {
	//	task.SendStatus(exectsk.NewSchedulerModelFinetuningStatus(err.Error()))
	//	return err
	//}
	//logger.Info("create ECS success, instance id: " + instanceID + ", ip: " + ecsIP)

	// Run the finetuning task.
	// TODO: the instance ID is hard-coded while server creation above is
	// disabled; it should come from CreateServer.
	_, err := provider.RunCommand(commands, "i-bp1ikwdsr5r9p5i9mggm", 2000)

	// Destroy the server once execution has finished.
	//_, err2 := provider.DeleteInstance(instanceID)
	//if err2 != nil {
	//	task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", err.Error()))
	//}

	return err
}
// init passes the SchedulerModelFinetuning constructor to Register.
func init() {
Register(NewSchedulerModelFinetuning)
}

View File

@ -0,0 +1,30 @@
package job
import (
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
)
// FinetuningJob is the job body for a finetuning job.
type FinetuningJob struct {
Info schsdk.FinetuningJobInfo // job description supplied when the job was submitted
Files jobmod.JobFiles // files the job needs
TargetCCID schsdk.CCID // ID of the computing center that will run this job
OutputPath string // output path for the program's results; a relative path that must be joined with the RemoteBase recorded in the CDS database to form the full path
}
// NewFinetuningJob creates a finetuning job body holding the submitted job
// description. All other fields start at their zero values.
func NewFinetuningJob(info schsdk.FinetuningJobInfo) *FinetuningJob {
	body := new(FinetuningJob)
	body.Info = info
	return body
}
// GetInfo returns a pointer to the job description stored in this job body.
// The pointer refers to the Info field itself, not to a copy.
func (j *FinetuningJob) GetInfo() schsdk.JobInfo {
return &j.Info
}
// Dump produces the dump form of this job body, copying over the job's files
// and target computing center ID.
func (j *FinetuningJob) Dump() jobmod.JobBodyDump {
	dump := new(jobmod.FinetuningJobDump)
	dump.Files = j.Files
	dump.TargetCCID = j.TargetCCID
	return dump
}

View File

@ -79,6 +79,11 @@ func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
jobFiles = &runningJob.Files
runningJob.TargetCCID = s.scheme.TargetCCID
runningJob.OutputPath = outputPath
case *job.FinetuningJob:
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files
runningJob.TargetCCID = s.scheme.TargetCCID
runningJob.OutputPath = outputPath
case *job.MultiInstanceJob:
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files

View File

@ -8,9 +8,7 @@ import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
"gitlink.org.cn/cloudream/scheduler/manager/internal/executormgr"
jobTask "gitlink.org.cn/cloudream/scheduler/manager/internal/task"
"path/filepath"
"time"
@ -51,112 +49,53 @@ func (s *NormalJobExecuting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job)
}
func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
log := logger.WithType[NormalJobExecuting]("State").WithField("JobID", jo.JobID)
// TODO UserID
userID := cdssdk.UserID(1)
var runtime *schsdk.JobRuntimeInfo
var jobFiles *jobmod.JobFiles
var targetCCID schsdk.CCID
var outputPath string
var modelJobInfo *schsdk.ModelJobInfo
var packageID cdssdk.PackageID
err := error(nil)
switch runningJob := jo.Body.(type) {
case *job.NormalJob:
runtime = &runningJob.Info.Runtime
jobFiles = &runningJob.Files
targetCCID = runningJob.TargetCCID
outputPath = runningJob.OutputPath
packageID = runningJob.Files.Dataset.PackageID
case *job.InstanceJob:
runtime = &runningJob.Info.Runtime
jobFiles = &runningJob.Files
targetCCID = runningJob.TargetCCID
outputPath = runningJob.OutputPath
modelJobInfo = &runningJob.Info.ModelJobInfo
packageID = runningJob.Files.Dataset.PackageID
}
pcmImgInfo, err := rtx.Mgr.DB.PCMImage().GetByImageIDAndCCID(rtx.Mgr.DB.SQLCtx(), jobFiles.Image.ImageID, targetCCID)
if err != nil {
return fmt.Errorf("getting pcm image info: %w", err)
}
ccInfo, err := rtx.Mgr.DB.ComputingCenter().GetByID(rtx.Mgr.DB.SQLCtx(), targetCCID)
if err != nil {
return fmt.Errorf("getting computing center info: %w", err)
}
ress, err := rtx.Mgr.DB.CCResource().GetByCCID(rtx.Mgr.DB.SQLCtx(), targetCCID)
if err != nil {
return fmt.Errorf("getting computing center resource: %w", err)
}
if len(ress) == 0 {
return fmt.Errorf("no resource found at computing center %v", targetCCID)
}
stgCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
return fmt.Errorf("new cds client: %w", err)
}
defer schglb.CloudreamStoragePool.Release(stgCli)
getStg, err := stgCli.StorageGet(cdssdk.StorageGet{
UserID: userID,
StorageID: ccInfo.CDSStorageID,
})
if err != nil {
return fmt.Errorf("request to cds: %w", err)
}
// TODO 判断是否是模型推理任务,如果是,则进行扩缩容管理
if modelJobInfo != nil {
// 发送扩容任务
ecs := exetsk.NewScheduleCreateECS(
userID,
packageID,
// modelJobInfo.Command是模型更新的脚本
runtime.Command+"\\n"+modelJobInfo.Command,
)
task, err := rtx.Mgr.ExecMgr.StartTask(ecs, ccInfo)
pcmImgInfo, err := rtx.Mgr.DB.PCMImage().GetByImageIDAndCCID(rtx.Mgr.DB.SQLCtx(), runningJob.Files.Image.ImageID, runningJob.TargetCCID)
if err != nil {
log.Error(err.Error())
return err
return fmt.Errorf("getting pcm image info: %w", err)
}
return s.listen(rtx, jo, task, ccInfo, *modelJobInfo)
}
// 判断算力中心是否支持环境变量配置如果不支持则读取脚本内容并拼接在Command参数后面
var envs []schsdk.KVPair
var params []string
var cmd string
// TODO 临时使用这个路径应该来自于CDS
dataSetPath := filepath.Join("packages", "1", fmt.Sprintf("%v", jobFiles.Dataset.PackageID))
envs = append(envs, schsdk.KVPair{Key: schsdk.JobDataInEnv, Value: filepath.Join(getStg.RemoteBase, dataSetPath)})
envs = append(envs, schsdk.KVPair{Key: schsdk.JobDataOutEnv, Value: filepath.Join(getStg.RemoteBase, outputPath)})
envs = append(envs, runtime.Envs...)
switch boot := ccInfo.Bootstrap.(type) {
case *schsdk.DirectBootstrap:
cmd = runtime.Command
case *schsdk.NoEnvBootstrap:
cmd = boot.ScriptFileName
params = append(params, runtime.Command)
envMap := lo.Map(envs, func(env schsdk.KVPair, _ int) string {
return fmt.Sprintf("%s=%s", env.Key, env.Value)
})
params = append(params, envMap...)
default:
cmd = runtime.Command
ress, err := rtx.Mgr.DB.CCResource().GetByCCID(rtx.Mgr.DB.SQLCtx(), runningJob.TargetCCID)
if err != nil {
return fmt.Errorf("getting computing center resource: %w", err)
}
if len(ress) == 0 {
return fmt.Errorf("no resource found at computing center %v", runningJob.TargetCCID)
}
ccInfo, getStg, err := getCCInfoAndStgInfo(rtx, runningJob.TargetCCID, userID)
cmd, envs := getRuntimeCommand(runningJob.Info.Runtime, runningJob.Files.Dataset.PackageID, runningJob.OutputPath, getStg.RemoteBase, *ccInfo)
err = s.submitNormalTask(rtx, cmd, envs, *ccInfo, pcmImgInfo, ress[0].PCMResourceID)
case *job.FinetuningJob:
ccInfo, err := rtx.Mgr.DB.ComputingCenter().GetByID(rtx.Mgr.DB.SQLCtx(), runningJob.TargetCCID)
if err != nil {
return fmt.Errorf("getting computing center info: %w", err)
}
_, getStg, err := getCCInfoAndStgInfo(rtx, runningJob.TargetCCID, userID)
cmd, envs := getRuntimeCommand(runningJob.Info.Runtime, runningJob.Files.Dataset.PackageID, runningJob.OutputPath, getStg.RemoteBase, ccInfo)
err = s.submitFinetuningTask(rtx, cmd, envs, ccInfo)
case *job.InstanceJob:
ccInfo, err := rtx.Mgr.DB.ComputingCenter().GetByID(rtx.Mgr.DB.SQLCtx(), runningJob.TargetCCID)
if err != nil {
return fmt.Errorf("getting computing center info: %w", err)
}
err = s.submitInstanceTask(rtx, jo, runningJob, ccInfo, userID)
}
return err
}
func (s *NormalJobExecuting) submitNormalTask(rtx jobmgr.JobStateRunContext, cmd string, envs []schsdk.KVPair, ccInfo schmod.ComputingCenter, pcmImgInfo schmod.PCMImage, resourceID pcmsdk.ResourceID) error {
task, err := rtx.Mgr.ExecMgr.StartTask(exetsk.NewSubmitTask(
ccInfo.PCMParticipantID,
pcmImgInfo.PCMImageID,
// TODO 选择资源的算法
ress[0].PCMResourceID,
resourceID,
cmd,
envs,
// params, TODO params不应该是kv数组而应该是字符串数组
@ -164,7 +103,7 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e
), ccInfo)
if err != nil {
log.Error(err.Error())
logger.Error(err.Error())
return err
}
@ -174,7 +113,7 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e
tskStatus := msg.Value.Status.(*exetsk.SubmitTaskStatus)
if tskStatus.Status != s.lastStatus {
log.Infof("task %s -> %s", s.lastStatus, tskStatus.Status)
logger.Infof("task %s -> %s", s.lastStatus, tskStatus.Status)
}
s.lastStatus = tskStatus.Status
@ -193,8 +132,45 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e
}
}
func (s *NormalJobExecuting) listen(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job, task *jobTask.JobTask[mgrmq.ExecutorTaskStatus], ccInfo schmod.ComputingCenter, modelJobInfo schsdk.ModelJobInfo) error {
log := logger.WithType[NormalJobExecuting]("State").WithField("TaskID", task.ID())
// submitFinetuningTask starts a finetuning task on the executor for the given
// computing center and waits for its first status message.
//
// It returns the error from starting the task, the error reported in the
// received status, or an error when the received status is not a
// SchedulerModelFinetuningStatus. The assertion is checked so that an
// unexpected or nil status yields an error rather than a panic.
func (s *NormalJobExecuting) submitFinetuningTask(rtx jobmgr.JobStateRunContext, cmd string, envs []schsdk.KVPair, ccInfo schmod.ComputingCenter) error {
	task, err := rtx.Mgr.ExecMgr.StartTask(exetsk.NewSchedulerModelFinetuning(
		cmd,
		envs,
	), ccInfo)
	if err != nil {
		logger.Error(err.Error())
		return err
	}

	taskFut := task.Receive()
	msg := <-taskFut.Chan()

	tskStatus, ok := msg.Value.Status.(*exetsk.SchedulerModelFinetuningStatus)
	if !ok || tskStatus == nil {
		err := fmt.Errorf("unexpected finetuning task status type %T", msg.Value.Status)
		logger.Error(err.Error())
		return err
	}

	if tskStatus.Error != nil {
		logger.Error(tskStatus.Error.Error())
		return tskStatus.Error
	}

	return nil
}
func (s *NormalJobExecuting) submitInstanceTask(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job, runningJob *job.InstanceJob, ccInfo schmod.ComputingCenter, userID cdssdk.UserID) error {
modelJobInfo := runningJob.Info.ModelJobInfo
// 发送扩容任务
ecs := exetsk.NewScheduleCreateECS(
userID,
runningJob.Files.Dataset.PackageID,
// modelJobInfo.Command是模型更新的脚本
runningJob.Info.Runtime.Command+"\\n"+modelJobInfo.Command,
)
task, err := rtx.Mgr.ExecMgr.StartTask(ecs, ccInfo)
if err != nil {
logger.Error(err.Error())
return err
}
waitFut := event.BeginWaitType[*event.Update](rtx.EventSet)
taskFut := task.Receive()
@ -241,7 +217,7 @@ func (s *NormalJobExecuting) listen(rtx jobmgr.JobStateRunContext, jo *jobmgr.Jo
}
jobmgr.SetNodeData(jo.JobSetID, modelJobInfo, node)
log.Infof("node expansion: %v", v2.Address)
logger.Infof("node expansion: %v", v2.Address)
case schsdk.DestroyECS:
// 缩容任务,将节点从节点中移除
jobmgr.RemoveNodeFromRunningModels(modelJobInfo, jo.JobID)
@ -266,6 +242,56 @@ func (s *NormalJobExecuting) listen(rtx jobmgr.JobStateRunContext, jo *jobmgr.Jo
}
}
// getRuntimeCommand works out the command to run and the environment
// variables for a job, depending on how the computing center bootstraps jobs.
//
// The returned envs always start with the data-in and data-out entries (both
// rooted at remoteBase), followed by the envs from the runtime description.
// The returned command is the bootstrap script file name when the computing
// center uses NoEnvBootstrap, and runtime.Command in every other case.
//
// NOTE(review): for NoEnvBootstrap a parameter list (the runtime command plus
// "KEY=VALUE" strings) is assembled but never returned, so the runtime
// command is not passed on in that case. This mirrors the existing behavior;
// confirm whether the parameters should be part of the result.
func getRuntimeCommand(runtime schsdk.JobRuntimeInfo, packageID cdssdk.PackageID, outputPath string, remoteBase string, ccInfo schmod.ComputingCenter) (string, []schsdk.KVPair) {
	// TODO: temporary path; it should come from CDS.
	datasetDir := filepath.Join("packages", "1", fmt.Sprintf("%v", packageID))

	envs := []schsdk.KVPair{
		{Key: schsdk.JobDataInEnv, Value: filepath.Join(remoteBase, datasetDir)},
		{Key: schsdk.JobDataOutEnv, Value: filepath.Join(remoteBase, outputPath)},
	}
	envs = append(envs, runtime.Envs...)

	cmd := runtime.Command
	if noEnvBoot, isNoEnv := ccInfo.Bootstrap.(*schsdk.NoEnvBootstrap); isNoEnv {
		cmd = noEnvBoot.ScriptFileName

		params := []string{runtime.Command}
		params = append(params, lo.Map(envs, func(kv schsdk.KVPair, _ int) string {
			return fmt.Sprintf("%s=%s", kv.Key, kv.Value)
		})...)
		_ = params // currently unused, see the review note above
	}

	return cmd, envs
}
// getCCInfoAndStgInfo loads the computing center record for targetCCID from
// the database and then asks CDS for the storage that record refers to, on
// behalf of userID.
//
// On any failure both results are nil and the error is wrapped with the step
// that failed. The storage client is taken from the shared pool and released
// again before returning.
func getCCInfoAndStgInfo(rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, userID cdssdk.UserID) (*schmod.ComputingCenter, *cdssdk.StorageGetResp, error) {
	center, err := rtx.Mgr.DB.ComputingCenter().GetByID(rtx.Mgr.DB.SQLCtx(), targetCCID)
	if err != nil {
		return nil, nil, fmt.Errorf("getting computing center info: %w", err)
	}

	cli, err := schglb.CloudreamStoragePool.Acquire()
	if err != nil {
		return nil, nil, fmt.Errorf("new cds client: %w", err)
	}
	defer schglb.CloudreamStoragePool.Release(cli)

	req := cdssdk.StorageGet{
		UserID:    userID,
		StorageID: center.CDSStorageID,
	}
	stgInfo, err := cli.StorageGet(req)
	if err != nil {
		return nil, nil, fmt.Errorf("request to cds: %w", err)
	}

	return &center, stgInfo, nil
}
type DataReturnJobExecuting struct {
}

View File

@ -42,6 +42,10 @@ func (s *PreScheduling) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files
runningJob.TargetCCID = s.scheme.TargetCCID
case *job.FinetuningJob:
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files
runningJob.TargetCCID = s.scheme.TargetCCID
case *job.MultiInstanceJob:
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files

View File

@ -35,6 +35,9 @@ func (s *ReadyToAdjust) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error
case *job.NormalJob:
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files
case *job.FinetuningJob:
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files
case *job.InstanceJob:
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files

View File

@ -82,6 +82,19 @@ func (svc *Service) SubmitJobSet(msg *mgrmq.SubmitJobSet) (*mgrmq.SubmitJobSetRe
Body: modelJob,
InitState: state.NewMultiInstanceUpdate(multiInstanceJobDump),
})
case *schsdk.FinetuningJobInfo:
jo := job.NewFinetuningJob(*info)
preSch, ok := msg.PreScheduleScheme.JobSchemes[info.LocalJobID]
if !ok {
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("pre schedule scheme for job %s is not found", info.LocalJobID))
}
jobs = append(jobs, jobmgr.SubmittingJob{
Body: jo,
InitState: state.NewPreSchuduling(preSch),
})
}
}