pcm-coordinator/internal/storeLink/modelarts.go

542 lines
15 KiB
Go

/*
Copyright (c) [2023] [pcm]
[pcm-coordinator] is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
package storeLink
import (
"context"
"github.com/pkg/errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
modelartsclient "gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"mime/multipart"
"strconv"
"strings"
"time"
)
// Identifiers for the accelerator exposed by this ModelArts link.
const (
// Ascend is the only compute card name reported by GetComputeCards.
Ascend = "Ascend"
// Npu is the card name placed on the collector.Card built by GetResourceStats.
Npu = "npu"
)
// ModelArtsLink bundles the ModelArts RPC clients with the per-cluster
// settings that the methods of this type attach to their requests.
type ModelArtsLink struct {
// modelArtsRpc serves the training job, algorithm, flavor, metrics and
// inference service calls made by this type.
modelArtsRpc modelartsservice.ModelArtsService
// modelArtsImgRpc serves the image repository listing call.
modelArtsImgRpc imagesservice.ImagesService
// platform is sent as the Platform field of requests and reported as the
// cluster name of inference instances.
platform string
// participantId is stored by NewModelArtsLink.
participantId int64
// pageIndex and pageSize are used as the offset and limit of list requests.
pageIndex int32
pageSize int32
}
// NewModelArtsLink returns a ModelArtsLink wired to the given RPC clients.
//
// nickname becomes the platform value sent with requests and id becomes the
// participant id. name is accepted but not used. Listing starts at page
// index 0 with a page size of 50.
func NewModelArtsLink(modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, name string, id int64, nickname string) *ModelArtsLink {
	link := &ModelArtsLink{
		pageIndex: 0,
		pageSize:  50,
	}
	link.modelArtsRpc = modelArtsRpc
	link.modelArtsImgRpc = modelArtsImgRpc
	link.platform = nickname
	link.participantId = id
	return link
}
// UploadImage is a placeholder: it performs no work and always returns
// (nil, nil).
func (m *ModelArtsLink) UploadImage(ctx context.Context, path string) (interface{}, error) {
	// TODO: upload the image to ModelArts.
	return nil, nil
}
// DeleteImage is a placeholder: it performs no work and always returns
// (nil, nil).
func (m *ModelArtsLink) DeleteImage(ctx context.Context, imageId string) (interface{}, error) {
	// TODO: delete the image from ModelArts.
	return nil, nil
}
// QueryImageList fetches image repository details from ModelArts.
//
// The request always starts at offset "0" and uses the link's page size as
// the limit. The RPC response is returned as-is; on failure the RPC error is
// returned with a nil result.
func (m *ModelArtsLink) QueryImageList(ctx context.Context) (interface{}, error) {
	limit := strconv.Itoa(int(m.pageSize))
	repos, err := m.modelArtsImgRpc.ListReposDetails(ctx, &modelarts.ListRepoReq{
		Offset:   "0",
		Limit:    limit,
		Platform: m.platform,
	})
	if err != nil {
		return nil, err
	}
	return repos, nil
}
// SubmitTask creates a ModelArts training job.
//
// Each entry of envs and params must hold a name and a value separated by
// COMMA; the first two fields of the split are used as name and value. An
// entry without a separator yields an error instead of an out-of-range panic.
//
// imageId is used as the engine image URL, cmd as the job command,
// resourceId as the flavor id and algorithmId as the algorithm id. datasetsId
// and aiType are accepted but not used by this implementation. The job name
// is TASK_NAME_PREFIX followed by a random 10-character string; the workspace
// id is fixed to "0" and the node count to 1.
//
// The RPC response is returned as-is; on failure the error is returned with a
// nil result.
func (m *ModelArtsLink) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
	// splitPair extracts the name and value from one "name<COMMA>value" entry.
	splitPair := func(kind, raw string) (string, string, error) {
		fields := strings.Split(raw, COMMA)
		if len(fields) < 2 {
			return "", "", errors.New("invalid " + kind + " entry " + strconv.Quote(raw) + ": expected name and value")
		}
		return fields[0], fields[1], nil
	}

	environments := make(map[string]string, len(envs))
	for _, env := range envs {
		name, value, err := splitPair("env", env)
		if err != nil {
			return nil, err
		}
		environments[name] = value
	}

	parameters := make([]*modelarts.ParametersTrainJob, 0, len(params))
	for _, param := range params {
		name, value, err := splitPair("param", param)
		if err != nil {
			return nil, err
		}
		parameters = append(parameters, &modelarts.ParametersTrainJob{
			Name:  name,
			Value: value,
		})
	}

	req := &modelarts.CreateTrainingJobReq{
		Kind: "job",
		Metadata: &modelarts.MetadataS{
			Name:        TASK_NAME_PREFIX + utils.RandomString(10),
			WorkspaceId: "0",
		},
		Algorithm: &modelarts.Algorithms{
			Id: algorithmId,
			Engine: &modelarts.EngineCreateTraining{
				ImageUrl: imageId,
			},
			Command:      cmd,
			Environments: environments,
			Parameters:   parameters,
		},
		Spec: &modelarts.SpecsC{
			Resource: &modelarts.ResourceCreateTraining{
				FlavorId:  resourceId,
				NodeCount: 1,
			},
		},
		Platform: m.platform,
	}
	resp, err := m.modelArtsRpc.CreateTrainingJob(ctx, req)
	if err != nil {
		return nil, err
	}
	return resp, nil
}
// QueryTask looks up the training job identified by taskId on this link's
// platform and returns the RPC response as-is. On failure the RPC error is
// returned with a nil result.
func (m *ModelArtsLink) QueryTask(ctx context.Context, taskId string) (interface{}, error) {
	job, err := m.modelArtsRpc.GetTrainingJobs(ctx, &modelarts.DetailTrainingJobsReq{
		TrainingJobId: taskId,
		Platform:      m.platform,
	})
	if err != nil {
		return nil, err
	}
	return job, nil
}
// DeleteTask asks ModelArts to delete the training job identified by taskId
// and returns the RPC response as-is. On failure the RPC error is returned
// with a nil result.
func (m *ModelArtsLink) DeleteTask(ctx context.Context, taskId string) (interface{}, error) {
	deleted, err := m.modelArtsRpc.DeleteTrainingJob(ctx, &modelarts.DeleteTrainingJobReq{
		TrainingJobId: taskId,
		Platform:      m.platform,
	})
	if err != nil {
		return nil, err
	}
	return deleted, nil
}
// QuerySpecs lists the training job flavors (resource specifications)
// available on this link's platform and returns the RPC response as-is. On
// failure the RPC error is returned with a nil result.
func (m *ModelArtsLink) QuerySpecs(ctx context.Context) (interface{}, error) {
	flavors, err := m.modelArtsRpc.GetTrainingJobFlavors(ctx, &modelarts.TrainingJobFlavorsReq{
		Platform: m.platform,
	})
	if err != nil {
		return nil, err
	}
	return flavors, nil
}
// GetResourceStats aggregates CPU and memory figures over all resource pools
// and reports the NPU cards of the first resource flavor.
//
// CPU and memory totals are summed from each pool's capacity values, and the
// "avail" figures are summed from each pool's allocated values.
// NOTE(review): the "avail" fields are fed from Allocated, not from
// capacity minus allocated — confirm this is intended.
//
// An error is returned when either RPC fails, when the metrics response
// carries an error message, or when no resource flavor is returned.
func (m *ModelArtsLink) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
	metrics, err := m.modelArtsRpc.GetPoolsRuntimeMetrics(ctx, &modelarts.GetPoolsRuntimeMetricsReq{})
	if err != nil {
		return nil, err
	}
	if metrics.ErrorMsg != "" {
		return nil, errors.New("failed to get pools runtime metrics: " + metrics.ErrorMsg)
	}

	var (
		cpuTotalSum int64
		cpuAvailSum int64
		memTotalSum float64
		memAvailSum float64
	)
	for _, pool := range metrics.Items {
		// Best effort: a value that cannot be parsed contributes nothing to
		// the sum rather than failing the whole collection.
		if v, perr := strconv.ParseInt(pool.Table.Capacity.Value.Cpu, 10, 64); perr == nil {
			cpuTotalSum += v
		}
		if v, perr := strconv.ParseInt(pool.Table.Allocated.Value.Cpu, 10, 64); perr == nil {
			cpuAvailSum += v
		}
		if v, perr := strconv.ParseFloat(pool.Table.Capacity.Value.Memory, 64); perr == nil {
			memTotalSum += v
		}
		if v, perr := strconv.ParseFloat(pool.Table.Allocated.Value.Memory, 64); perr == nil {
			memAvailSum += v
		}
	}

	resourceStats := &collector.ResourceStats{}
	resourceStats.CpuCoreTotal = cpuTotalSum
	resourceStats.CpuCoreAvail = cpuAvailSum
	resourceStats.MemTotal = memTotalSum
	resourceStats.MemAvail = memAvailSum

	flavors, err := m.modelArtsRpc.GetResourceFlavors(ctx, &modelarts.GetResourceFlavorsReq{})
	if err != nil {
		return nil, err
	}
	if len(flavors.Items) == 0 {
		return nil, errors.New("failed to get resource flavors: empty flavor list")
	}

	// Best effort: an NPU size that is not a plain integer is reported as
	// zero cards.
	npuCount, _ := strconv.Atoi(flavors.Items[0].Spec.Npu.Size)
	card := &collector.Card{
		Platform: MODELARTS,
		Type:     CARD,
		Name:     Npu,
		CardNum:  int32(npuCount),
		// NOTE(review): 320 is presumably the FP16 TOPS of one card — confirm.
		TOpsAtFp16: float64(npuCount * 320),
	}
	resourceStats.CardsAvail = []*collector.Card{card}
	return resourceStats, nil
}
// GetDatasetsSpecs is a placeholder: it performs no work and always returns
// (nil, nil).
func (m *ModelArtsLink) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) {
	return nil, nil
}
// GetAlgorithms lists one page of algorithms from ModelArts and converts each
// into a collector.Algorithm carrying the algorithm's metadata name and the
// MODELARTS platform.
//
// An error is returned when the RPC fails or when the response carries a
// non-empty error message.
func (m *ModelArtsLink) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) {
	listed, err := m.modelArtsRpc.ListAlgorithms(ctx, &modelarts.ListAlgorithmsReq{
		Platform: m.platform,
		Offset:   m.pageIndex,
		Limit:    m.pageSize,
	})
	if err != nil {
		return nil, err
	}
	if listed.ErrorMsg != "" {
		return nil, errors.New("failed to get algorithms")
	}

	var result []*collector.Algorithm
	for _, item := range listed.Items {
		// TODO: the task type is temporarily fixed to "pytorch".
		result = append(result, &collector.Algorithm{
			Name:     item.Metadata.Name,
			Platform: MODELARTS,
			TaskType: "pytorch",
		})
	}
	return result, nil
}
// GetComputeCards returns the compute card names offered by this link, which
// is always the single entry Ascend.
func (m *ModelArtsLink) GetComputeCards(ctx context.Context) ([]string, error) {
	return []string{Ascend}, nil
}
// GetUserBalance is a placeholder: it performs no lookup and always returns
// a balance of 0 with a nil error.
func (m *ModelArtsLink) GetUserBalance(ctx context.Context) (float64, error) {
	return 0, nil
}
// DownloadAlgorithmCode is a placeholder: it ignores its arguments and always
// returns an empty string with a nil error.
func (m *ModelArtsLink) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
	return "", nil
}
// UploadAlgorithmCode is a placeholder: it ignores its arguments and always
// returns nil.
func (m *ModelArtsLink) UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error {
	return nil
}
// GetTrainingTaskLog returns the log preview of the training job identified
// by taskId.
//
// The preview is always requested for the task "worker-0"; instanceNum is
// accepted but not used. When the returned content contains "404 Not Found"
// the placeholder text "waiting for logs..." is returned instead.
func (m *ModelArtsLink) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) {
	preview, err := m.modelArtsRpc.GetTrainingJobLogsPreview(ctx, &modelartsservice.GetTrainingJobLogsPreviewReq{
		Platform:      m.platform,
		TaskId:        "worker-0",
		TrainingJobId: taskId,
	})
	if err != nil {
		return "", err
	}

	content := preview.Content
	if strings.Contains(content, "404 Not Found") {
		content = "waiting for logs..."
	}
	return content, nil
}
// GetTrainingTask queries the training job identified by taskId and maps it
// onto a collector.Task.
//
// The job phase is compared case-insensitively:
//   - "completed" and "failed" set Start, End (start plus duration) and the
//     matching status;
//   - "running" sets Start and the Running status;
//   - "stopped" and "pending" set only the matching status;
//   - "terminated" is reported as Failed;
//   - anything else is reported with the status "undefined".
//
// An error is returned when the query fails, when the response is not a
// *modelartsservice.JobResponse, or when the response carries an error
// message.
func (m *ModelArtsLink) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
	resp, err := m.QueryTask(ctx, taskId)
	if err != nil {
		return nil, err
	}

	// Check the assertion result before touching jobresp: on a failed
	// assertion jobresp is nil and reading its fields would panic.
	jobresp, ok := resp.(*modelartsservice.JobResponse)
	if !ok || jobresp == nil {
		return nil, errors.New("get training task failed, unexpected response type")
	}
	if jobresp.ErrorMsg != "" {
		return nil, errors.New(jobresp.ErrorMsg)
	}

	var task collector.Task
	task.Id = jobresp.Metadata.Id

	// fillTimes sets the start time and, when withEnd is true, the end time
	// derived from the start time plus the reported duration.
	fillTimes := func(withEnd bool) {
		startMs := int64(jobresp.Status.StartTime)
		task.Start = timeutils.MillisecondsToUTCString(startMs, time.DateTime)
		if withEnd {
			duration := int64(jobresp.Status.Duration)
			task.End = timeutils.MillisecondsToAddDurationToUTCString(startMs, duration, time.DateTime)
		}
	}

	switch strings.ToLower(jobresp.Status.Phase) {
	case "completed":
		fillTimes(true)
		task.Status = constants.Completed
	case "failed":
		fillTimes(true)
		task.Status = constants.Failed
	case "running":
		fillTimes(false)
		task.Status = constants.Running
	case "stopped":
		task.Status = constants.Stopped
	case "pending":
		task.Status = constants.Pending
	case "terminated":
		// TODO: terminated jobs are currently reported as failed.
		task.Status = constants.Failed
	default:
		task.Status = "undefined"
	}
	return &task, nil
}
// Execute fills in the submission parameters on option and then submits the
// training task built from them. The submission response is returned as-is;
// the first error from either step is returned with a nil result.
func (m *ModelArtsLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {
	if err := m.GenerateSubmitParams(ctx, option); err != nil {
		return nil, err
	}

	submitted, err := m.SubmitTask(
		ctx,
		option.ImageId,
		option.Cmd,
		option.Envs,
		option.Params,
		option.ResourceId,
		option.DatasetsId,
		option.AlgorithmId,
		option.TaskType,
	)
	if err != nil {
		return nil, err
	}
	return submitted, nil
}
// GenerateSubmitParams runs the parameter generators on option in a fixed
// order — resource id, algorithm id, image id, command, environment,
// parameters — and stops at the first one that returns an error.
func (m *ModelArtsLink) GenerateSubmitParams(ctx context.Context, option *option.AiOption) error {
	steps := []func() error{
		func() error { return m.generateResourceId(ctx, option) },
		func() error { return m.generateAlgorithmId(ctx, option) },
		func() error { return m.generateImageId(option) },
		func() error { return m.generateCmd(option) },
		func() error { return m.generateEnv(option) },
		func() error { return m.generateParams(option) },
	}
	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}
	return nil
}
// generateResourceId sets option.ResourceId to the fixed flavor
// "modelarts.kat1.xlarge" and always returns nil.
func (m *ModelArtsLink) generateResourceId(ctx context.Context, option *option.AiOption) error {
	const flavor = "modelarts.kat1.xlarge"
	option.ResourceId = flavor
	return nil
}
// generateImageId is a placeholder: it leaves option untouched and always
// returns nil.
func (m *ModelArtsLink) generateImageId(option *option.AiOption) error {
	return nil
}
// generateCmd is a placeholder: it leaves option untouched and always returns
// nil.
func (m *ModelArtsLink) generateCmd(option *option.AiOption) error {
	return nil
}
// generateEnv is a placeholder: it leaves option untouched and always returns
// nil.
func (m *ModelArtsLink) generateEnv(option *option.AiOption) error {
	return nil
}
// generateParams is a placeholder: it leaves option untouched and always
// returns nil.
func (m *ModelArtsLink) generateParams(option *option.AiOption) error {
	return nil
}
// generateAlgorithmId finds the algorithm matching option and stores its id
// in option.AlgorithmId.
//
// An algorithm matches when its engine version contains option.TaskType and
// the first three DASH-separated parts of its name equal option.TaskType,
// option.DatasetsName and option.AlgorithmName, in that order. Names with
// fewer than three parts cannot match and are skipped rather than indexed
// out of range.
//
// An error is returned when the RPC fails, when the response carries an error
// message, or when no algorithm on the listed page matches.
func (m *ModelArtsLink) generateAlgorithmId(ctx context.Context, option *option.AiOption) error {
	req := &modelarts.ListAlgorithmsReq{
		Platform: m.platform,
		Offset:   m.pageIndex,
		Limit:    m.pageSize,
	}
	resp, err := m.modelArtsRpc.ListAlgorithms(ctx, req)
	if err != nil {
		return err
	}
	if resp.ErrorMsg != "" {
		return errors.New("failed to get algorithmId")
	}

	for _, algorithm := range resp.Items {
		if !strings.Contains(algorithm.JobConfig.Engine.EngineVersion, option.TaskType) {
			continue
		}
		ns := strings.Split(algorithm.Metadata.Name, DASH)
		if len(ns) < 3 {
			continue
		}
		if ns[0] != option.TaskType || ns[1] != option.DatasetsName || ns[2] != option.AlgorithmName {
			continue
		}
		option.AlgorithmId = algorithm.Metadata.Id
		return nil
	}

	// No match on this page. The message depends on whether the caller had
	// already supplied an algorithm id.
	if option.AlgorithmId == "" {
		return errors.New("Algorithm does not exist")
	}
	return errors.New("failed to get AlgorithmId")
}
// GetClusterInferUrl asks ModelArts for the inference URL of the service
// named option.ModelName with type option.ModelType on the "npu" card, and
// wraps that single URL in a ClusterInferUrl for this link's platform.
//
// On RPC failure the error is returned with a nil result.
func (m *ModelArtsLink) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) {
	reasoning, err := m.modelArtsRpc.ImageReasoningUrl(ctx, &modelartsclient.ImageReasoningUrlReq{
		ServiceName: option.ModelName,
		Type:        option.ModelType,
		Card:        "npu",
	})
	if err != nil {
		return nil, err
	}

	var urls []*inference.InferUrl
	urls = append(urls, &inference.InferUrl{
		Url:  reasoning.Url,
		Card: "npu",
	})

	return &inference.ClusterInferUrl{
		ClusterName: m.platform,
		ClusterType: TYPE_MODELARTS,
		InferUrls:   urls,
	}, nil
}
// GetInferDeployInstanceList lists one page of inference services from
// ModelArts and converts each into a DeployInstance tagged with the "NPU"
// card, this link's platform and the TYPE_MODELARTS cluster type.
//
// An error is returned when the RPC fails or when the response carries a
// non-empty ErrorMsg.
// NOTE(review): the check reads ErrorMsg but the returned error is built from
// Msg — confirm that is the intended field.
func (m *ModelArtsLink) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) {
	listed, err := m.modelArtsRpc.ListServices(ctx, &modelarts.ListServicesReq{
		Platform: m.platform,
		OffSet:   m.pageIndex,
		Limit:    m.pageSize,
	})
	if err != nil {
		return nil, err
	}
	if listed.ErrorMsg != "" {
		return nil, errors.New(listed.Msg)
	}

	var instances []*inference.DeployInstance
	for _, svc := range listed.Services {
		instances = append(instances, &inference.DeployInstance{
			InstanceName: svc.ServiceName,
			InstanceId:   svc.ServiceId,
			Status:       svc.Status,
			InferCard:    "NPU",
			ClusterName:  m.platform,
			// NOTE(review): if StartTime is an integer type this conversion
			// yields a single rune, not a decimal timestamp — confirm.
			CreatedTime: string(svc.StartTime),
			ClusterType: TYPE_MODELARTS,
		})
	}
	return instances, nil
}
// StartInferDeployInstance requests the status "running" for the inference
// service identified by id. It reports true only when the RPC succeeds and
// the response code is 0.
func (m *ModelArtsLink) StartInferDeployInstance(ctx context.Context, id string) bool {
	updated, err := m.modelArtsRpc.UpdateService(ctx, &modelartsclient.UpdateServiceReq{
		ServiceId: id,
		Status:    "running",
	})
	// The response is only inspected when the call itself succeeded.
	return err == nil && updated.Code == 0
}
// StopInferDeployInstance requests the status "stopped" for the inference
// service identified by id. It reports true only when the RPC succeeds and
// the response code is 0.
func (m *ModelArtsLink) StopInferDeployInstance(ctx context.Context, id string) bool {
	updated, err := m.modelArtsRpc.UpdateService(ctx, &modelartsclient.UpdateServiceReq{
		ServiceId: id,
		Status:    "stopped",
	})
	// The response is only inspected when the call itself succeeded.
	return err == nil && updated.Code == 0
}
// GetInferDeployInstance fetches the inference service identified by id and
// converts it into a DeployInstance tagged with the "NPU" card, this link's
// platform and the TYPE_MODELARTS cluster type.
//
// An error is returned when the RPC fails or when the response carries a
// non-empty ErrorMsg.
// NOTE(review): the check reads ErrorMsg but the returned error is built from
// Msg — confirm that is the intended field.
func (m *ModelArtsLink) GetInferDeployInstance(ctx context.Context, id string) (*inference.DeployInstance, error) {
	shown, err := m.modelArtsRpc.ShowService(ctx, &modelarts.ShowServiceReq{
		ServiceId: id,
	})
	if err != nil {
		return nil, err
	}
	if shown.ErrorMsg != "" {
		return nil, errors.New(shown.Msg)
	}

	return &inference.DeployInstance{
		InstanceName: shown.ServiceName,
		InstanceId:   shown.ServiceId,
		Status:       shown.Status,
		InferCard:    "NPU",
		ClusterName:  m.platform,
		// NOTE(review): if StartTime is an integer type this conversion
		// yields a single rune, not a decimal timestamp — confirm.
		CreatedTime: string(shown.StartTime),
		ClusterType: TYPE_MODELARTS,
	}, nil
}
// GetInferResult is a placeholder: it ignores its arguments and always
// returns an empty string with a nil error.
func (m *ModelArtsLink) GetInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
	return "", nil
}