优化代码

This commit is contained in:
JeshuaRen 2024-09-05 16:29:11 +08:00
parent c730220bfc
commit 96a6bb1c69
13 changed files with 112 additions and 28 deletions

View File

@ -37,8 +37,6 @@ func (s *JobSetService) Submit(ctx *gin.Context) {
return
}
println(string(bodyData))
jobSetInfo, err := serder.JSONToObjectEx[schsdk.JobSetInfo](bodyData)
if err != nil {
log.Warnf("parsing request body: %s", err.Error())

View File

@ -49,7 +49,7 @@ func InitPCMPool(cfg *pcmsdk.Config) {
var CDSRclone schsdk.Rclone
func InitRcloneConfig(rcloneID, rcloneConfigID string) {
func InitRcloneConfig(rcloneID string, rcloneConfigID string) {
CDSRclone = schsdk.Rclone{
CDSRcloneID: rcloneID,
CDSRcloneConfigID: rcloneConfigID,

View File

@ -86,6 +86,24 @@ type Models struct {
ModelName schsdk.ModelName `json:"modelName" db:"modelName"`
}
type ModelResource struct {
ModelID schsdk.ModelID `json:"modelID" db:"modelID"`
CDSStorageID cdssdk.StorageID `json:"CDSStorageID" db:"CDSStorageID"`
OjbStgID int `json:"OjbStgID" db:"OjbStgID"`
ModelPath string `json:"modelPath" db:"modelPath"`
StartShellPath string `json:"startShellPath" db:"startShellPath"`
StopShellPath string `json:"stopShellPath" db:"stopShellPath"`
FinetuningShellPath string `json:"finetuningShellPath" db:"finetuningShellPath"`
}
type ObjectStorage struct {
AK string `json:"access_key_id" db:"access_key_id"`
SK string `json:"secret_access_key" db:"secret_access_key"`
Endpoint string `json:"endpoint" db:"endpoint"`
Bucket string `json:"bucket" db:"bucket"`
CDSStorageID cdssdk.StorageID `json:"CDSStorageID" db:"CDSStorageID"`
}
func (i *CCResourceInfo) Scan(src interface{}) error {
data, ok := src.([]uint8)
if !ok {

View File

@ -2,6 +2,8 @@ package db
import (
"github.com/jmoiron/sqlx"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
)
@ -19,3 +21,9 @@ func (*ModelsDB) GetAll(ctx SQLContext) ([]schmod.Models, error) {
return ret, err
}
func (*ModelsDB) GetModelByID(ctx SQLContext, modelID schsdk.ModelID, CDSStorageID cdssdk.StorageID) (schmod.ModelResource, error) {
var ret schmod.ModelResource
err := sqlx.Get(ctx, &ret, "select * from ModelResource where modelID = ? and CDSStorageID = ?", modelID, CDSStorageID)
return ret, err
}

View File

@ -0,0 +1,21 @@
package db
import (
"github.com/jmoiron/sqlx"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
)
type ObjectStorageDB struct {
*DB
}
func (db *DB) ObjectStorage() *ObjectStorageDB {
return &ObjectStorageDB{DB: db}
}
func (*ObjectStorageDB) GetObjectStorageByStorageID(ctx SQLContext, CDSStorageID cdssdk.StorageID) (schmod.ObjectStorage, error) {
var ret schmod.ObjectStorage
err := sqlx.Get(ctx, &ret, "select * from ObjectStorage where CDSStorageID = ?", CDSStorageID)
return ret, err
}

View File

@ -1,14 +1,17 @@
package task
import (
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)
type ScheduleCreateECS struct {
TaskInfoBase
UserID cdssdk.UserID `json:"userID"`
PackageID cdssdk.PackageID `json:"packageID"`
Command string `json:"command"`
UserID cdssdk.UserID `json:"userID"`
PackageID cdssdk.PackageID `json:"packageID"`
Command string `json:"command"`
StartShellPath string `json:"startShellPath"`
Envs []schsdk.KVPair `json:"envs"`
}
type ScheduleCreateECSStatus struct {
@ -18,11 +21,13 @@ type ScheduleCreateECSStatus struct {
Operate string `json:"operate"`
}
func NewScheduleCreateECS(userID cdssdk.UserID, packageID cdssdk.PackageID, command string) *ScheduleCreateECS {
func NewScheduleCreateECS(userID cdssdk.UserID, packageID cdssdk.PackageID, command string, startShellPath string, envs []schsdk.KVPair) *ScheduleCreateECS {
return &ScheduleCreateECS{
UserID: userID,
PackageID: packageID,
Command: command,
UserID: userID,
PackageID: packageID,
Command: command,
StartShellPath: startShellPath,
Envs: envs,
}
}

View File

@ -380,7 +380,7 @@ func (s *DefaultPreScheduler) scheduleForNormalOrMultiJob(jobSet *schsdk.JobSetI
// 这里写死,用于测试,生成环境必须删除
for i := 0; i < len(allCCsArr); i++ {
if allCCsArr[i].CC.CCID == 4 {
if allCCsArr[i].CC.CCID == 5 {
targetNode = allCCsArr[i]
}
}

View File

@ -7,6 +7,7 @@ import (
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
"gitlink.org.cn/cloudream/scheduler/common/utils"
"gitlink.org.cn/cloudream/scheduler/executor/internal/config"
"strings"
//"gitlink.org.cn/cloudream/scheduler/executor/internal/config"
"gitlink.org.cn/cloudream/scheduler/executor/internal/task/create_ecs"
@ -43,7 +44,14 @@ var ecsIP string
func (t *ScheduleCreateECS) do(task *Task, ctx TaskContext) error {
commands := utils.SplitCommands(t.Command)
var commands []string
commandContent := "sed -i '/SCH_DATA_IN/d' ~/.bashrc && echo 'export SCH_DATA_IN=" + strings.Replace(t.Envs[0].Value, "\\", "/", -1) + "' >> ~/.bashrc"
commands = append(commands, commandContent)
commandContent = "sed -i '/SCH_DATA_OUT/d' ~/.bashrc && echo 'export SCH_DATA_OUT=" + strings.Replace(t.Envs[1].Value, "\\", "/", -1) + "' >> ~/.bashrc"
commands = append(commands, commandContent)
arr := utils.SplitCommands(t.Command)
commands = append(commands, arr...)
// 创建云主机
factory := create_ecs.GetFactory(config.CloudName)

View File

@ -2,6 +2,7 @@ package config
import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/config"
db "gitlink.org.cn/cloudream/scheduler/common/pkgs/db/config"
@ -14,6 +15,7 @@ type Config struct {
DB db.Config `json:"db"`
CloudreamStorage cdssdk.Config `json:"cloudreamStorage"`
ReportTimeoutSecs int `json:"reportTimeoutSecs"`
CDSRclone schsdk.Rclone `json:"CDSRclone"`
}
var cfg Config

View File

@ -68,24 +68,27 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e
}
ccInfo, getStg, err := getCCInfoAndStgInfo(rtx, runningJob.TargetCCID, userID)
if err != nil {
return fmt.Errorf("getting storage info: %w", err)
}
cmd, envs := getRuntimeCommand(runningJob.Info.Runtime, runningJob.Files.Dataset.PackageID, runningJob.OutputPath, getStg.RemoteBase, *ccInfo)
err = s.submitNormalTask(rtx, cmd, envs, *ccInfo, pcmImgInfo, ress[0].PCMResourceID)
case *job.FinetuningJob:
ccInfo, err := rtx.Mgr.DB.ComputingCenter().GetByID(rtx.Mgr.DB.SQLCtx(), runningJob.TargetCCID)
ccInfo, getStg, err := getCCInfoAndStgInfo(rtx, runningJob.TargetCCID, userID)
if err != nil {
return fmt.Errorf("getting computing center info: %w", err)
return fmt.Errorf("getting storage info: %w", err)
}
_, getStg, err := getCCInfoAndStgInfo(rtx, runningJob.TargetCCID, userID)
cmd, envs := getRuntimeCommand(runningJob.Info.Runtime, runningJob.Files.Dataset.PackageID, runningJob.OutputPath, getStg.RemoteBase, ccInfo)
err = s.submitFinetuningTask(rtx, cmd, envs, ccInfo)
cmd, envs := getRuntimeCommand(runningJob.Info.Runtime, runningJob.Files.Dataset.PackageID, runningJob.OutputPath, getStg.RemoteBase, *ccInfo)
err = s.submitFinetuningTask(rtx, cmd, envs, *ccInfo)
case *job.InstanceJob:
ccInfo, err := rtx.Mgr.DB.ComputingCenter().GetByID(rtx.Mgr.DB.SQLCtx(), runningJob.TargetCCID)
ccInfo, getStg, err := getCCInfoAndStgInfo(rtx, runningJob.TargetCCID, userID)
if err != nil {
return fmt.Errorf("getting computing center info: %w", err)
return fmt.Errorf("getting storage info: %w", err)
}
err = s.submitInstanceTask(rtx, jo, runningJob, ccInfo, userID)
_, envs := getRuntimeCommand(runningJob.Info.Runtime, runningJob.Files.Dataset.PackageID, runningJob.OutputPath, getStg.RemoteBase, *ccInfo)
err = s.submitInstanceTask(rtx, jo, runningJob, *ccInfo, getStg.StorageID, userID, envs)
}
return err
}
@ -155,15 +158,33 @@ func (s *NormalJobExecuting) submitFinetuningTask(rtx jobmgr.JobStateRunContext,
return nil
}
func (s *NormalJobExecuting) submitInstanceTask(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job, runningJob *job.InstanceJob, ccInfo schmod.ComputingCenter, userID cdssdk.UserID) error {
func (s *NormalJobExecuting) submitInstanceTask(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job, runningJob *job.InstanceJob, ccInfo schmod.ComputingCenter,
storageID cdssdk.StorageID, userID cdssdk.UserID, envs []schsdk.KVPair) error {
modelJobInfo := runningJob.Info.ModelJobInfo
// 先从数据库中查询是否已经预置了模型
modelInfo, err := rtx.Mgr.DB.Models().GetModelByID(rtx.Mgr.DB.SQLCtx(), modelJobInfo.ModelID, storageID)
if &modelInfo == nil {
return fmt.Errorf("the model is not exists: %w", err)
}
if err != nil {
return fmt.Errorf("getting computing center info: %w", err)
}
objectStorage, err := rtx.Mgr.DB.ObjectStorage().GetObjectStorageByStorageID(rtx.Mgr.DB.SQLCtx(), storageID)
if err != nil {
return fmt.Errorf("getting object storage info: %w", err)
}
//getRcloneMountCommand()
println(objectStorage)
// 发送扩容任务
ecs := exetsk.NewScheduleCreateECS(
userID,
runningJob.Files.Dataset.PackageID,
// modelJobInfo.Command是模型更新的脚本
runningJob.Info.Runtime.Command+"\\n"+modelJobInfo.Command,
modelInfo.StartShellPath,
envs,
)
task, err := rtx.Mgr.ExecMgr.StartTask(ecs, ccInfo)
@ -213,7 +234,7 @@ func (s *NormalJobExecuting) submitInstanceTask(rtx jobmgr.JobStateRunContext, j
node := schsdk.NodeInfo{
InstanceID: jo.JobID,
Address: schsdk.Address(v2.Address),
Status: schsdk.RunStatus,
Status: schsdk.CreateECS,
}
jobmgr.SetNodeData(jo.JobSetID, modelJobInfo, node)
@ -223,10 +244,10 @@ func (s *NormalJobExecuting) submitInstanceTask(rtx jobmgr.JobStateRunContext, j
jobmgr.RemoveNodeFromRunningModels(modelJobInfo, jo.JobID)
case schsdk.PauseECS:
// 更新节点状态
jobmgr.UpdateNodeFromRunningModels(modelJobInfo, jo.JobID, schsdk.StopStatus)
jobmgr.UpdateNodeFromRunningModels(modelJobInfo, jo.JobID, schsdk.PauseECS)
case schsdk.RunECS:
// 更新节点状态
jobmgr.UpdateNodeFromRunningModels(modelJobInfo, jo.JobID, schsdk.RunStatus)
jobmgr.UpdateNodeFromRunningModels(modelJobInfo, jo.JobID, schsdk.RunECS)
}
case error:

View File

@ -64,6 +64,8 @@ func UpdateNodeFromRunningModels(modelJobInfo schsdk.ModelJobInfo, instanceID sc
if node.InstanceID == instanceID {
node.Status = status
logger.Info("update node success from running models, job id: " + instanceID)
value.Nodes[i] = node
runningModels[key] = value
break
}
}
@ -95,7 +97,7 @@ func GetNodeUsageRateInfo(customModelName schsdk.ModelName, modelID schsdk.Model
Number: "10.1",
},
{
Timestamp: strconv.FormatInt(time.Now().Unix(), 10),
Timestamp: strconv.FormatInt(time.Now().Unix()+2, 10),
Number: "20",
},
},
@ -105,7 +107,7 @@ func GetNodeUsageRateInfo(customModelName schsdk.ModelName, modelID schsdk.Model
Number: "3",
},
{
Timestamp: strconv.FormatInt(time.Now().Unix(), 10),
Timestamp: strconv.FormatInt(time.Now().Unix()+2, 10),
Number: "4.55",
},
},

View File

@ -19,7 +19,7 @@ func (svc *Service) ECSNodeRunningInfo(req *schsdk.ECSNodeRunningInfoReq) (*schs
return mq.ReplyOK(schsdk.NewECSNodeRunningInfoResp(info))
}
func (svc *Service) GetAllModels() (*mgrmq.GetAllModelsResp, *mq.CodeMessage) {
func (svc *Service) GetAllModels(msg *mgrmq.GetAllModels) (*mgrmq.GetAllModelsResp, *mq.CodeMessage) {
models, err := svc.db.Models().GetAll(svc.db.SQLCtx())
if err != nil {
logger.Warnf("getting all models: %s", err.Error())

View File

@ -33,6 +33,7 @@ func main() {
schglb.InitMQPool(&config.Cfg().RabbitMQ)
schglb.InitCloudreamStoragePool(&config.Cfg().CloudreamStorage)
schglb.InitRcloneConfig(config.Cfg().CDSRclone.CDSRcloneID, "")
exeMgr, err := executormgr.NewManager(time.Duration(config.Cfg().ReportTimeoutSecs) * time.Second)
if err != nil {