pcm-coordinator/internal/storeLink/modelarts.go

/*
Copyright (c) [2023] [pcm]
[pcm-coordinator] is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
package storeLink
import (
"context"
"fmt"
"github.com/pkg/errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
modelartsclient "gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"k8s.io/apimachinery/pkg/util/json"
"log"
"mime/multipart"
"regexp"
"strconv"
"strings"
"sync"
"time"
)
const (
Ascend = "Ascend"
Npu = "npu"
ImageNetResnet50Cmd = "cd /home/ma-user && python ./inference_ascend.py"
ChatGLM6BCmd = "cd /home/ma-user && python ./download_model.py && python ./inference_chatGLM.py"
ASCEND = "ASCEND910"
)
type ModelArtsLink struct {
modelArtsRpc modelartsservice.ModelArtsService
modelArtsImgRpc imagesservice.ImagesService
platform string
participantId int64
pageIndex int32
pageSize int32
SourceLocation string
Version string
ModelId string
ModelType string
}
type MoUsage struct {
CpuSize int64
NpuSize int64
MemorySize int64
VMemorySize int64
VMemoryNumber int64
CpuAvailable int64
NpuAvailable int64
MemoryAvailable int64
VMemoryAvailable int64
}
// Version represents a semantic version number.
type Version struct {
Major, Minor, Patch int
}
// ParseVersion parses a version number from a "major.minor.patch" string.
func ParseVersion(versionStr string) (*Version, error) {
parts := strings.Split(versionStr, ".")
if len(parts) != 3 {
return nil, fmt.Errorf("invalid version format: %s", versionStr)
}
major, err := strconv.Atoi(parts[0])
if err != nil {
return nil, err
}
minor, err := strconv.Atoi(parts[1])
if err != nil {
return nil, err
}
patch, err := strconv.Atoi(parts[2])
if err != nil {
return nil, err
}
return &Version{Major: major, Minor: minor, Patch: patch}, nil
}
// Increment bumps the version: the patch field rolls over at 9 into the minor field, and the minor field rolls over at 9 into the major field.
func (v *Version) Increment() {
if v.Patch < 9 {
v.Patch++
} else {
v.Patch = 0
if v.Minor < 9 {
v.Minor++
} else {
v.Minor = 0
v.Major++
}
}
}
// String formats the version back into "major.minor.patch" form.
func (v *Version) String() string {
return fmt.Sprintf("%d.%d.%d", v.Major, v.Minor, v.Patch)
}
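// A minimal usage sketch of the version helpers above (illustrative values only,
// not taken from production code): rolling the patch field over at 9 carries into
// the minor field.
//
//	v, err := ParseVersion("0.0.9")
//	if err != nil {
//		// handle the malformed version string
//	}
//	v.Increment()
//	fmt.Println(v.String()) // prints "0.1.0"
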
func NewModelArtsLink(modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, name string, id int64, nickname string) *ModelArtsLink {
return &ModelArtsLink{modelArtsRpc: modelArtsRpc, modelArtsImgRpc: modelArtsImgRpc, platform: nickname, participantId: id, pageIndex: 0, pageSize: 50}
}
func (m *ModelArtsLink) UploadImage(ctx context.Context, path string) (interface{}, error) {
// TODO: upload an image to ModelArts
return nil, nil
}
func (m *ModelArtsLink) DeleteImage(ctx context.Context, imageId string) (interface{}, error) {
// TODO: delete an image from ModelArts
return nil, nil
}
func (m *ModelArtsLink) QueryImageList(ctx context.Context) (interface{}, error) {
// List the images registered on ModelArts.
req := &modelarts.ListRepoReq{
Offset: "0",
Limit: strconv.Itoa(int(m.pageSize)),
Platform: m.platform,
}
resp, err := m.modelArtsImgRpc.ListReposDetails(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
}
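// SubmitTask submits a training job to ModelArts. Each entry of envs and params is
// expected to be a "name,value" pair joined by the COMMA constant. A sketch of a
// call, with hypothetical identifiers (the image URL, algorithm id and command are
// placeholders, not values from this repository):
//
//	task, err := link.SubmitTask(ctx,
//		"swr.example.com/repo/demo-image:latest", // imageId
//		"python train.py",                        // cmd
//		[]string{"EPOCHS,10"},                    // envs: name,value pairs
//		[]string{"lr,0.001"},                     // params: name,value pairs
//		"modelarts.kat1.xlarge",                  // resourceId (flavor id)
//		"", "algo-123", "pytorch")                // datasetsId, algorithmId, aiType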
func (m *ModelArtsLink) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
// Submit a training job to ModelArts.
environments := make(map[string]string)
parameters := make([]*modelarts.ParametersTrainJob, 0)
/* inputs := make([]*modelarts.InputTraining, 0)
outputs := make([]*modelarts.OutputTraining, 0)*/
for _, env := range envs {
s := strings.Split(env, COMMA)
if len(s) < 2 {
continue
}
environments[s[0]] = s[1]
}
for _, param := range params {
s := strings.Split(param, COMMA)
if len(s) < 2 {
continue
}
parameters = append(parameters, &modelarts.ParametersTrainJob{
Name: s[0],
Value: s[1],
})
}
/* inputs = append(inputs, &modelarts.InputTraining{
Name: "data_url",
Remote: &modelarts.RemoteTra{
Obs: &modelarts.Obs1{
ObsUrl: "/test-wq/data/mnist.npz",
},
}})
outputs = append(outputs, &modelarts.OutputTraining{
Name: "train_url",
Remote: &modelarts.RemoteOut{
Obs: &modelarts.ObsTra{
ObsUrl: "/test-wq/model/",
},
},
})*/
req := &modelarts.CreateTrainingJobReq{
Kind: "job",
Metadata: &modelarts.MetadataS{
Name: TASK_NAME_PREFIX + utils.RandomString(10),
WorkspaceId: "0",
},
Algorithm: &modelarts.Algorithms{
Id: algorithmId,
Engine: &modelarts.EngineCreateTraining{
ImageUrl: imageId,
},
Command: cmd,
Environments: environments,
Parameters: parameters,
//Inputs: inputs,
//Outputs: outputs,
},
Spec: &modelarts.SpecsC{
Resource: &modelarts.ResourceCreateTraining{
FlavorId: resourceId,
NodeCount: 1,
},
},
Platform: m.platform,
}
if marshal, err2 := json.Marshal(req); err2 == nil {
log.Printf("CreateTrainingJob request: %s", marshal)
}
resp, err := m.modelArtsRpc.CreateTrainingJob(ctx, req)
if err != nil {
return nil, err
}
if resp.ErrorMsg != "" {
return nil, errors.New(resp.ErrorMsg)
}
return resp, nil
}
func (m *ModelArtsLink) QueryTask(ctx context.Context, taskId string) (interface{}, error) {
// Query a training job.
req := &modelarts.DetailTrainingJobsReq{
TrainingJobId: taskId,
Platform: m.platform,
}
resp, err := m.modelArtsRpc.GetTrainingJobs(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
}
func (m *ModelArtsLink) DeleteTask(ctx context.Context, taskId string) (interface{}, error) {
// Delete a training job.
req := &modelarts.DeleteTrainingJobReq{
TrainingJobId: taskId,
Platform: m.platform,
}
resp, err := m.modelArtsRpc.DeleteTrainingJob(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
}
func (m *ModelArtsLink) QuerySpecs(ctx context.Context) (interface{}, error) {
// Query the training resource flavors available on ModelArts.
req := &modelarts.TrainingJobFlavorsReq{
Platform: m.platform,
}
resp, err := m.modelArtsRpc.GetTrainingJobFlavors(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
}
func (m *ModelArtsLink) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
req := &modelarts.GetPoolsRuntimeMetricsReq{}
resp, err := m.modelArtsRpc.GetPoolsRuntimeMetrics(ctx, req)
if err != nil {
return nil, err
}
if resp.ErrorMsg != "" {
return nil, errors.New(resp.ErrorMsg)
}
resourceStats := &collector.ResourceStats{}
CpuCoreTotalSum := int64(0)
CpuCoreAvailSum := int64(0)
MemTotalSum := float64(0)
MemAvailSum := float64(0)
var CpuCoreTotal int64
var CpuCoreAvail int64
var MemTotal float64
var MemAvail float64
for _, items := range resp.Items {
if CpuCoreTotal, err = strconv.ParseInt(items.Table.Capacity.Value.Cpu, 10, 64); err != nil {
return nil, err
}
CpuCoreTotalSum += CpuCoreTotal
if CpuCoreAvail, err = strconv.ParseInt(items.Table.Allocated.Value.Cpu, 10, 64); err != nil {
return nil, err
}
CpuCoreAvailSum += CpuCoreAvail
if MemTotal, err = strconv.ParseFloat(items.Table.Capacity.Value.Memory, 64); err != nil {
return nil, err
}
MemTotalSum += MemTotal
if MemAvail, err = strconv.ParseFloat(items.Table.Allocated.Value.Memory, 64); err != nil {
return nil, err
}
MemAvailSum += MemAvail
}
resourceStats.CpuCoreTotal = CpuCoreTotalSum
resourceStats.CpuCoreAvail = CpuCoreAvailSum
resourceStats.MemTotal = MemTotalSum
resourceStats.MemAvail = MemAvailSum
req1 := &modelarts.GetResourceFlavorsReq{}
resp1, err := m.modelArtsRpc.GetResourceFlavors(ctx, req1)
if err != nil {
return nil, err
}
if len(resp1.Items) == 0 {
return nil, errors.New("no resource flavors returned")
}
num32, _ := strconv.Atoi(resp1.Items[0].Spec.Npu.Size)
var cards []*collector.Card
card := &collector.Card{
Platform: MODELARTS,
Type: CARD,
Name: Npu,
CardNum: int32(num32),
TOpsAtFp16: float64(num32 * 320),
}
cards = append(cards, card)
resourceStats.CardsAvail = cards
return resourceStats, nil
}
func (m *ModelArtsLink) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) {
return nil, nil
}
func (m *ModelArtsLink) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) {
var algorithms []*collector.Algorithm
req := &modelarts.ListAlgorithmsReq{
Platform: m.platform,
Offset: m.pageIndex,
Limit: m.pageSize,
}
resp, err := m.modelArtsRpc.ListAlgorithms(ctx, req)
if err != nil {
return nil, err
}
if resp.ErrorMsg != "" {
return nil, errors.New("failed to get algorithms")
}
for _, a := range resp.Items {
//TODO The value of taskType is temporarily fixed to "pytorch"
algorithm := &collector.Algorithm{Name: a.Metadata.Name, Platform: MODELARTS, TaskType: "pytorch"}
algorithms = append(algorithms, algorithm)
}
return algorithms, nil
}
func (m *ModelArtsLink) GetComputeCards(ctx context.Context) ([]string, error) {
var cards []string
cards = append(cards, Ascend)
return cards, nil
}
func (m *ModelArtsLink) GetUserBalance(ctx context.Context) (float64, error) {
return 0, nil
}
func (m *ModelArtsLink) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
algoName := dataset + DASH + algorithm
req := &modelarts.GetFileReq{
Path: algoName + FORWARD_SLASH + TRAIN_FILE,
}
resp, err := m.modelArtsRpc.GetFile(ctx, req)
if err != nil {
return "", err
}
return string(resp.Content), nil
}
func (m *ModelArtsLink) UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error {
return nil
}
// Check whether a required image exists in image management and resolve its SWR path by image name.
func (m *ModelArtsLink) getSourceLocationFromImages(ctx context.Context, option *option.InferOption) error {
req := &modelarts.ListImagesReq{
//Platform: m.platform,
Limit: 50,
Offset: 0,
}
ListImagesResp, err := m.modelArtsRpc.ListImages(ctx, req)
if err != nil {
return err
}
if ListImagesResp.Code != 200 {
return errors.New("failed to get ListImages")
}
for _, ListImages := range ListImagesResp.Data {
if option.ModelName == "ChatGLM-6B" {
if ListImages.Name == "chatglm-6b" {
m.SourceLocation = ListImages.SwrPath
return nil
}
} else {
if ListImages.Name == option.ModelName {
m.SourceLocation = ListImages.SwrPath
return nil
}
}
}
return errors.New("SourceLocation not set")
}
// GetModelId looks up the model id from the AI application (model) list and creates the model if it does not exist.
func (m *ModelArtsLink) GetModelId(ctx context.Context, option *option.InferOption) error {
req := &modelarts.ListModelReq{
Platform: m.platform,
ModelName: option.ModelName,
//ModelType: "Image",
Limit: int64(m.pageSize),
Offset: int64(m.pageIndex),
}
ListModelResp, err := m.modelArtsRpc.ListModels(ctx, req)
if err != nil {
return err
}
if ListModelResp.Code == 200 {
//return errors.New("failed to get ModelId")
for _, ListModel := range ListModelResp.Models {
if ListModel.ModelName == option.ModelName {
option.ModelId = ListModel.ModelId
m.Version = ListModel.ModelVersion
return nil
}
}
}
err = m.CreateModel(ctx, option)
if err != nil {
return err
}
return nil
}
func (m *ModelArtsLink) GetModel(ctx context.Context, option *option.InferOption) string {
req := &modelarts.ShowModelReq{
Platform: m.platform,
ModelId: option.ModelId,
}
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Second)
defer cancel()
ShowModelsResp, err := m.modelArtsRpc.ShowModels(ctx, req)
if err != nil {
if err == context.DeadlineExceeded {
log.Println("Request timed out")
// the caller may retry the request or handle it otherwise
} else {
log.Printf("could not call ShowModels: %v", err)
}
return ""
}
if ShowModelsResp.Code != 200 {
log.Println("failed to get model status")
return ""
}
m.ModelType = ShowModelsResp.ShowModelDetail.ModelAlgorithm
return ShowModelsResp.ShowModelDetail.ModelStatus
}
// Poll until the model is published.
func (m *ModelArtsLink) GetModelStatus(ctx context.Context, option *option.InferOption) error {
for {
status := m.GetModel(ctx, option)
if status == "published" {
fmt.Println("Model is now published.")
break // exit the loop once the status becomes published
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
fmt.Println("Waiting for model to be published...")
time.Sleep(5 * time.Second) // wait before checking again
}
// Operations that require a published model can continue from here.
fmt.Println("Continuing with the program...")
return nil
}
// Create an AI application
func (m *ModelArtsLink) CreateModel(ctx context.Context, option *option.InferOption) error {
// Before creating an AI application, check whether an image is available to create it from.
err := m.getSourceLocationFromImages(ctx, option)
if err != nil {
return errors.New("no image available for creation")
}
var CMD string
if option.ModelName == "imagenet_resnet50" {
CMD = ImageNetResnet50Cmd
} else if option.ModelName == "ChatGLM-6B" {
CMD = ChatGLM6BCmd
}
if m.Version == "" {
m.Version = "0.0.1"
}
version, err := ParseVersion(m.Version)
if err != nil {
return err
}
version.Increment()
req := &modelarts.CreateModelReq{
Platform: m.platform,
ModelName: option.ModelName,
ModelType: "Image",
ModelVersion: version.String(),
SourceLocation: m.SourceLocation,
InstallType: []string{"real-time"},
Cmd: CMD,
ModelAlgorithm: option.ModelType,
}
ModelResp, err := m.modelArtsRpc.CreateModel(ctx, req)
if err != nil {
return err
}
if ModelResp.Code != 200 {
return errors.New("failed to create model")
}
option.ModelId = ModelResp.ModelId
return nil
}
func (m *ModelArtsLink) GetSpecifications(ctx context.Context, option *option.AiOption, ifoption *option.InferOption) error {
req := &modelarts.ListSpecificationsReq{
//Platform: m.platform,
IsPersonalCluster: false,
InferType: "real-time",
Limit: m.pageSize,
OffSet: m.pageIndex,
}
ListSpecificationsResp, err := m.modelArtsRpc.ListSpecifications(ctx, req)
if err != nil {
return err
}
for _, ListSpecifications := range ListSpecificationsResp.Specifications {
if ListSpecifications.Specification == "modelarts.kat1.xlarge" {
ifoption.Specification = ListSpecifications.Specification
return nil
}
}
return nil
}
func (m *ModelArtsLink) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) {
req := &modelartsservice.GetTrainingJobLogsPreviewReq{
Platform: m.platform,
TaskId: "worker-0",
TrainingJobId: taskId,
}
resp, err := m.modelArtsRpc.GetTrainingJobLogsPreview(ctx, req)
if err != nil {
return "", err
}
if strings.Contains(resp.Content, "404 Not Found") {
resp.Content = "waiting for logs..."
}
return resp.Content, nil
}
func (m *ModelArtsLink) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
resp, err := m.QueryTask(ctx, taskId)
if err != nil {
return nil, err
}
jobresp, ok := resp.(*modelartsservice.JobResponse)
if !ok {
return nil, errors.New("get training task failed, unexpected response type")
}
if jobresp.ErrorMsg != "" {
return nil, errors.New(jobresp.ErrorMsg)
}
var task collector.Task
task.Id = jobresp.Metadata.Id
switch strings.ToLower(jobresp.Status.Phase) {
case "completed":
milliTimestamp := int64(jobresp.Status.StartTime)
task.Start = timeutils.MillisecondsToUTCString(milliTimestamp, time.DateTime)
duration := int64(jobresp.Status.Duration)
task.End = timeutils.MillisecondsToAddDurationToUTCString(milliTimestamp, duration, time.DateTime)
task.Status = constants.Completed
case "failed":
milliTimestamp := int64(jobresp.Status.StartTime)
task.Start = timeutils.MillisecondsToUTCString(milliTimestamp, time.DateTime)
duration := int64(jobresp.Status.Duration)
task.End = timeutils.MillisecondsToAddDurationToUTCString(milliTimestamp, duration, time.DateTime)
task.Status = constants.Failed
case "running":
milliTimestamp := int64(jobresp.Status.StartTime)
task.Start = timeutils.MillisecondsToUTCString(milliTimestamp, time.DateTime)
task.Status = constants.Running
case "stopped":
task.Status = constants.Stopped
case "pending":
task.Status = constants.Pending
case "terminated":
//TODO Failed
task.Status = constants.Failed
default:
task.Status = "undefined"
}
return &task, nil
}
func (m *ModelArtsLink) Execute(ctx context.Context, option *option.AiOption, mode int) (interface{}, error) {
switch mode {
case executor.SUBMIT_MODE_JOINT_CLOUD:
err := m.GenerateSubmitParams(ctx, option)
if err != nil {
return nil, err
}
case executor.SUBMIT_MODE_STORAGE_SCHEDULE:
var ascendNum int32
for _, res := range option.ResourcesRequired {
typeName, ok := res["type"]
if !ok {
continue
}
switch typeName {
case "NPU":
num, ok := res["number"]
if !ok {
continue
}
n := common.ConvertTypeToString(num)
val, err := strconv.ParseInt(n, 10, 32)
if err != nil {
return nil, err
}
ascendNum = int32(val)
}
}
req := &modelarts.TrainingJobFlavorsReq{
Platform: "modelarts-CloudBrain2",
FlavorType: "",
}
resp, err := m.modelArtsRpc.GetTrainingJobFlavors(ctx, req)
if err != nil {
return nil, err
}
for _, v := range resp.Flavors {
if ascendNum == v.FlavorInfo.Npu.UnitNum {
option.ResourceId = v.FlavorId
break
} else if ascendNum <= 1 {
option.ResourceId = "modelarts.kat1.xlarge"
break
} else if ascendNum == 2 {
option.ResourceId = "modelarts.kat1.2xlarge"
break
} else if ascendNum > 2 && ascendNum <= 4 {
option.ResourceId = "modelarts.kat1.4xlarge"
break
} else if ascendNum >= 5 {
option.ResourceId = "modelarts.kat1.8xlarge"
break
}
}
option.ComputeCard = NPU
default:
return nil, errors.New("failed to choose submit mode")
}
task, err := m.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
if err != nil {
return nil, err
}
return task, nil
}
func (m *ModelArtsLink) GenerateSubmitParams(ctx context.Context, option *option.AiOption) error {
err := m.generateResourceId(ctx, option, nil)
if err != nil {
return err
}
err = m.generateAlgorithmId(ctx, option)
if err != nil {
return err
}
err = m.generateImageId(option)
if err != nil {
return err
}
err = m.generateCmd(option)
if err != nil {
return err
}
err = m.generateEnv(option)
if err != nil {
return err
}
err = m.generateParams(option)
if err != nil {
return err
}
return nil
}
func (m *ModelArtsLink) generateResourceId(ctx context.Context, option *option.AiOption, ifoption *option.InferOption) error {
option.ResourceId = "modelarts.kat1.xlarge"
return nil
}
func (m *ModelArtsLink) generateImageId(option *option.AiOption) error {
return nil
}
func (m *ModelArtsLink) generateCmd(option *option.AiOption) error {
return nil
}
func (m *ModelArtsLink) generateEnv(option *option.AiOption) error {
return nil
}
func (m *ModelArtsLink) generateParams(option *option.AiOption) error {
return nil
}
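// generateAlgorithmId resolves option.AlgorithmId by matching algorithm names that
// follow the "<taskType><DASH><dataset><DASH><algorithm>" layout checked below.
// An illustrative name, assuming DASH is "-": "pytorch-mnist-resnet50" would match
// TaskType "pytorch", DatasetsName "mnist" and AlgorithmName "resnet50".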
func (m *ModelArtsLink) generateAlgorithmId(ctx context.Context, option *option.AiOption) error {
req := &modelarts.ListAlgorithmsReq{
Platform: m.platform,
Offset: m.pageIndex,
Limit: m.pageSize,
}
resp, err := m.modelArtsRpc.ListAlgorithms(ctx, req)
if err != nil {
return err
}
if resp.ErrorMsg != "" {
return errors.New("failed to get algorithmId")
}
for _, algorithm := range resp.Items {
engVersion := algorithm.JobConfig.Engine.EngineVersion
if strings.Contains(engVersion, option.TaskType) {
ns := strings.Split(algorithm.Metadata.Name, DASH)
if len(ns) < 3 {
continue
}
if ns[0] != option.TaskType {
continue
}
if ns[1] != option.DatasetsName {
continue
}
if ns[2] != option.AlgorithmName {
continue
}
option.AlgorithmId = algorithm.Metadata.Id
return nil
}
}
return errors.New("algorithm does not exist on the ModelArts platform")
}
func (m *ModelArtsLink) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) {
var imageUrls []*inference.InferUrl
urlReq := &modelartsclient.ImageReasoningUrlReq{
ServiceName: option.ModelName,
Type: option.ModelType,
Card: "npu",
}
urlResp, err := m.modelArtsRpc.ImageReasoningUrl(ctx, urlReq)
if err != nil {
return nil, err
}
imageUrl := &inference.InferUrl{
Url: urlResp.Url,
Card: "npu",
}
imageUrls = append(imageUrls, imageUrl)
clusterWithUrl := &inference.ClusterInferUrl{
ClusterName: m.platform,
ClusterType: TYPE_MODELARTS,
InferUrls: imageUrls,
}
return clusterWithUrl, nil
}
func (m *ModelArtsLink) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) {
var insList []*inference.DeployInstance
req := &modelarts.ListServicesReq{
Platform: m.platform,
OffSet: m.pageIndex,
Limit: m.pageSize,
}
resp, err := m.modelArtsRpc.ListServices(ctx, req)
if err != nil {
return nil, err
}
if resp.ErrorMsg != "" {
return nil, errors.New(resp.Msg)
}
for _, services := range resp.Services {
ins := &inference.DeployInstance{}
ins.InstanceName = services.ServiceName
ins.InstanceId = services.ServiceId
ins.Status = services.Status
ins.InferCard = "NPU"
ins.ClusterName = m.platform
ins.CreatedTime = string(services.StartTime)
ins.ClusterType = TYPE_MODELARTS
insList = append(insList, ins)
}
return insList, nil
}
func (m *ModelArtsLink) StartInferDeployInstance(ctx context.Context, id string) bool {
req := &modelartsclient.UpdateServiceReq{
ServiceId: id,
Status: "running",
}
resp, err := m.modelArtsRpc.UpdateService(ctx, req)
if err != nil || resp.Code != 0 {
return false
}
return true
}
func (m *ModelArtsLink) StopInferDeployInstance(ctx context.Context, id string) bool {
req := &modelartsclient.UpdateServiceReq{
ServiceId: id,
Status: "stopped",
}
resp, err := m.modelArtsRpc.UpdateService(ctx, req)
if err != nil || resp.Code != 0 {
return false
}
return true
}
func (m *ModelArtsLink) GetInferDeployInstance(ctx context.Context, id string) (*inference.DeployInstance, error) {
req := &modelarts.ShowServiceReq{
ServiceId: id,
}
resp, err := m.modelArtsRpc.ShowService(ctx, req)
if err != nil {
return nil, err
}
if resp.ErrorMsg != "" {
return nil, errors.New(resp.Msg)
}
ins := &inference.DeployInstance{}
ins.InstanceName = resp.ServiceName
ins.InstanceId = resp.ServiceId
ins.Status = resp.Status
ins.InferCard = "NPU"
ins.ClusterName = m.platform
ins.CreatedTime = string(resp.StartTime)
ins.ClusterType = TYPE_MODELARTS
if len(resp.Config) > 0 {
ins.ModelName = resp.Config[0].ModelName
}
ins.ModelType = m.ModelType
ins.InferUrl = resp.AccessAddress
return ins, nil
}
func (m *ModelArtsLink) GetImageInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
return "", nil
}
func (m *ModelArtsLink) CreateInferDeployInstance(ctx context.Context, option *option.InferOption) (string, error) {
err := m.GetModelId(ctx, option)
if err != nil {
return "", err
}
err = m.GetModelStatus(ctx, option)
if err != nil {
return "", err
}
configParam := &modelarts.ServiceConfig{
Specification: "modelarts.kat1.xlarge",
Weight: 100,
ModelId: option.ModelId,
InstanceCount: 1,
}
var configItems []*modelarts.ServiceConfig
configItems = append(configItems, configParam)
now := time.Now()
timestampSec := now.Unix()
str := strconv.FormatInt(timestampSec, 10)
req := &modelarts.CreateServiceReq{
Platform: m.platform,
Config: configItems,
InferType: "real-time",
ServiceName: option.ModelName + "_" + option.ModelType + "_" + Npu + "_" + str,
}
ctx, cancel := context.WithTimeout(context.Background(), 150*time.Second)
defer cancel()
resp, err := m.modelArtsRpc.CreateService(ctx, req)
if err != nil {
return "", err
}
return resp.ServiceId, nil
}
func (m *ModelArtsLink) CheckModelExistence(ctx context.Context, name string, mtype string) bool {
ifoption := &option.InferOption{
ModelName: name,
ModelType: mtype,
}
err := m.CheckImageExist(ctx, ifoption)
if err != nil {
return false
}
return true
}
func (m *ModelArtsLink) CheckImageExist(ctx context.Context, option *option.InferOption) error {
req := &modelarts.ListImagesReq{
Limit: m.pageSize,
Offset: m.pageIndex,
}
ListImageResp, err := m.modelArtsRpc.ListImages(ctx, req)
if err != nil {
return err
}
var modelName string
if ListImageResp.Code == 200 {
//return errors.New("failed to get ModelId")
for _, ListImage := range ListImageResp.Data {
if option.ModelName == "ChatGLM-6B" {
modelName = "chatglm-6b"
} else {
modelName = option.ModelName
}
if ListImage.Name == modelName {
return nil
}
}
}
return errors.New("failed to find Image ")
}
func (m *ModelArtsLink) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, error) {
// Query ModelArts resource flavors.
req := &modelarts.GetResourceFlavorsReq{}
resp, err := m.modelArtsRpc.GetResourceFlavors(ctx, req)
if err != nil {
return nil, err
}
if resp.Msg != "" {
return nil, errors.New(resp.Msg)
}
MoUsage := MoUsage{}
var cpusum int64 = 0
var npusum int64 = 0
var memorysum int64 = 0
var VMemorysum int64 = 0
var RunningTaskNum int64 = 0
var BalanceValue float64 = -1
var RateValue float64 = 0.930000
var StorageValue int64 = 1024
var AvailableValue int64 = 886
for _, Flavors := range resp.Items {
if Flavors.Metadata.Name == "modelarts.kat1.8xlarge" {
MoUsage.CpuSize, err = strconv.ParseInt(Flavors.Spec.Cpu, 10, 64) // CPU core count
if err != nil {
// handle the conversion failure
fmt.Println("conversion error:", err)
return nil, err
}
cpusum = MoUsage.CpuSize
MoUsage.NpuSize, err = strconv.ParseInt(Flavors.Spec.Npu.Size, 10, 64) // NPU count
if err != nil {
// handle the conversion failure
fmt.Println("conversion error:", err)
return nil, err
}
npusum = MoUsage.NpuSize
re := regexp.MustCompile(`\d+`)
numberStr := re.FindString(Flavors.Spec.Memory) // strip the unit with a regular expression
MoUsage.MemorySize, err = strconv.ParseInt(numberStr, 10, 64) // memory size
if err != nil {
// handle the conversion failure
fmt.Println("conversion error:", err)
return nil, err
}
memorysum = MoUsage.MemorySize * 1024
}
}
// Query the public flavors supported by training jobs, covering the 1/2/4/8-card options and their NPU memory sizes.
reqJobFlavors := &modelarts.TrainingJobFlavorsReq{
Platform: m.platform,
}
respJobFlavors, err := m.modelArtsRpc.GetTrainingJobFlavors(ctx, reqJobFlavors)
if err != nil {
return nil, err
}
for _, TrainLists := range respJobFlavors.Flavors {
if TrainLists.FlavorId == "modelarts.kat1.8xlarge" {
re := regexp.MustCompile(`\d+`)
numberStr := re.FindString(string(TrainLists.FlavorInfo.Npu.Memory)) // strip the unit with a regular expression
MoUsage.VMemorySize, err = strconv.ParseInt(numberStr, 10, 64) // NPU memory (VRAM) size
if err != nil {
return nil, err
}
VMemorysum = MoUsage.VMemorySize * int64(TrainLists.FlavorInfo.Npu.UnitNum)
}
}
reqTraining := &modelarts.ListTrainingJobsreq{
Platform: m.platform,
}
// Query the training job list.
respList, err := m.modelArtsRpc.GetListTrainingJobs(ctx, reqTraining)
if err != nil {
return nil, err
}
var CoreNum int32 = 0
var NpuNum int32 = 0
var MemoryNum int32 = 0
var VMemoryNum int64 = 0
for _, TrainLists := range respList.Items {
if TrainLists.Status.Phase == "Running" {
CoreNum += TrainLists.Spec.Resource.FlavorDetail.FlavorInfo.Cpu.CoreNum
NpuNum += TrainLists.Spec.Resource.FlavorDetail.FlavorInfo.Npu.UnitNum
MemoryNum += TrainLists.Spec.Resource.FlavorDetail.FlavorInfo.Memory.Size
vmem, _ := strconv.ParseInt(TrainLists.Spec.Resource.FlavorDetail.FlavorInfo.Npu.Memory, 10, 64)
VMemoryNum += vmem
RunningTaskNum += 1
}
}
MoUsage.CpuAvailable = cpusum - int64(CoreNum)
MoUsage.NpuAvailable = npusum - int64(NpuNum)
MoUsage.MemoryAvailable = memorysum - int64(MemoryNum)
MoUsage.VMemoryAvailable = VMemorysum - VMemoryNum
UsageCPU := &collector.Usage{Type: strings.ToUpper(CPU), Name: strings.ToUpper("ARM"), Total: &collector.UnitValue{Unit: CPUCORE, Value: cpusum}, Available: &collector.UnitValue{Unit: CPUCORE, Value: MoUsage.CpuAvailable}}
UsageNPU := &collector.Usage{Type: strings.ToUpper(NPU), Name: ASCEND, Total: &collector.UnitValue{Unit: NUMBER, Value: npusum}, Available: &collector.UnitValue{Unit: NUMBER, Value: MoUsage.NpuAvailable}}
UsageMEMORY := &collector.Usage{Type: strings.ToUpper(MEMORY), Name: strings.ToUpper(RAM), Total: &collector.UnitValue{Unit: GIGABYTE, Value: memorysum}, Available: &collector.UnitValue{Unit: GIGABYTE, Value: MoUsage.MemoryAvailable}}
UsageVMEMORY := &collector.Usage{Type: strings.ToUpper(MEMORY), Name: strings.ToUpper(VRAM), Total: &collector.UnitValue{Unit: GIGABYTE, Value: VMemorysum}, Available: &collector.UnitValue{Unit: GIGABYTE, Value: MoUsage.VMemoryAvailable}}
RunningTask := &collector.Usage{Type: strings.ToUpper(RUNNINGTASK), Total: &collector.UnitValue{Unit: NUMBER, Value: RunningTaskNum}}
Balance := &collector.Usage{Type: strings.ToUpper(BALANCE), Total: &collector.UnitValue{Unit: RMB, Value: BalanceValue}}
Rate := &collector.Usage{Type: strings.ToUpper(RATE), Total: &collector.UnitValue{Unit: PERHOUR, Value: RateValue}}
Storage := &collector.Usage{Type: strings.ToUpper(STORAGE), Total: &collector.UnitValue{Unit: GIGABYTE, Value: StorageValue}, Name: strings.ToUpper("disk"), Available: &collector.UnitValue{Unit: GIGABYTE, Value: AvailableValue}}
resUsage := &collector.ResourceSpec{
ClusterId: strconv.FormatInt(m.participantId, 10),
}
cres := &collector.ClusterResource{}
cres.Resource = UsageNPU
cres.BaseResources = append(cres.BaseResources, UsageCPU)
cres.BaseResources = append(cres.BaseResources, UsageMEMORY)
cres.BaseResources = append(cres.BaseResources, UsageVMEMORY)
cres.BaseResources = append(cres.BaseResources, Storage)
RunningTaskRes := &collector.ClusterResource{}
RunningTaskRes.Resource = RunningTask
BalanceRes := &collector.ClusterResource{}
BalanceRes.Resource = Balance
RateRes := &collector.ClusterResource{}
RateRes.Resource = Rate
/* StorageRes := &collector.ClusterResource{}
StorageRes.Resource = Storage*/
resUsage.Resources = append(resUsage.Resources, cres)
resUsage.Resources = append(resUsage.Resources, RunningTaskRes)
resUsage.Resources = append(resUsage.Resources, BalanceRes)
resUsage.Resources = append(resUsage.Resources, RateRes)
//resUsage.Resources = append(resUsage.Resources, StorageRes)
return resUsage, nil
}
func (m *ModelArtsLink) Stop(ctx context.Context, id string) error {
req := &modelarts.StopTrainingJobReq{
TrainingJobId: id,
ActionType: "terminate",
}
resp, err := m.modelArtsRpc.StopTrainingJob(ctx, req)
if err != nil {
return err
}
if resp.Code != 0 {
return errors.New(resp.ErrorMsg)
}
return nil
}