新增实例删除功能

This commit is contained in:
JeshuaRen 2024-09-13 16:13:57 +08:00
parent b0804a3a0b
commit 6bb9c85e17
16 changed files with 478 additions and 137 deletions

View File

@ -92,6 +92,8 @@ type ModelResource struct {
OjbStgID int `json:"OjbStgID" db:"OjbStgID"`
ModelPath string `json:"modelPath" db:"modelPath"`
StartShellPath string `json:"startShellPath" db:"startShellPath"`
ServerPort int `json:"serverPort" db:"serverPort"`
ServerUrlPath string `json:"serverUrlPath" db:"serverUrlPath"`
StopShellPath string `json:"stopShellPath" db:"stopShellPath"`
FinetuningShellPath string `json:"finetuningShellPath" db:"finetuningShellPath"`
}
@ -106,7 +108,7 @@ type ObjectStorage struct {
Endpoint string `json:"endpoint" db:"endpoint"`
Bucket string `json:"bucket" db:"bucket"`
CDSStorageID cdssdk.StorageID `json:"CDSStorageID" db:"CDSStorageID"`
MountType string `json:"mountType"`
MountType string `json:"mountType" db:"mountType"`
}
func (i *CCResourceInfo) Scan(src interface{}) error {

View File

@ -65,7 +65,8 @@ type GetInstanceIDReq struct {
}
type instanceService struct {
ID string `json:"id"`
ID string `json:"id"`
Status string `json:"status"`
}
type GetInstanceIDResp struct {
@ -74,10 +75,10 @@ type GetInstanceIDResp struct {
Data []instanceService `json:"data"`
}
func (c *HttpClient) GetInstanceID(token string, instanceName string) (string, error) {
func (c *HttpClient) GetInstanceID(token string, instanceName string) (string, string, error) {
targetURL, err := url.JoinPath(c.baseURL + "/ai/openapi/v2/instance-service/task")
if err != nil {
return "", err
return "", "", err
}
header := sugonHeader{
@ -97,32 +98,32 @@ func (c *HttpClient) GetInstanceID(token string, instanceName string) (string, e
Header: header,
})
if err != nil {
return "", err
return "", "", err
}
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("reading response body: %w", err)
return "", "", fmt.Errorf("reading response body: %w", err)
}
contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
var codeResp GetInstanceIDResp
if err := serder.JSONToObject(bodyBytes, &codeResp); err != nil {
return "", fmt.Errorf("parsing response: %w", err)
return "", "", fmt.Errorf("parsing response: %w", err)
}
if codeResp.Code != "0" {
return "", err
return "", "", err
}
if len(codeResp.Data) > 0 {
return codeResp.Data[0].ID, nil
return codeResp.Data[0].ID, codeResp.Data[0].Status, nil
}
return "", err
return "", "", err
}
return "", fmt.Errorf("unknow response content type: %s", contType)
return "", "", fmt.Errorf("unknow response content type: %s", contType)
}
type GetInstanceUrlResp struct {
@ -232,7 +233,7 @@ func (c *HttpClient) OperateSugonInstance(token string, instanceID string, opera
}
if codeResp.Code == "0" {
return codeResp.Data.(string), nil
return codeResp.Code, nil
}
return "", codeResp.ToError()
@ -258,7 +259,7 @@ func (c *HttpClient) RunCommand(token string, instanceID string, content string)
}
req := runCommandReq{
ID: instanceID,
StartScriptActionScope: "all",
StartScriptActionScope: "header",
StartScriptContent: content,
}
resp, err := myhttp.PostJSON(targetURL, myhttp.RequestParam{
@ -273,7 +274,7 @@ func (c *HttpClient) RunCommand(token string, instanceID string, content string)
if err != nil {
return "", fmt.Errorf("reading response body: %w", err)
}
println(string(bodyBytes))
//logger.Info("run command result: " + string(bodyBytes))
contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
@ -291,3 +292,110 @@ func (c *HttpClient) RunCommand(token string, instanceID string, content string)
return "", fmt.Errorf("unknow response content type: %s", contType)
}
// previewFileReq is the request payload PreviewFile posts to the
// "/openapi/v2/file/preview" endpoint.
type previewFileReq struct {
// Path is the path of the file to preview.
Path string `json:"path"`
// Force is always sent as "default" by PreviewFile.
Force string `json:"force"`
// StartIndex is always sent as 0 by PreviewFile.
StartIndex int `json:"startIndex"`
}
// previewFileResp is the data section decoded from a file preview response.
type previewFileResp struct {
// Content is the previewed file text that PreviewFile returns to its caller.
Content string `json:"content"`
}
// PreviewFile posts a preview request for path to the
// "/openapi/v2/file/preview" endpoint and returns the file content carried in
// the response.
//
// token is sent in the request header. path is forwarded unchanged; it is not
// resolved against the remote file listing first.
//
// An error is returned when the request fails, the body cannot be read or
// parsed, the response code is not "0", or the response is not JSON.
func (c *HttpClient) PreviewFile(token string, path string) (string, error) {
	targetURL, err := url.JoinPath(c.baseURL + "/openapi/v2/file/preview")
	if err != nil {
		return "", err
	}

	resp, err := myhttp.PostForm(targetURL, myhttp.RequestParam{
		Header: sugonHeader{
			Token: token,
		},
		Body: previewFileReq{
			Path:       path,
			Force:      "default",
			StartIndex: 0,
		},
	})
	if err != nil {
		return "", err
	}
	// Release the connection once the body has been consumed.
	defer resp.Body.Close()

	bodyBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", fmt.Errorf("reading response body: %w", err)
	}

	contType := resp.Header.Get("Content-Type")
	if !strings.Contains(contType, myhttp.ContentTypeJSON) {
		return "", fmt.Errorf("unknow response content type: %s", contType)
	}

	var codeResp response[previewFileResp]
	if err := serder.JSONToObject(bodyBytes, &codeResp); err != nil {
		return "", fmt.Errorf("parsing response: %w", err)
	}
	if codeResp.Code != "0" {
		return "", codeResp.ToError()
	}
	return codeResp.Data.Content, nil
}
// fileListResp is the data section decoded from a file listing response.
type fileListResp struct {
// FileList holds the listed entries that getFileList searches by path.
FileList []fileInfo `json:"fileList"`
}
// fileInfo is one entry of a file listing response.
type fileInfo struct {
// Path is the entry's path; getFileList matches it against the wanted file name.
Path string `json:"path"`
}
// getFileList fetches the file listing at targetURL and returns the first
// listed path that contains fileName.
//
// token is sent in the request header. An error is returned when the request
// fails, the body cannot be read or parsed, the response code is not "0", the
// response is not JSON, or no listed path contains fileName (including when
// the listing is empty).
func (c *HttpClient) getFileList(token string, targetURL string, fileName string) (string, error) {
	resp, err := myhttp.GetJSON(targetURL, myhttp.RequestParam{
		Header: sugonHeader{
			Token: token,
		},
	})
	if err != nil {
		return "", err
	}
	// Release the connection once the body has been consumed.
	defer resp.Body.Close()

	bodyBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", fmt.Errorf("reading response body: %w", err)
	}

	contType := resp.Header.Get("Content-Type")
	if !strings.Contains(contType, myhttp.ContentTypeJSON) {
		return "", fmt.Errorf("unknow response content type: %s", contType)
	}

	var codeResp response[fileListResp]
	if err := serder.JSONToObject(bodyBytes, &codeResp); err != nil {
		return "", fmt.Errorf("parsing response: %w", err)
	}
	if codeResp.Code != "0" {
		return "", codeResp.ToError()
	}

	for _, file := range codeResp.Data.FileList {
		if strings.Contains(file.Path, fileName) {
			return file.Path, nil
		}
	}
	// An empty listing is reported the same way as a listing without a match,
	// rather than as a content-type problem.
	return "", fmt.Errorf("file not found")
}

View File

@ -18,7 +18,7 @@ type ScheduleCreateECS struct {
type ScheduleCreateECSStatus struct {
TaskStatusBase
Error string `json:"error"`
Address string `json:"address"`
Result string `json:"result"`
Operate string `json:"operate"`
}
@ -32,9 +32,9 @@ func NewScheduleCreateECS(userID cdssdk.UserID, command string, objectStorage sc
}
}
func NewScheduleCreateECSStatus(address string, operate string, err string) *ScheduleCreateECSStatus {
func NewScheduleCreateECSStatus(result string, operate string, err string) *ScheduleCreateECSStatus {
return &ScheduleCreateECSStatus{
Address: address,
Result: result,
Error: err,
Operate: operate,
}

View File

@ -378,9 +378,9 @@ func (s *DefaultPreScheduler) scheduleForNormalOrMultiJob(jobSet *schsdk.JobSetI
return nil, ErrNoAvailableScheme
}
// 这里写死,用于测试,生成环境必须删除
// 此逻辑用于测试,生产环境必须删除!
for i := 0; i < len(allCCsArr); i++ {
if allCCsArr[i].CC.CCID == 5 {
if allCCsArr[i].CC.CCID == schsdk.CCID(jobResource.Storage) {
targetNode = allCCsArr[i]
}
}

View File

@ -43,6 +43,8 @@ func GetFactory(providerType string) CloudFactory {
return &HuaweiCloudFactory{}
case schmod.AliCloud:
return &AliCloudFactory{}
case schmod.SugonCloud:
return &ShuGuangCloudFactory{}
default:
return nil
}

View File

@ -9,6 +9,7 @@ import (
exemq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
"io"
"strings"
"sync"
"time"
)
@ -63,71 +64,91 @@ func getToken(authConfigs map[string]interface{}) (string, error) {
}
type sugonUrlResp struct {
Name string `json:"name"`
AiUrls []sugonUrl `json:"aiUrls"`
Name string `json:"name"`
AiUrls []sugonUrl `json:"aiUrls"`
EfileUrls []sugonUrl `json:"efileUrls"`
}
type sugonUrl struct {
Url string `json:"url"`
}
func getUrl(token string, url string) (string, error) {
func getUrl(token string, url string) (string, string, error) {
header := make(map[string]string)
header["Token"] = token
resp, err := myhttp.GetForm(url, myhttp.RequestParam{
Header: header,
})
if err != nil {
return "", err
return "", "", err
}
// 读取并打印原始响应
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("reading response body: %w", err)
return "", "", fmt.Errorf("reading response body: %w", err)
}
contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
var codeResp response[sugonUrlResp]
if err := serder.JSONToObject(bodyBytes, &codeResp); err != nil {
return "", fmt.Errorf("parsing response: %w", err)
return "", "", fmt.Errorf("parsing response: %w", err)
}
if len(codeResp.Data.AiUrls) == 0 {
return "", fmt.Errorf("there is no url")
return "", "", fmt.Errorf("there is no url")
}
return codeResp.Data.AiUrls[0].Url, nil
return codeResp.Data.AiUrls[0].Url, codeResp.Data.EfileUrls[0].Url, nil
}
return "", fmt.Errorf("there is no token")
return "", "", fmt.Errorf("there is no token")
}
type SugonCloud struct{}
type SugonCloud struct {
Lock sync.Mutex
}
var ecsConfig map[string]interface{}
var authConfig map[string]interface{}
var client exemq.HttpClient
var sugonClient exemq.HttpClient
var efileClient exemq.HttpClient
func SugonCloudConfig(authConfigs map[string]interface{}, ecsConfigs map[string]interface{}) {
authConfigs["get_token_url"] = "https://ac.sugon.com/ac/openapi/v2/tokens"
authConfig = authConfigs
ecsConfig = ecsConfigs
// 获取token
token, err := getToken(authConfigs)
if err != nil {
logger.Error(err.Error())
return
}
url, err := getUrl(token, "https://ac.sugon.com/ac/openapi/v2/center")
// 获取请求链接
url, efileUrl, err := getUrl(token, "https://ac.sugon.com/ac/openapi/v2/center")
httpPool := exemq.NewHttpPool(&exemq.Config{})
c, err := httpPool.AcquireByUrl(url)
if err != nil {
logger.Error(err.Error())
return
}
client = *c
sugonClient = *c
ec, err := httpPool.AcquireByUrl(efileUrl)
if err != nil {
logger.Error(err.Error())
return
}
efileClient = *ec
}
func (s SugonCloud) CreateServer() (string, string, error) {
// CreateServer111 is a stub that always returns empty strings and a
// "not support" error.
// NOTE(review): the numeric suffix looks like a leftover from testing — confirm
// whether this method is still needed.
func (s *SugonCloud) CreateServer111() (string, string, error) {
return "", "", fmt.Errorf("not support")
}
func (s *SugonCloud) CreateServer() (string, string, error) {
instanceServiceName := "auto_instance_" + time.Now().Format("20060102150405")
ecsConfig["instanceServiceName"] = instanceServiceName
// 获取token
@ -137,24 +158,48 @@ func (s SugonCloud) CreateServer() (string, string, error) {
return "", "", err
}
// 创建实例
_, err = client.CreateSugonInstance(token, ecsConfig)
_, err = sugonClient.CreateSugonInstance(token, ecsConfig)
if err != nil {
return "", "", err
}
// 获取实例ID
instanceID, err := client.GetInstanceID(token, instanceServiceName)
if err != nil {
return "", "", err
// 获取实例ID4*1000s后还未获取ID则认为实例创建失败
var instanceID string
for i := 0; i <= 1000; i++ {
id, status, err := sugonClient.GetInstanceID(token, instanceServiceName)
if status == schsdk.Failed {
return "", "", fmt.Errorf("create instance failed")
}
if err != nil || status == schsdk.Waiting || status == schsdk.Deploying {
time.Sleep(4 * time.Second)
continue
}
if i == 1000 {
return "", "", fmt.Errorf("get instance id timeout")
}
instanceID = id
break
}
if instanceID == "" {
return "", "", fmt.Errorf("get instance id failed")
}
// 获取实例url
url, err := client.GetInstanceUrl(token, instanceID)
url, err := sugonClient.GetInstanceUrl(token, instanceID)
logger.Info("create ecs success, instanceID: " + instanceID + " url: " + url)
return instanceID, url, nil
}
func (s SugonCloud) RunCommand(commands []string, instanceID string, timeout int) (string, error) {
func (s *SugonCloud) RunCommand(commands []string, instanceID string, timeout int) (string, error) {
s.Lock.Lock()
defer s.Lock.Unlock()
// 获取token
token, err := getToken(authConfig)
if err != nil {
@ -162,48 +207,66 @@ func (s SugonCloud) RunCommand(commands []string, instanceID string, timeout int
return "", err
}
for i := 0; i < len(commands); i++ {
_, err := client.RunCommand(token, instanceID, commands[i])
// 曙光集群不支持查看日志,通过预览日志文件返回
if timeout == -1 && i == len(commands)-1 {
// 命令执行完成需要时间,但是接口不会等待完成后才返回,所以这里需要轮询看是否有结果
content := ""
for j := 0; j < 10; j++ {
content, err = efileClient.PreviewFile(token, commands[i])
if err != nil {
return "", err
}
if content == "" {
time.Sleep(3 * time.Second)
continue
}
return content, err
}
}
_, err := sugonClient.RunCommand(token, instanceID, commands[i])
if err != nil {
return "", err
}
}
return "", err
}
func (s SugonCloud) DeleteInstance(instanceID string) (string, error) {
func (s *SugonCloud) DeleteInstance(instanceID string) (string, error) {
// 获取token
token, err := getToken(authConfig)
if err != nil {
logger.Error(err.Error())
return "", err
}
instance, err := client.OperateSugonInstance(token, instanceID, schsdk.DestroyECS)
instance, err := sugonClient.OperateSugonInstance(token, instanceID, schsdk.DestroyECS)
return instance, err
}
func (s SugonCloud) StopInstance(instanceID string) (string, error) {
func (s *SugonCloud) StopInstance(instanceID string) (string, error) {
// 获取token
token, err := getToken(authConfig)
if err != nil {
logger.Error(err.Error())
return "", err
}
instance, err := client.OperateSugonInstance(token, instanceID, schsdk.PauseECS)
instance, err := sugonClient.OperateSugonInstance(token, instanceID, schsdk.PauseECS)
return instance, err
}
func (s SugonCloud) RebootInstances(instanceID string) (string, error) {
func (s *SugonCloud) RebootInstances(instanceID string) (string, error) {
//TODO implement me
panic("implement me")
}
func (s SugonCloud) StartInstances(instanceID string) (string, error) {
func (s *SugonCloud) StartInstances(instanceID string) (string, error) {
// 获取token
token, err := getToken(authConfig)
if err != nil {
logger.Error(err.Error())
return "", err
}
instance, err := client.OperateSugonInstance(token, instanceID, schsdk.RunECS)
instance, err := sugonClient.OperateSugonInstance(token, instanceID, schsdk.RunECS)
return instance, err
}

View File

@ -10,8 +10,10 @@ import (
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
"gitlink.org.cn/cloudream/scheduler/common/utils"
"gitlink.org.cn/cloudream/scheduler/executor/internal/config"
"math/rand"
"strconv"
"strings"
"time"
//"gitlink.org.cn/cloudream/scheduler/executor/internal/config"
"gitlink.org.cn/cloudream/scheduler/executor/internal/task/create_ecs"
@ -42,16 +44,16 @@ func (t *ScheduleCreateECS) Execute(task *Task, ctx TaskContext) {
log.Info("ScheduleCreateECS...")
}
var count = 1
//var instanceID string
//var ecsIP string
func (t *ScheduleCreateECS) do(task *Task, ctx TaskContext) error {
// 创建云主机
factory := create_ecs.GetFactory(config.CloudName)
provider := factory.CreateProvider()
instanceID, ecsIP, err := provider.CreateServer()
//instanceID, ecsIP, err := "i-bp19q01cjmr62vstszh3", "47.98.122.29", error(nil)
if err != nil {
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", schsdk.CreateECS, err.Error()))
return err
}
logger.Info("create ECS success, instance id: " + instanceID + ", address: " + ecsIP)
@ -66,6 +68,7 @@ func (t *ScheduleCreateECS) do(task *Task, ctx TaskContext) error {
// 设置环境变量
commands := utils.ConvertEnvsToCommand(t.Envs)
// 获取挂载命令
switch t.ObjectStorage.MountType {
case schsdk.RcloneMount:
rcloneCommands := getRcloneCommands(t.ModelResource, t.ObjectStorage, t.UserID)
@ -76,18 +79,28 @@ func (t *ScheduleCreateECS) do(task *Task, ctx TaskContext) error {
commands = append(commands, commandContent)
}
// 安装依赖包用于获取GPU信息
commandContent := getPipCommand()
commands = append(commands, commandContent)
// 获取用户输入的命令
arr := utils.SplitCommands(t.Command)
commands = append(commands, arr...)
_, err = provider.RunCommand(commands, instanceID, 2000)
if err != nil {
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", err.Error()))
return err
logger.Error("run command error: " + err.Error())
}
address := "http://" + ecsIP + ":" + strconv.Itoa(t.ModelResource.ServerPort) + "/" + t.ModelResource.ServerUrlPath
if config.CloudName == schmod.SugonCloud {
address = ecsIP + "/" + t.ModelResource.ServerUrlPath
address = strings.Replace(address, "//", "/", -1)
}
// 返回执行结果
task.SendStatus(exectsk.NewScheduleCreateECSStatus("http://"+ecsIP+":5013/chat", schsdk.CreateECS, ""))
println("create ECS success!")
task.SendStatus(exectsk.NewScheduleCreateECSStatus(address, schsdk.CreateECS, ""))
logger.Info("run all commands complete")
// 监听更新操作
for {
@ -119,14 +132,29 @@ func (t *ScheduleCreateECS) do(task *Task, ctx TaskContext) error {
}
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", schsdk.PauseECS, ""))
case schsdk.DestroyECS:
//_, err := provider.DeleteInstance(instanceID)
//if err != nil {
// task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", err.Error()))
// continue
//}
_, err := provider.DeleteInstance(instanceID)
if err != nil {
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", err.Error()))
continue
}
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", schsdk.DestroyECS, ""))
case schsdk.OperateServer:
executeCommands(provider, instanceID, task, info.Runtime)
case schsdk.GPUMonitor:
commands, logFile := getGPUCommand(instanceID)
var res string
// 曙光服务器执行命令无法获取返回值,所以需要特殊处理
if config.CloudName == schmod.SugonCloud {
commands = append(commands, logFile)
res, err = provider.RunCommand(commands, instanceID, -1)
} else {
res, err = provider.RunCommand(commands, instanceID, 2000)
}
if err != nil {
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", err.Error()))
continue
}
task.SendStatus(exectsk.NewScheduleCreateECSStatus(res, schsdk.GPUMonitor, ""))
default:
//executeCommands(provider, instanceID, task, info.Command)
}
@ -134,6 +162,33 @@ func (t *ScheduleCreateECS) do(task *Task, ctx TaskContext) error {
}
}
// getRandomNum reseeds the global math/rand source from the current time and
// returns a pseudo-random value drawn from [0, 20), formatted with two
// decimal places.
func getRandomNum() string {
	rand.Seed(time.Now().UnixNano())
	return strconv.FormatFloat(rand.Float64()*20, 'f', 2, 64)
}
// getPipCommand returns the multi-line shell snippet that upgrades pip, points
// it at the Tsinghua PyPI mirror, and installs torch, gputil and psutil.
func getPipCommand() string {
	// Each step is separated by a newline padded with one space on both sides.
	const sep = " \n "
	return "python -m pip install --upgrade pip" + sep +
		"pip config set global.index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple" + sep +
		"pip install torch" + sep +
		"pip install gputil" + sep +
		"pip install psutil"
}
// getGPUCommand builds the shell commands used to collect GPU, memory and CPU
// usage for one instance.
//
// It returns three commands, in order: create the monitor directory and remove
// any previous log for instanceID; write a Python monitoring script with
// `echo -e`; run the script with its output redirected to the log file. The
// second return value is the path of that per-instance log file.
//
// NOTE(review): the directory and log file use an absolute path under
// /public/home/acgnnmfbwo, while the script is written and run through the
// relative ./modeltest/gpu_monitor/ path — presumably the commands run with
// that home directory as the working directory; confirm.
func getGPUCommand(instanceID string) ([]string, string) {
var commands []string
path := "/public/home/acgnnmfbwo/modeltest/gpu_monitor/"
logFileName := path + "gpuMonitor_" + instanceID + ".log"
// Step 1: make sure the directory exists and drop the previous log.
commandContent := "mkdir -p " + path + " && rm -rf " + logFileName
commands = append(commands, commandContent)
// Step 2: write the Python script that prints "Name: value" lines for memory, GPU and CPU usage.
commandContent = "echo -e 'import torch\nimport GPUtil\nimport psutil\n\ndef get_memory_usage():\n if torch.cuda.is_available():\n allocated_memory = torch.cuda.memory_allocated()\n allocated_memory_mb = allocated_memory / (1024 * 1024)\n gpus = GPUtil.getGPUs()\n if len(gpus) > 0:\n total_memory_mb = gpus[0].memoryTotal\n memory_utilization = (allocated_memory_mb / total_memory_mb) * 100\n print(f\"MemoryUtilization: {memory_utilization:.2f}\")\n else:\n print(\"No GPU found with GPUtil.\")\n else:\n print(\"CUDA is not available. Please check your GPU setup.\")\n\ndef get_gpu_utilization_with_gputil():\n try:\n gpus = GPUtil.getGPUs()\n for gpu in gpus:\n print(f\"GPU_Name: {gpu.name}\")\n print(f\"GPUUtilization: {gpu.load * 100:.2f}\")\n print(f\"Memory_Used: {gpu.memoryUsed} MB\")\n print(f\"Memory_Free: {gpu.memoryFree} MB\")\n print(f\"Memory_Total: {gpu.memoryTotal} MB\")\n except Exception as e:\n print(f\"Error getting GPU utilization with GPUtil: {e}\")\n\ndef get_cpu_usage():\n cpu_percent = psutil.cpu_percent(interval=1)\n print(f\"CPUUtilization: {cpu_percent:.2f}\")\n\nif __name__ == \"__main__\":\n get_memory_usage()\n get_gpu_utilization_with_gputil()\n get_cpu_usage()\n' > ./modeltest/gpu_monitor/gpuMonitor.py"
commands = append(commands, commandContent)
// Step 3: run the script and capture its output in the per-instance log.
commandContent = "python ./modeltest/gpu_monitor/gpuMonitor.py > " + logFileName
commands = append(commands, commandContent)
//logFileName = strings.Replace(logFileName, "./", "", -1)
return commands, logFileName
}
func getRcloneCommands(resource schmod.ModelResource, storage schmod.ObjectStorage, userID cdssdk.UserID) []string {
var commands []string
@ -166,6 +221,7 @@ func getRcloneCommands(resource schmod.ModelResource, storage schmod.ObjectStora
// 执行启动脚本
startScript := mountDir + "/" + resource.StartShellPath
startScript = strings.Replace(startScript, "//", "/", -1)
commandContent = "sudo sh @startScript@ > /opt/startup.log"
commandContent = strings.Replace(commandContent, "@startScript@", startScript, -1)
commands = append(commands, commandContent)
@ -177,12 +233,12 @@ func executeCommands(provider create_ecs.CloudProvider, instanceID string, task
commands := utils.ConvertEnvsToCommand(runtime.Envs)
commands = append(commands, utils.SplitCommands(runtime.Command)...)
_, err := provider.RunCommand(commands, instanceID, 2000)
res, err := provider.RunCommand(commands, instanceID, 2000)
if err != nil {
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", err.Error()))
return
}
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", ""))
task.SendStatus(exectsk.NewScheduleCreateECSStatus(res, schsdk.OperateServer, ""))
}
func init() {

View File

@ -32,6 +32,7 @@ func (i *InstanceInfoBase) Instance() {}
var InstanceOperateInfoTypeUnion = types.NewTypeUnion[InstanceOperateInfo](
(*InstanceCreateInfo)(nil),
(*InstanceUpdateInfo)(nil),
(*InstanceDeleteInfo)(nil),
)
var _ = serder.UseTypeUnionInternallyTagged(&InstanceOperateInfoTypeUnion, "type")
@ -51,6 +52,12 @@ type InstanceUpdateInfo struct {
//LoRAPackage string `json:"loraPackage"`
}
// InstanceDeleteInfo is the instance-operate event payload registered in the
// type union under the "Delete" tag; it identifies the instance job to delete.
type InstanceDeleteInfo struct {
serder.Metadata `union:"Delete"`
InstanceInfoBase
// InstanceID is the job ID of the instance to delete.
InstanceID schsdk.JobID `json:"instanceID"`
}
func NewInstanceOperate(info InstanceOperateInfo, future OperateInstanceFuture) *InstanceOperate {
return &InstanceOperate{
Info: info,

View File

@ -6,16 +6,18 @@ import (
)
type InstanceJob struct {
Info schsdk.InstanceJobInfo // 提交任务时提供的任务描述信息
Files jobmod.JobFiles // 任务需要的文件
TargetCCID schsdk.CCID // 将要运行此任务的算力中心ID
OutputPath string // 程序结果输出路径一个相对路径需要加上CDS数据库中记录的RemoteBase才是完整路径
Info schsdk.InstanceJobInfo // 提交任务时提供的任务描述信息
Files jobmod.JobFiles // 任务需要的文件
TargetCCID schsdk.CCID // 将要运行此任务的算力中心ID
OutputPath string // 程序结果输出路径一个相对路径需要加上CDS数据库中记录的RemoteBase才是完整路径
ParentJobID schsdk.JobID
}
func NewInstanceJob(info schsdk.InstanceJobInfo, files jobmod.JobFiles) *InstanceJob {
func NewInstanceJob(info schsdk.InstanceJobInfo, files jobmod.JobFiles, parentJobID schsdk.JobID) *InstanceJob {
return &InstanceJob{
Info: info,
Files: files,
Info: info,
Files: files,
ParentJobID: parentJobID,
}
}

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/future"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
@ -226,6 +227,10 @@ func (s *NormalJobExecuting) submitInstanceTask(rtx jobmgr.JobStateRunContext, j
case *exetsk.ScheduleCreateECSStatus:
if v2.Error != "" {
logger.Error("update task fail, error: " + v2.Error)
if v2.Operate == schsdk.CreateECS {
// 创建失败,从多实例任务中删除
postDeleteInstanceEvent(rtx, jo, runningJob)
}
continue
}
@ -234,21 +239,27 @@ func (s *NormalJobExecuting) submitInstanceTask(rtx jobmgr.JobStateRunContext, j
// 扩容任务,将结果放到池子中
node := schsdk.NodeInfo{
InstanceID: jo.JobID,
Address: schsdk.Address(v2.Address),
Address: schsdk.Address(v2.Result),
Status: schsdk.RunECS,
}
jobmgr.SetNodeData(jo.JobSetID, modelJobInfo, node)
logger.Infof("node expansion: %v", v2.Address)
rtx.Mgr.NodeSvc.SetNodeData(jo.JobSetID, modelJobInfo, node)
logger.Infof("node expansion: %v", v2.Result)
case schsdk.DestroyECS:
// 缩容任务,将节点从节点中移除
jobmgr.RemoveNodeFromRunningModels(modelJobInfo, jo.JobID)
// 缩容任务,从节点列表中移除
rtx.Mgr.NodeSvc.RemoveNodeFromRunningModels(modelJobInfo, jo.JobID)
// 从多实例任务中删除
postDeleteInstanceEvent(rtx, jo, runningJob)
case schsdk.PauseECS:
// 更新节点状态
jobmgr.UpdateNodeFromRunningModels(modelJobInfo, jo.JobID, schsdk.PauseECS)
rtx.Mgr.NodeSvc.UpdateNodeFromRunningModels(modelJobInfo, jo.JobID, schsdk.PauseECS)
case schsdk.RunECS:
// 更新节点状态
jobmgr.UpdateNodeFromRunningModels(modelJobInfo, jo.JobID, schsdk.RunECS)
rtx.Mgr.NodeSvc.UpdateNodeFromRunningModels(modelJobInfo, jo.JobID, schsdk.RunECS)
case schsdk.OperateServer:
println()
case schsdk.GPUMonitor:
rtx.Mgr.NodeSvc.SetNodeUsageRateInfo(jo.JobID, v2.Result)
}
case error:
@ -264,6 +275,15 @@ func (s *NormalJobExecuting) submitInstanceTask(rtx jobmgr.JobStateRunContext, j
}
}
// postDeleteInstanceEvent posts an InstanceDeleteInfo event for jo to the
// parent job of runningJob and waits for the event's future to complete.
//
// A failure while waiting is logged instead of being silently dropped.
//
// NOTE(review): Wait uses context.TODO(), so it only returns once something
// completes the future — confirm the receiver of InstanceDeleteInfo does so,
// otherwise this call blocks indefinitely.
func postDeleteInstanceEvent(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job, runningJob *job.InstanceJob) {
	deleteInfo := event.InstanceDeleteInfo{
		InstanceID: jo.JobID,
	}
	fut := future.NewSetValue[event.OperateInstanceResult]()
	rtx.Mgr.PostEvent(runningJob.ParentJobID, event.NewInstanceOperate(&deleteInfo, fut))

	if _, err := fut.Wait(context.TODO()); err != nil {
		logger.Error("waiting for instance delete event: " + err.Error())
	}
}
// 判断算力中心是否支持环境变量配置如果不支持则读取脚本内容并拼接在Command参数后面
func getRuntimeCommand(runtime schsdk.JobRuntimeInfo, packageID cdssdk.PackageID, outputPath string, remoteBase string, ccInfo schmod.ComputingCenter) (string, []schsdk.KVPair) {
var envs []schsdk.KVPair

View File

@ -52,7 +52,7 @@ func (s *MultiInstanceInit) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
}
// 创建实例并运行
instanceJob := job.NewInstanceJob(*instJobInfo, files)
instanceJob := job.NewInstanceJob(*instJobInfo, files, jo.JobID)
jobID := rtx.Mgr.AddJob(jo.JobSetID, instanceJob, NewPreSchuduling(multInstJob.PreScheduler))
// 在多实例任务中新增这个实例的任务ID

View File

@ -14,6 +14,7 @@ import (
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job"
"strings"
"sync"
"time"
)
type MultiInstanceRunning struct {
@ -42,6 +43,8 @@ func (s *MultiInstanceRunning) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job)
multInstJob := jo.Body.(*job.MultiInstanceJob)
go pollingInstance(rtx, multInstJob)
waitFut := event.BeginWaitType[*event.InstanceOperate](rtx.EventSet)
for {
chanValue := <-waitFut.Chan()
@ -62,11 +65,44 @@ func (s *MultiInstanceRunning) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job)
subJobs = multInstJob.SubJobs
}
updateInstance(rtx, info, subJobs, instanceFuture)
case *event.InstanceDeleteInfo:
deleteInstance(multInstJob, info.InstanceID)
}
}
}
// deleteInstance removes the first occurrence of instanceID from
// multiJob.SubJobs. The list is left untouched when the ID is not present.
func deleteInstance(multiJob *job.MultiInstanceJob, instanceID schsdk.JobID) {
	for idx, subJobID := range multiJob.SubJobs {
		if subJobID != instanceID {
			continue
		}
		// Cut the matching entry out and stop after the first hit.
		multiJob.SubJobs = append(multiJob.SubJobs[:idx], multiJob.SubJobs[idx+1:]...)
		return
	}
}
// 实例轮询用于查询GPU等信息
// pollingInstance periodically asks every sub-job of multiJob for its GPU and
// usage information.
//
// Every 30 seconds it walks multiJob.SubJobs and, for each instance, posts a
// GPUMonitor update event from its own goroutine, then waits for that event's
// future. A failed wait is logged.
//
// NOTE(review): the loop has no exit condition and reads multiJob.SubJobs
// without synchronization — confirm how this goroutine is expected to stop and
// whether SubJobs can be modified concurrently.
func pollingInstance(rtx jobmgr.JobStateRunContext, multiJob *job.MultiInstanceJob) {
	for {
		time.Sleep(time.Second * 30)

		for i := 0; i < len(multiJob.SubJobs); i++ {
			// Declared per iteration so each goroutine sees its own ID.
			instanceID := multiJob.SubJobs[i]
			logger.Info("polling instanceID: " + instanceID)

			go func() {
				fut := future.NewSetValue[event.UpdateResult]()
				rtx.Mgr.PostEvent(instanceID, event.NewUpdate(schsdk.JobRuntimeInfo{}, schsdk.GPUMonitor, fut))

				if _, err := fut.Wait(context.TODO()); err != nil {
					logger.Error(err.Error())
				}
			}()
		}
	}
}
func updateInstance(rtx jobmgr.JobStateRunContext, updateInfo *event.InstanceUpdateInfo, subJobs []schsdk.JobID, updateInstanceFuture event.OperateInstanceFuture) {
var failJobs []string
@ -146,7 +182,7 @@ func createInstance(rtx jobmgr.JobStateRunContext, info *event.InstanceCreateInf
}
// 创建实例并运行
instanceJob := job.NewInstanceJob(*instJobInfo, files)
instanceJob := job.NewInstanceJob(*instJobInfo, files, jo.JobID)
jobID := rtx.Mgr.AddJob(jo.JobSetID, instanceJob, NewPreSchuduling(*jobSchedule))
// 在多实例任务中新增这个实例的任务ID

View File

@ -31,6 +31,7 @@ type Manager struct {
ExecMgr *executormgr.Manager
AdvMgr *advisormgr.Manager
DB *db.DB
NodeSvc *NodeService
jobSetIDIndex int
jobSets map[schsdk.JobSetID]*mgrJobSet
@ -38,11 +39,12 @@ type Manager struct {
jobs map[schsdk.JobID]*mgrJob
}
func NewManager(execMgr *executormgr.Manager, advMgr *advisormgr.Manager, db *db.DB) (*Manager, error) {
func NewManager(execMgr *executormgr.Manager, advMgr *advisormgr.Manager, db *db.DB, nodeSvc *NodeService) (*Manager, error) {
mgr := &Manager{
ExecMgr: execMgr,
AdvMgr: advMgr,
DB: db,
NodeSvc: nodeSvc,
jobSets: make(map[schsdk.JobSetID]*mgrJobSet),
jobs: make(map[schsdk.JobID]*mgrJob),
}

View File

@ -4,16 +4,30 @@ import (
"github.com/patrickmn/go-cache"
"gitlink.org.cn/cloudream/common/pkgs/logger"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
"sort"
"strconv"
"strings"
"sync"
"time"
)
var runningModels = make(map[string]schsdk.RunningModelInfo)
// NodeService keeps the in-memory record of running model nodes and the usage
// samples reported for each instance.
type NodeService struct {
// RunningModels maps "<CustomModelName>_<ModelID>" to the running model and its nodes.
RunningModels map[string]schsdk.RunningModelInfo
// NodeUsageCache maps an instance's job ID to a cache of its usage reports, keyed by Unix timestamp.
NodeUsageCache map[schsdk.JobID]*cache.Cache
// Lock is available for guarding the maps above.
// NOTE(review): confirm which methods are expected to take it.
Lock sync.Mutex
}
// NewNodeService returns a NodeService whose RunningModels and NodeUsageCache
// maps are allocated and empty.
func NewNodeService() *NodeService {
	svc := new(NodeService)
	svc.RunningModels = make(map[string]schsdk.RunningModelInfo)
	svc.NodeUsageCache = make(map[schsdk.JobID]*cache.Cache)
	return svc
}
// SetNodeData 新增节点
func SetNodeData(jobSetID schsdk.JobSetID, modelJobInfo schsdk.ModelJobInfo, node schsdk.NodeInfo) {
func (s *NodeService) SetNodeData(jobSetID schsdk.JobSetID, modelJobInfo schsdk.ModelJobInfo, node schsdk.NodeInfo) {
key := string(modelJobInfo.CustomModelName) + "_" + string(modelJobInfo.ModelID)
value, ok := runningModels[key]
value, ok := s.RunningModels[key]
// 如果不存在
if !ok {
nodes := []schsdk.NodeInfo{node}
@ -25,18 +39,18 @@ func SetNodeData(jobSetID schsdk.JobSetID, modelJobInfo schsdk.ModelJobInfo, nod
ModelName: "",
CustomModelName: modelJobInfo.CustomModelName,
}
runningModels[key] = value
s.RunningModels[key] = value
return
}
// 如果存在
value.Nodes = append(value.Nodes, node)
runningModels[key] = value
s.RunningModels[key] = value
}
// RemoveNodeFromRunningModels 移除节点
func RemoveNodeFromRunningModels(modelJobInfo schsdk.ModelJobInfo, instanceID schsdk.JobID) {
func (s *NodeService) RemoveNodeFromRunningModels(modelJobInfo schsdk.ModelJobInfo, instanceID schsdk.JobID) {
key := string(modelJobInfo.CustomModelName) + "_" + string(modelJobInfo.ModelID)
value, ok := runningModels[key]
value, ok := s.RunningModels[key]
if !ok {
return
}
@ -45,16 +59,16 @@ func RemoveNodeFromRunningModels(modelJobInfo schsdk.ModelJobInfo, instanceID sc
node := value.Nodes[i]
if node.InstanceID == instanceID {
value.Nodes = append(value.Nodes[:i], value.Nodes[i+1:]...)
runningModels[key] = value
s.RunningModels[key] = value
logger.Info("remove node success from running models, job id: " + instanceID)
break
}
}
}
func UpdateNodeFromRunningModels(modelJobInfo schsdk.ModelJobInfo, instanceID schsdk.JobID, status string) {
func (s *NodeService) UpdateNodeFromRunningModels(modelJobInfo schsdk.ModelJobInfo, instanceID schsdk.JobID, status string) {
key := string(modelJobInfo.CustomModelName) + "_" + string(modelJobInfo.ModelID)
value, ok := runningModels[key]
value, ok := s.RunningModels[key]
if !ok {
return
}
@ -65,75 +79,104 @@ func UpdateNodeFromRunningModels(modelJobInfo schsdk.ModelJobInfo, instanceID sc
node.Status = status
logger.Info("update node success from running models, job id: " + instanceID)
value.Nodes[i] = node
runningModels[key] = value
s.RunningModels[key] = value
break
}
}
}
func GetAvailableNodes() map[string]schsdk.RunningModelInfo {
return runningModels
func (s *NodeService) GetAvailableNodes() map[string]schsdk.RunningModelInfo {
return s.RunningModels
}
var rateInfos []schsdk.NodeUsageRateInfo
func (s *NodeService) GetNodeUsageRateInfo(customModelName schsdk.ModelName, modelID schsdk.ModelID) []schsdk.NodeUsageRateInfo {
var rateInfos []schsdk.NodeUsageRateInfo
// 模拟获取节点使用率
func GetNodeUsageRateInfo(customModelName schsdk.ModelName, modelID schsdk.ModelID) []schsdk.NodeUsageRateInfo {
key := string(customModelName) + "_" + string(modelID)
value, ok := runningModels[key]
value, ok := s.RunningModels[key]
if !ok {
return nil
}
for i := 0; i < len(value.Nodes); i++ {
node := value.Nodes[i]
//c := cachePool[node.NodeID]
//rates := getCacheData(c)
rateInfo := schsdk.NodeUsageRateInfo{
InstanceID: node.InstanceID,
Address: node.Address,
GPURate: []schsdk.UsageRate{
{
Timestamp: strconv.FormatInt(time.Now().Unix(), 10),
Number: "10.1",
},
{
Timestamp: strconv.FormatInt(time.Now().Unix()+2, 10),
Number: "20",
},
},
AccCardRate: []schsdk.UsageRate{
{
Timestamp: strconv.FormatInt(time.Now().Unix(), 10),
Number: "3",
},
{
Timestamp: strconv.FormatInt(time.Now().Unix()+2, 10),
Number: "4.55",
},
},
}
c := s.NodeUsageCache[node.InstanceID]
rateInfo := getCacheData(c)
rateInfos = append(rateInfos, rateInfo)
}
//c := cache.New(5*time.Minute, 10*time.Minute)
return rateInfos
}
func getCacheData(c *cache.Cache) []schsdk.UsageRate {
// SetNodeUsageRateInfo records one usage report for the instance identified
// by key.
//
// value is stored in that instance's cache under the current Unix timestamp
// (in seconds, as a decimal string). The cache is created on first use with a
// 60-minute expiration and cleanup interval.
//
// The lookup and insertion into NodeUsageCache are guarded by s.Lock so that
// concurrent reports for different instances cannot write the map at the same
// time.
func (s *NodeService) SetNodeUsageRateInfo(key schsdk.JobID, value string) {
	timeStamp := strconv.FormatInt(time.Now().Unix(), 10)

	s.Lock.Lock()
	ch, ok := s.NodeUsageCache[key]
	if !ok {
		ch = cache.New(time.Minute*60, time.Minute*60)
		s.NodeUsageCache[key] = ch
	}
	s.Lock.Unlock()

	// The cache does its own locking, so the write can happen outside s.Lock.
	ch.Set(timeStamp, value, cache.DefaultExpiration)
}
func getCacheData(c *cache.Cache) schsdk.NodeUsageRateInfo {
var nodeUsageRateInfo schsdk.NodeUsageRateInfo
infoMap := make(map[string][]schsdk.UsageRate)
var usageRates []schsdk.UsageRate
// 获取缓存中的所有项
items := c.Items()
// 遍历缓存项,将其放入 map 中
for key, item := range items {
for tmstamp, item := range items {
usageRates = append(usageRates, schsdk.UsageRate{
Timestamp: key,
Number: string(item.Object.([]byte)),
})
msg := item.Object.(string)
arr1 := strings.Split(msg, "\n")
// 提取所有kv
for i := 0; i < len(arr1); i++ {
arr2 := strings.Split(arr1[i], ":")
if len(arr2) != 2 {
continue
}
key := strings.TrimSpace(arr2[0])
value := strings.TrimSpace(arr2[1])
rate, ok := infoMap[key]
if !ok {
infoMap[key] = []schsdk.UsageRate{
{
Timestamp: tmstamp,
Number: value,
},
}
continue
}
rate = append(rate, schsdk.UsageRate{
Timestamp: tmstamp,
Number: value,
})
infoMap[key] = rate
}
}
return usageRates
for _, v := range infoMap {
// 对v 进行排序
sort.Slice(v, func(i, j int) bool {
return v[i].Timestamp < v[j].Timestamp
})
//switch k {
//case "MemoryUtilization":
// nodeUsageRateInfo.MemoryUtilization = v
//case "GPUUtilization":
// nodeUsageRateInfo.GPUUtilization = v
//case "CPUUtilization":
// nodeUsageRateInfo.CPUUtilization = v
//}
nodeUsageRateInfo.MemoryUtilization = v
nodeUsageRateInfo.GPUUtilization = v
}
return nodeUsageRateInfo
}

View File

@ -6,16 +6,15 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr"
)
func (svc *Service) QueryRunningModels(msg *mgrmq.AvailableNodes) (*schsdk.RunningModelResp, *mq.CodeMessage) {
availableNodes := jobmgr.GetAvailableNodes()
availableNodes := svc.jobMgr.NodeSvc.GetAvailableNodes()
return mq.ReplyOK(mgrmq.NewAvailableNodesResp(availableNodes))
}
func (svc *Service) ECSNodeRunningInfo(req *schsdk.ECSNodeRunningInfoReq) (*schsdk.ECSNodeRunningInfoResp, *mq.CodeMessage) {
info := jobmgr.GetNodeUsageRateInfo(req.CustomModelName, req.ModelID)
info := svc.jobMgr.NodeSvc.GetNodeUsageRateInfo(req.CustomModelName, req.ModelID)
return mq.ReplyOK(schsdk.NewECSNodeRunningInfoResp(info))
}

View File

@ -53,7 +53,8 @@ func main() {
os.Exit(1)
}
jobMgr, err := jobmgr.NewManager(exeMgr, advMgr, db)
nodeSvc := jobmgr.NewNodeService()
jobMgr, err := jobmgr.NewManager(exeMgr, advMgr, db, nodeSvc)
if err != nil {
fmt.Printf("new job manager: %s", err.Error())
os.Exit(1)