新增节点探活功能

This commit is contained in:
JeshuaRen 2024-09-20 10:35:07 +08:00
parent 6bb9c85e17
commit d69ea7dc48
14 changed files with 157 additions and 69 deletions

View File

@ -55,3 +55,9 @@ func InitRcloneConfig(rcloneID string, rcloneConfigID string) {
CDSRcloneConfigID: rcloneConfigID,
}
}
// InferencePlatform is the package-level inference platform configuration.
// It holds the zero value until InitInferencePlatform is called.
var InferencePlatform schsdk.InferencePlatform
// InitInferencePlatform stores cfg in the package-level InferencePlatform
// variable, replacing whatever value was there before.
func InitInferencePlatform(cfg schsdk.InferencePlatform) {
InferencePlatform = cfg
}

View File

@ -87,15 +87,14 @@ type Models struct {
}
type ModelResource struct {
ModelID int64 `json:"modelID" db:"modelID"`
CDSStorageID cdssdk.StorageID `json:"CDSStorageID" db:"CDSStorageID"`
OjbStgID int `json:"OjbStgID" db:"OjbStgID"`
ModelPath string `json:"modelPath" db:"modelPath"`
StartShellPath string `json:"startShellPath" db:"startShellPath"`
ServerPort int `json:"serverPort" db:"serverPort"`
ServerUrlPath string `json:"serverUrlPath" db:"serverUrlPath"`
StopShellPath string `json:"stopShellPath" db:"stopShellPath"`
FinetuningShellPath string `json:"finetuningShellPath" db:"finetuningShellPath"`
ModelID int64 `json:"modelID" db:"modelID"`
OjbStgID int64 `json:"OjbStgID" db:"OjbStgID"`
ModelPath string `json:"modelPath" db:"modelPath"`
StartShellPath string `json:"startShellPath" db:"startShellPath"`
ServerPort int64 `json:"serverPort" db:"serverPort"`
ServerUrlPath string `json:"serverUrlPath" db:"serverUrlPath"`
StopShellPath string `json:"stopShellPath" db:"stopShellPath"`
FinetuningShellPath string `json:"finetuningShellPath" db:"finetuningShellPath"`
}
type ObjectStorage struct {

View File

@ -3,7 +3,6 @@ package db
import (
"github.com/jmoiron/sqlx"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
)
@ -22,8 +21,8 @@ func (*ModelsDB) GetAll(ctx SQLContext) ([]schmod.Models, error) {
return ret, err
}
func (*ModelsDB) GetModelByID(ctx SQLContext, modelID schsdk.ModelID, CDSStorageID cdssdk.StorageID) (schmod.ModelResource, error) {
func (*ModelsDB) GetModelByID(ctx SQLContext, modelID schsdk.ModelID, OjbStgID int64) (schmod.ModelResource, error) {
var ret schmod.ModelResource
err := sqlx.Get(ctx, &ret, "select * from ModelResource where modelID = ? and CDSStorageID = ?", modelID, CDSStorageID)
err := sqlx.Get(ctx, &ret, "select * from ModelResource where modelID = ? and OjbStgID = ?", modelID, OjbStgID)
return ret, err
}

View File

@ -16,6 +16,6 @@ func (db *DB) ObjectStorage() *ObjectStorageDB {
func (*ObjectStorageDB) GetObjectStorageByStorageID(ctx SQLContext, CDSStorageID cdssdk.StorageID) (schmod.ObjectStorage, error) {
var ret schmod.ObjectStorage
err := sqlx.Get(ctx, &ret, "select access_key_id, secret_access_key, endpoint, bucket, CDSStorageID, mountType from ObjectStorage where CDSStorageID = ?", CDSStorageID)
err := sqlx.Get(ctx, &ret, "select * from ObjectStorage where CDSStorageID = ?", CDSStorageID)
return ret, err
}

View File

@ -4,8 +4,10 @@ import schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
type SchedulerModelFinetuning struct {
TaskInfoBase
CMD string `json:"cmd"`
Envs []schsdk.KVPair `json:"envs"`
Type string `json:"type"`
CMD string `json:"cmd"`
Envs []schsdk.KVPair `json:"envs"`
InferencePlatform schsdk.InferencePlatform `json:"inferencePlatform"`
}
type SchedulerModelFinetuningStatus struct {
@ -13,10 +15,11 @@ type SchedulerModelFinetuningStatus struct {
Error error `json:"error"`
}
func NewSchedulerModelFinetuning(cmd string, envs []schsdk.KVPair) *SchedulerModelFinetuning {
func NewSchedulerModelFinetuning(cmd string, envs []schsdk.KVPair, inferencePlatform schsdk.InferencePlatform) *SchedulerModelFinetuning {
return &SchedulerModelFinetuning{
CMD: cmd,
Envs: envs,
CMD: cmd,
Envs: envs,
InferencePlatform: inferencePlatform,
}
}

View File

@ -16,14 +16,15 @@ type Application struct {
}
type Config struct {
Logger log.Config `json:"logger"`
ReportIntervalSec int `json:"reportIntervalSec"`
RabbitMQ mymq.Config `json:"rabbitMQ"`
CloudreamStorage cdssdk.Config `json:"cloudreamStorage"`
PCM pcmsdk.Config `json:"pcm"`
Application Application `json:"application"`
Rclone schsdk.Rclone `json:"rclone"`
CloudECS map[string]interface{} `json:"createECS"`
Logger log.Config `json:"logger"`
ReportIntervalSec int `json:"reportIntervalSec"`
RabbitMQ mymq.Config `json:"rabbitMQ"`
CloudreamStorage cdssdk.Config `json:"cloudreamStorage"`
PCM pcmsdk.Config `json:"pcm"`
Application Application `json:"application"`
Rclone schsdk.Rclone `json:"rclone"`
CloudECS map[string]interface{} `json:"createECS"`
InferencePlatform schsdk.InferencePlatform `json:"inferencePlatform"`
}
var cfg Config

View File

@ -275,3 +275,9 @@ func getInstanceIP(instanceID string, regionId string) (string, error) {
return "", nil
}
// AvailableCheck is the liveness probe for an AliCloud instance.
// The probe itself is not implemented yet: instanceID is ignored and every
// instance is reported as available.
func (a *AliCloud) AvailableCheck(instanceID string) bool {
// Not implemented yet.
return true
}

View File

@ -10,6 +10,7 @@ type CloudProvider interface {
StopInstance(instanceID string) (string, error)
RebootInstances(instanceID string) (string, error)
StartInstances(instanceID string) (string, error)
AvailableCheck(instanceID string) bool
}
type CloudFactory interface {

View File

@ -12,6 +12,11 @@ import (
// HuaweiCloud implements the CloudProvider interface.
type HuaweiCloud struct{}

// AvailableCheck is the liveness probe for a HuaweiCloud instance.
//
// The real probe is not implemented yet. Instead of panicking (the probe is
// invoked from an error-handling path, where a panic would take the whole
// task down), it reports the instance as available so that an unverified
// instance is never marked invalid. instanceID is currently ignored.
func (a *HuaweiCloud) AvailableCheck(instanceID string) bool {
	// TODO: query the Huawei Cloud API for the real instance state.
	return true
}
func (a *HuaweiCloud) StartInstances(instanceID string) (string, error) {
//TODO implement me
panic("implement me")

View File

@ -112,6 +112,7 @@ type SugonCloud struct {
var ecsConfig map[string]interface{}
var authConfig map[string]interface{}
var instanceKV map[string]string
var sugonClient exemq.HttpClient
var efileClient exemq.HttpClient
@ -119,6 +120,7 @@ func SugonCloudConfig(authConfigs map[string]interface{}, ecsConfigs map[string]
authConfigs["get_token_url"] = "https://ac.sugon.com/ac/openapi/v2/tokens"
authConfig = authConfigs
ecsConfig = ecsConfigs
instanceKV = make(map[string]string)
// 获取token
token, err := getToken(authConfigs)
@ -144,9 +146,6 @@ func SugonCloudConfig(authConfigs map[string]interface{}, ecsConfigs map[string]
efileClient = *ec
}
func (s *SugonCloud) CreateServer111() (string, string, error) {
return "", "", fmt.Errorf("not support")
}
func (s *SugonCloud) CreateServer() (string, string, error) {
instanceServiceName := "auto_instance_" + time.Now().Format("20060102150405")
@ -192,6 +191,7 @@ func (s *SugonCloud) CreateServer() (string, string, error) {
url, err := sugonClient.GetInstanceUrl(token, instanceID)
logger.Info("create ecs success, instanceID: " + instanceID + " url: " + url)
instanceKV[instanceID] = instanceServiceName
return instanceID, url, nil
}
@ -220,8 +220,9 @@ func (s *SugonCloud) RunCommand(commands []string, instanceID string, timeout in
time.Sleep(3 * time.Second)
continue
}
return content, err
break
}
return content, err
}
_, err := sugonClient.RunCommand(token, instanceID, commands[i])
@ -270,3 +271,17 @@ func (s *SugonCloud) StartInstances(instanceID string) (string, error) {
instance, err := sugonClient.OperateSugonInstance(token, instanceID, schsdk.RunECS)
return instance, err
}
// AvailableCheck reports whether the Sugon instance identified by instanceID
// can still be resolved through the Sugon API.
//
// The instance name stored in instanceKV for instanceID is resolved back to
// an instance ID with sugonClient.GetInstanceID. The result is false when:
//   - instanceID has no entry in instanceKV,
//   - an access token cannot be obtained,
//   - the lookup returns an error, or
//   - the lookup succeeds but yields an empty instance ID.
func (s *SugonCloud) AvailableCheck(instanceID string) bool {
	instanceName, ok := instanceKV[instanceID]
	if !ok {
		return false
	}

	token, err := getToken(authConfig)
	if err != nil {
		logger.Error("get token error: " + err.Error())
		return false
	}

	// Use a separate variable for the lookup result so the parameter is not
	// silently overwritten.
	foundID, _, err := sugonClient.GetInstanceID(token, instanceName)
	if err != nil {
		logger.Error(err.Error())
		return false
	}
	if foundID == "" {
		// err is nil here; calling err.Error() would panic.
		logger.Error("instance not found: " + instanceName)
		return false
	}
	return true
}

View File

@ -34,7 +34,6 @@ func (t *ScheduleCreateECS) Execute(task *Task, ctx TaskContext) {
log.Debugf("begin")
defer log.Debugf("end")
//task.SendStatus(exectsk.NewScheduleCreateECSStatus("address", ""))
err := t.do(task, ctx)
if err != nil {
log.Error(err)
@ -49,7 +48,7 @@ func (t *ScheduleCreateECS) do(task *Task, ctx TaskContext) error {
factory := create_ecs.GetFactory(config.CloudName)
provider := factory.CreateProvider()
instanceID, ecsIP, err := provider.CreateServer()
//instanceID, ecsIP, err := "i-bp19q01cjmr62vstszh3", "47.98.122.29", error(nil)
//instanceID, ecsIP, err := "i-bp18see6gypratlt3nhp", "47.96.28.209", error(nil)
if err != nil {
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", schsdk.CreateECS, err.Error()))
return err
@ -87,12 +86,12 @@ func (t *ScheduleCreateECS) do(task *Task, ctx TaskContext) error {
arr := utils.SplitCommands(t.Command)
commands = append(commands, arr...)
_, err = provider.RunCommand(commands, instanceID, 2000)
if err != nil {
logger.Error("run command error: " + err.Error())
}
//_, err = provider.RunCommand(commands, instanceID, 2000)
//if err != nil {
// logger.Error("run command error: " + err.Error())
//}
address := "http://" + ecsIP + ":" + strconv.Itoa(t.ModelResource.ServerPort) + "/" + t.ModelResource.ServerUrlPath
address := "http://" + ecsIP + ":" + strconv.FormatInt(t.ModelResource.ServerPort, 10) + "/" + t.ModelResource.ServerUrlPath
if config.CloudName == schmod.SugonCloud {
address = ecsIP + "/" + t.ModelResource.ServerUrlPath
address = strings.Replace(address, "//", "/", -1)
@ -138,6 +137,7 @@ func (t *ScheduleCreateECS) do(task *Task, ctx TaskContext) error {
continue
}
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", schsdk.DestroyECS, ""))
break
case schsdk.OperateServer:
executeCommands(provider, instanceID, task, info.Runtime)
case schsdk.GPUMonitor:
@ -151,12 +151,18 @@ func (t *ScheduleCreateECS) do(task *Task, ctx TaskContext) error {
res, err = provider.RunCommand(commands, instanceID, 2000)
}
if err != nil {
// 如果命令执行失败,判断机器是否可用
check := provider.AvailableCheck(instanceID)
if !check {
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", schsdk.Invalid, err.Error()))
break
}
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", err.Error()))
continue
}
task.SendStatus(exectsk.NewScheduleCreateECSStatus(res, schsdk.GPUMonitor, ""))
default:
//executeCommands(provider, instanceID, task, info.Command)
executeCommands(provider, instanceID, task, info.Runtime)
}
}

View File

@ -1,7 +1,9 @@
package task
import (
"errors"
"gitlink.org.cn/cloudream/common/pkgs/logger"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
"gitlink.org.cn/cloudream/scheduler/common/utils"
"gitlink.org.cn/cloudream/scheduler/executor/internal/config"
@ -22,7 +24,6 @@ func (t *SchedulerModelFinetuning) Execute(task *Task, ctx TaskContext) {
log.Debugf("begin")
defer log.Debugf("end")
//task.SendStatus(exectsk.NewScheduleCreateECSStatus("address", ""))
err := t.do(task, ctx)
if err != nil {
log.Error(err)
@ -33,11 +34,16 @@ func (t *SchedulerModelFinetuning) Execute(task *Task, ctx TaskContext) {
}
func (t *SchedulerModelFinetuning) do(task *Task, ctx TaskContext) error {
var commands []string
commandContent := "sed -i '/SCH_DATA_IN/d' ~/.bashrc && echo 'export SCH_DATA_IN=" + strings.Replace(t.Envs[0].Value, "\\", "/", -1) + "' >> ~/.bashrc"
commands = append(commands, commandContent)
commandContent = "sed -i '/SCH_DATA_OUT/d' ~/.bashrc && echo 'export SCH_DATA_OUT=" + strings.Replace(t.Envs[1].Value, "\\", "/", -1) + "' >> ~/.bashrc"
commands = append(commands, commandContent)
// 设置环境变量
commands := utils.ConvertEnvsToCommand(t.Envs)
if t.Type == schsdk.DataPreprocess {
_, err := getDataPreprocessCommands(t.Envs, t.InferencePlatform)
if err != nil {
task.SendStatus(exectsk.NewSchedulerModelFinetuningStatus(err))
return err
}
}
arr := utils.SplitCommands(t.CMD)
commands = append(commands, arr...)
@ -67,6 +73,32 @@ func (t *SchedulerModelFinetuning) do(task *Task, ctx TaskContext) error {
return nil
}
// getDataPreprocessCommands builds the shell commands that write the data
// preprocessing script to /opt/generate_data.py.
//
// It returns an error when inferencePlatform.PlatformName is empty. Otherwise
// it fills the placeholders of the script template and returns two identical
// "echo -e ... > /opt/generate_data.py" commands. envs is currently unused.
//
// NOTE(review): the template is still an empty placeholder (it is meant to
// come from data_preprocess.py in the current directory), the last two
// substitutions repeat the first one, and the same command is emitted twice.
// This looks like unfinished work; confirm the intended template and commands.
func getDataPreprocessCommands(envs []schsdk.KVPair, inferencePlatform schsdk.InferencePlatform) ([]string, error) {
	if inferencePlatform.PlatformName == "" {
		return nil, errors.New("inferencePlatform.PlatformName is empty")
	}

	// Placeholder template; substitutions are applied one after another, in order.
	script := ""
	substitutions := [][2]string{
		{"@base_url@", inferencePlatform.ApiBaseUrl},
		{"@api_key@", inferencePlatform.ApiKey},
		{"@input_file_path@", ""},
		{"@output_file@", ""},
		{"@base_url@", inferencePlatform.ApiBaseUrl},
		{"@base_url@", inferencePlatform.ApiBaseUrl},
	}
	for _, sub := range substitutions {
		script = strings.ReplaceAll(script, sub[0], sub[1])
	}

	writeScript := "echo -e '" + script + "' > /opt/generate_data.py"
	return []string{writeScript, writeScript}, nil
}
// init passes NewSchedulerModelFinetuning to Register when the package is
// initialized.
// NOTE(review): presumably this makes the finetuning task type discoverable
// by the task dispatcher — confirm against Register.
func init() {
Register(NewSchedulerModelFinetuning)
}

View File

@ -32,6 +32,7 @@ func main() {
myglbs.InitExecutorID(config.Cfg().Application.ExecutorID)
schglb.InitRcloneConfig(config.Cfg().Rclone.CDSRcloneID, config.Cfg().Rclone.CDSRcloneConfigID)
schglb.InitInferencePlatform(config.Cfg().InferencePlatform)
//rpter := reporter.NewReporter(myglbs.ExecutorID, time.Second*time.Duration(config.Cfg().ReportIntervalSec))
//

View File

@ -24,17 +24,17 @@ import (
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job"
)
type NormalJobExecuting struct {
type JobExecuting struct {
lastStatus pcmsdk.TaskStatus
}
func NewNormalJobExecuting() *NormalJobExecuting {
return &NormalJobExecuting{
func NewNormalJobExecuting() *JobExecuting {
return &JobExecuting{
lastStatus: "Begin",
}
}
func (s *NormalJobExecuting) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
func (s *JobExecuting) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
err := s.do(rtx, jo)
if err != nil {
rtx.Mgr.ChangeState(jo, FailureComplete(err))
@ -43,13 +43,13 @@ func (s *NormalJobExecuting) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job)
}
}
func (s *NormalJobExecuting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateDump {
func (s *JobExecuting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateDump {
return &jobmod.NormalJobExecutingDump{
TaskStatus: s.lastStatus,
}
}
func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
func (s *JobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
// TODO UserID
userID := cdssdk.UserID(1)
err := error(nil)
@ -74,6 +74,9 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e
}
cmd, envs := getRuntimeCommand(runningJob.Info.Runtime, runningJob.Files.Dataset.PackageID, runningJob.OutputPath, getStg.RemoteBase, *ccInfo)
err = s.submitNormalTask(rtx, cmd, envs, *ccInfo, pcmImgInfo, ress[0].PCMResourceID)
if err != nil {
logger.Error(err.Error())
}
case *job.FinetuningJob:
ccInfo, getStg, err := getCCInfoAndStgInfo(rtx, runningJob.TargetCCID, userID)
@ -82,6 +85,9 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e
}
cmd, envs := getRuntimeCommand(runningJob.Info.Runtime, runningJob.Files.Dataset.PackageID, runningJob.OutputPath, getStg.RemoteBase, *ccInfo)
err = s.submitFinetuningTask(rtx, cmd, envs, *ccInfo)
if err != nil {
logger.Error(err.Error())
}
case *job.InstanceJob:
ccInfo, getStg, err := getCCInfoAndStgInfo(rtx, runningJob.TargetCCID, userID)
@ -90,11 +96,16 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e
}
_, envs := getRuntimeCommand(runningJob.Info.Runtime, runningJob.Files.Dataset.PackageID, runningJob.OutputPath, getStg.RemoteBase, *ccInfo)
err = s.submitInstanceTask(rtx, jo, runningJob, *ccInfo, getStg.StorageID, userID, envs)
if err != nil {
logger.Error(err.Error())
// 创建失败,从多实例任务中删除
postDeleteInstanceEvent(rtx, jo, runningJob)
}
}
return err
}
func (s *NormalJobExecuting) submitNormalTask(rtx jobmgr.JobStateRunContext, cmd string, envs []schsdk.KVPair, ccInfo schmod.ComputingCenter, pcmImgInfo schmod.PCMImage, resourceID pcmsdk.ResourceID) error {
func (s *JobExecuting) submitNormalTask(rtx jobmgr.JobStateRunContext, cmd string, envs []schsdk.KVPair, ccInfo schmod.ComputingCenter, pcmImgInfo schmod.PCMImage, resourceID pcmsdk.ResourceID) error {
task, err := rtx.Mgr.ExecMgr.StartTask(exetsk.NewSubmitTask(
ccInfo.PCMParticipantID,
pcmImgInfo.PCMImageID,
@ -136,10 +147,11 @@ func (s *NormalJobExecuting) submitNormalTask(rtx jobmgr.JobStateRunContext, cmd
}
}
func (s *NormalJobExecuting) submitFinetuningTask(rtx jobmgr.JobStateRunContext, cmd string, envs []schsdk.KVPair, ccInfo schmod.ComputingCenter) error {
func (s *JobExecuting) submitFinetuningTask(rtx jobmgr.JobStateRunContext, cmd string, envs []schsdk.KVPair, ccInfo schmod.ComputingCenter) error {
task, err := rtx.Mgr.ExecMgr.StartTask(exetsk.NewSchedulerModelFinetuning(
cmd,
envs,
schglb.InferencePlatform,
), ccInfo)
if err != nil {
@ -159,27 +171,28 @@ func (s *NormalJobExecuting) submitFinetuningTask(rtx jobmgr.JobStateRunContext,
return nil
}
func (s *NormalJobExecuting) submitInstanceTask(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job, runningJob *job.InstanceJob, ccInfo schmod.ComputingCenter,
func (s *JobExecuting) submitInstanceTask(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job, runningJob *job.InstanceJob, ccInfo schmod.ComputingCenter,
storageID cdssdk.StorageID, userID cdssdk.UserID, envs []schsdk.KVPair) error {
modelJobInfo := runningJob.Info.ModelJobInfo
// 先从数据库中查询是否已经预置了模型
modelInfo, err := rtx.Mgr.DB.Models().GetModelByID(rtx.Mgr.DB.SQLCtx(), modelJobInfo.ModelID, storageID)
if &modelInfo == nil {
logger.Error(err.Error())
return fmt.Errorf("the model is not exists: %w", err)
}
if err != nil {
return fmt.Errorf("getting computing center info: %w", err)
}
objectStorage, err := rtx.Mgr.DB.ObjectStorage().GetObjectStorageByStorageID(rtx.Mgr.DB.SQLCtx(), storageID)
if err != nil {
logger.Error(err.Error())
return fmt.Errorf("getting object storage info: %w", err)
}
println(objectStorage.CDSStorageID)
// 先从数据库中查询是否已经预置了模型
modelInfo, err := rtx.Mgr.DB.Models().GetModelByID(rtx.Mgr.DB.SQLCtx(), modelJobInfo.ModelID, objectStorage.ID)
if &modelInfo == nil {
logger.Error(err.Error())
return fmt.Errorf("the model is not exists: %w", err)
}
if err != nil {
logger.Error(err.Error())
return fmt.Errorf("getting model info info: %w", err)
}
// 发送扩容任务
ecs := exetsk.NewScheduleCreateECS(
userID,
@ -227,11 +240,12 @@ func (s *NormalJobExecuting) submitInstanceTask(rtx jobmgr.JobStateRunContext, j
case *exetsk.ScheduleCreateECSStatus:
if v2.Error != "" {
logger.Error("update task fail, error: " + v2.Error)
if v2.Operate == schsdk.CreateECS {
// 创建失败,从多实例任务中删除
postDeleteInstanceEvent(rtx, jo, runningJob)
if v2.Operate == schsdk.CreateECS || v2.Operate == schsdk.Invalid {
// 创建失败或者检测不可用,从多实例任务中删除
v2.Operate = schsdk.DestroyECS
} else {
continue
}
continue
}
switch v2.Operate {
@ -358,7 +372,7 @@ func (s *DataReturnJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Jo
reJob := jo.Body.(*job.DataReturnJob)
userID := cdssdk.UserID(1)
log := logger.WithType[NormalJobExecuting]("State").WithField("JobID", jo.JobID)
log := logger.WithType[JobExecuting]("State").WithField("JobID", jo.JobID)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()