pcm-coordinator/internal/storeLink/shuguangai.go


/*
Copyright (c) [2023] [pcm]
[pcm-coordinator] is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
package storeLink
import (
"context"
"errors"
"gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
hpcacclient "gitlink.org.cn/JointCloud/pcm-ac/hpcacclient"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"mime/multipart"
"strconv"
"strings"
"sync"
"time"
)
const (
RAM_SIZE_1G = 1024 // 1 GB, expressed in MB
WORKER_NUMBER = 1
DCU = "DCU"
DCU_TOPS = 24.5
PYTORCH = "Pytorch"
TASK_PYTORCH_PREFIX = "PytorchTask"
TENSORFLOW = "Tensorflow"
RESOURCE_GROUP = "kshdtest"
WorkPath = "/work/home/acgnnmfbwo/pcmv1/"
TimeoutLimit = "10:00:00"
PythonCodePath = "/work/home/acgnnmfbwo/111111/py/test.py"
DATASETS_DIR = KUNSHAN_DIR + "/dataset"
ALGORITHM_DIR = KUNSHAN_DIR + "/algorithm"
KUNSHAN_DIR = "/public/home/acgnnmfbwo/pcmv1"
TRAIN_FILE = "train.py"
CPUCOREPRICEPERHOUR = 0.09
DCUPRICEPERHOUR = 2.0
KB = 1024
TIMEOUT = 20
DEPLOY_INSTANCE_LIMIT = 100
ProtocolType = "HTTP"
ContainerPort = 8881
JUPYTER = "jupyter"
)
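// Predefined ShuguangAi resource specs: RESOURCESGAIMAP maps a resource spec id to its
// CPU core / DCU card / RAM quota (RAM in MB, see RAM_SIZE_1G), and RESOURCESPECSAI holds
// the human-readable label for the same ids.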
var (
RESOURCESGAIMAP = map[string]ResourceSpecSGAI{
"WodTB2rJ8SobMgQ1nrtR245jxOrsovFi": {
CPU: 1,
GPU: 1,
RAM: 2 * RAM_SIZE_1G,
},
"6d41v1XV53MQPmQOJ5kNatIck9yl8nWZ": {
CPU: 1,
GPU: 2,
RAM: 2 * RAM_SIZE_1G,
},
"OBtVaaXAv9n9FbLR7pWAoa3yR13jXwNc": {
CPU: 2,
GPU: 3,
RAM: 4 * RAM_SIZE_1G,
},
"sBWfpkntUzsWYly11kdwEHZOYYIsFmve": {
CPU: 4,
GPU: 4,
RAM: 8 * RAM_SIZE_1G,
},
"jeYBVPwyIALjVYNzHvysh2o5CsBpBLp2": {
CPU: 5,
GPU: 5,
RAM: 10 * RAM_SIZE_1G,
},
}
RESOURCESPECSAI = map[string]string{
"WodTB2rJ8SobMgQ1nrtR245jxOrsovFi": "CPU:1, DCU:1, RAM:2G",
"6d41v1XV53MQPmQOJ5kNatIck9yl8nWZ": "CPU:1, DCU:2, RAM:2G",
"OBtVaaXAv9n9FbLR7pWAoa3yR13jXwNc": "CPU:2, DCU:3, RAM:4G",
"sBWfpkntUzsWYly11kdwEHZOYYIsFmve": "CPU:4, DCU:4, RAM:8G",
"jeYBVPwyIALjVYNzHvysh2o5CsBpBLp2": "CPU:5, DCU:5, RAM:10G",
}
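// ModelNameCmdMap holds the start-up script used to serve each supported inference model.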
ModelNameCmdMap = map[string]string{
"blip-image-captioning-base": "sudo pip install transformers python-multipart fastapi uvicorn[standard]; sudo python /public/home/acgnnmfbwo/pcmv1/inference/pytorch/blip_image_captioning_base/infer.py",
"imagenet_resnet50": "sudo pip install fastapi uvicorn[standard] python-multipart; sudo python /public/home/acgnnmfbwo/pcmv1/inference/pytorch/imagenet_resnet50/infer.py",
}
)
type ResourceSpecSGAI struct {
CPU int64
GPU int64
RAM int64
}
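// ShuguangAi is the storeLink adapter for the Shuguang AI platform; all operations go
// through the hpcAC RPC client of the participant identified by participantId.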
type ShuguangAi struct {
aCRpc hpcacclient.HpcAC
platform string
participantId int64
}
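// NewShuguangAi builds a ShuguangAi adapter bound to the given RPC client, platform name
// and participant id.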
func NewShuguangAi(aCRpc hpcAC.HpcACClient, name string, id int64) *ShuguangAi {
return &ShuguangAi{aCRpc: aCRpc, platform: name, participantId: id}
}
func (s *ShuguangAi) UploadImage(ctx context.Context, path string) (interface{}, error) {
return nil, nil
}
func (s *ShuguangAi) DeleteImage(ctx context.Context, imageId string) (interface{}, error) {
return nil, nil
}
func (s *ShuguangAi) QueryImageList(ctx context.Context) (interface{}, error) {
// query the ShuguangAi image list for the DCU / Pytorch accelerator type
req := &hpcAC.GetImageListAiReq{
AcceleratorType: DCU,
TaskType: PYTORCH,
}
resp, err := s.aCRpc.GetImageListAi(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
}
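// SubmitPytorchTask submits a PyTorch training task: it validates resourceId against the
// predefined spec map, resolves image path and version by imageId, flattens params and envs
// into the string forms expected by the AC API, derives the work and code paths from
// algorithmId, and fills the worker CPU/DCU/RAM quotas from RESOURCESGAIMAP before calling
// the RPC.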
func (s *ShuguangAi) SubmitPytorchTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string) (interface{}, error) {
// reject resource ids that are not in the predefined spec map
_, isMapContainsKey := RESOURCESPECSAI[resourceId]
if !isMapContainsKey {
return nil, errors.New("shuguangAi resource id does not exist")
}
// resolve imagePath and version from imageId
imageReq := &hpcAC.GetImageAiByIdReq{ImageId: imageId}
imageResp, err := s.aCRpc.GetImageAiById(ctx, imageReq)
if err != nil {
return nil, err
}
// python arguments, each given as "key,value"
var pythonArg string
for _, param := range params {
kv := strings.Split(param, COMMA)
pythonArg += PY_PARAM_PREFIX + kv[0] + "=" + kv[1] + SPACE
}
// environment variables, each given as "key,value"
var env string
for _, e := range envs {
kv := strings.Split(e, COMMA)
env += kv[0] + "=" + kv[1] + SPACE
}
//set paths
paths := strings.Split(algorithmId, DASH)
workPath := ALGORITHM_DIR + FORWARD_SLASH + paths[0] + FORWARD_SLASH + paths[1] + DASH + paths[2]
codePath := workPath + FORWARD_SLASH + TRAIN_FILE
req := &hpcAC.SubmitPytorchTaskReq{
Params: &hpcAC.SubmitPytorchTaskParams{
TaskName: TASK_PYTORCH_PREFIX + UNDERSCORE + utils.RandomString(10),
WorkPath: workPath,
IsDistributed: false,
IsHvd: false,
Env: env,
AcceleratorType: DCU,
Version: imageResp.Image.Version,
ImagePath: imageResp.Image.Path,
WorkerNumber: WORKER_NUMBER,
ResourceGroup: RESOURCE_GROUP,
TimeoutLimit: TimeoutLimit,
PythonCodePath: codePath,
PythonArg: pythonArg,
},
}
updateSGAIRequestByResourceId(resourceId, req)
resp, err := s.aCRpc.SubmitPytorchTask(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
}
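// updateSGAIRequestByResourceId copies the CPU/DCU/RAM quota of the selected resource spec
// into the Pytorch task request.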
func updateSGAIRequestByResourceId(resourceId string, req *hpcAC.SubmitPytorchTaskReq) {
spec := RESOURCESGAIMAP[resourceId]
req.Params.WorkerCpuNumber = spec.CPU
req.Params.WorkerGpuNumber = spec.GPU
req.Params.WorkerRamSize = spec.RAM
}
func (s *ShuguangAi) SubmitTensorflowTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string) (interface{}, error) {
//req := &hpcAC.SubmitTensorflowTaskReq{
// Params: &hpcAC.SubmitTensorflowTaskParams{
//
// }
//}
return nil, nil
}
func (s *ShuguangAi) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
// set algorithmId temporarily for storelink submit
if algorithmId == "" {
algorithmId = "pytorch-mnist-fcn"
}
// dispatch the ShuguangAi task by framework type
switch aiType {
case PYTORCH_TASK:
task, err := s.SubmitPytorchTask(ctx, imageId, cmd, envs, params, resourceId, datasetsId, algorithmId)
if err != nil {
return nil, err
}
return task, nil
case TENSORFLOW_TASK:
task, err := s.SubmitTensorflowTask(ctx, imageId, cmd, envs, params, resourceId, datasetsId, algorithmId)
if err != nil {
return nil, err
}
return task, nil
}
return nil, errors.New("shuguangAi: unsupported task type")
}
func (s *ShuguangAi) QueryTask(ctx context.Context, taskId string) (interface{}, error) {
// query a ShuguangAi pytorch task by id
req := &hpcAC.GetPytorchTaskReq{
Id: taskId,
}
resp, err := s.aCRpc.GetPytorchTask(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
}
func (s *ShuguangAi) DeleteTask(ctx context.Context, taskId string) (interface{}, error) {
// delete the ShuguangAi task by id
req := &hpcAC.DeleteTaskAiReq{
Ids: taskId,
}
resp, err := s.aCRpc.DeleteTaskAi(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
}
func (s *ShuguangAi) QuerySpecs(ctx context.Context) (interface{}, error) {
resp := &types.GetResourceSpecsResp{}
for k, v := range RESOURCESPECSAI {
var respec types.ResourceSpecSl
respec.SpecId = k
respec.SpecName = v
respec.ParticipantId = s.participantId
respec.ParticipantName = s.platform
resp.ResourceSpecs = append(resp.ResourceSpecs, &respec)
}
resp.Success = true
return resp, nil
}
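// GetResourceStats gathers cluster statistics concurrently: separate goroutines query the
// finished-job count, account balance, CPU/DCU quota limits, disk quota and node memory,
// and a final goroutine derives available CPU/memory from the member job list. The balance
// goroutine does not join the WaitGroup; the caller instead waits on cBalance with a
// timeout before computing usable card/CPU hours from the per-hour prices.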
func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
var wg sync.WaitGroup
wg.Add(5)
var cBalance = make(chan float64)
var cMemTotal = make(chan float64)
var cTotalCpu = make(chan int64)
resourceStats := &collector.ResourceStats{
ClusterId: strconv.FormatInt(s.participantId, 10),
Name: s.platform,
}
dcu := &collector.Card{
Platform: SHUGUANGAI,
Type: CARD,
Name: DCU,
TOpsAtFp16: DCU_TOPS,
}
//history jobs
go func() {
hReq := &hpcAC.ListHistoryJobReq{}
hReq.Start = 0
hReq.Limit = 1
hReq.IsQueryByQueueTime = "false"
hReq.TimeType = "CUSTOM"
hReq.StartTime = "2024-01-01 01:01:01"
endTime := time.Now().Format("2006-01-02 15:04:05")
hReq.EndTime = endTime
hResp, err := s.aCRpc.ListHistoryJob(ctx, hReq)
if err != nil || hResp.Code != "0" {
wg.Done()
return
}
resourceStats.TaskCompleted = int64(hResp.Data.Total)
wg.Done()
}()
//balance
go func() {
userReq := &hpcAC.GetUserInfoReq{}
userinfo, err := s.aCRpc.GetUserInfo(ctx, userReq)
if err != nil {
return
}
balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64)
resourceStats.Balance = balance
cBalance <- balance
}()
//resource limit
go func() {
limitReq := &hpcAC.QueueReq{}
limitResp, err := s.aCRpc.QueryUserQuotasLimit(ctx, limitReq)
if err != nil || limitResp.Code != "0" {
wg.Done()
return
}
totalCpu := limitResp.Data.AccountMaxCpu
totalDcu := limitResp.Data.AccountMaxDcu
dcu.CardNum = int32(totalDcu)
resourceStats.CpuCoreTotal = totalCpu
cTotalCpu <- totalCpu
wg.Done()
}()
//disk
go func() {
diskReq := &hpcAC.ParaStorQuotaReq{}
diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq)
if err != nil {
wg.Done()
return
}
totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3)
availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3)
resourceStats.DiskTotal = totalDisk
resourceStats.DiskAvail = availDisk
wg.Done()
}()
//memory
go func() {
nodeResp, err := s.aCRpc.GetNodeResources(ctx, nil)
if err != nil {
wg.Done()
return
}
memSize := common.RoundFloat(float64(nodeResp.Data.MemorySize)*KB*KB, 3) // MB to BYTES
resourceStats.MemTotal = memSize
cMemTotal <- memSize
wg.Done()
}()
//resources being occupied
go func() {
var memSize float64
var totalCpu int64
select {
case v := <-cMemTotal:
memSize = v
case <-time.After(TIMEOUT * time.Second):
wg.Done()
return
}
select {
case v := <-cTotalCpu:
totalCpu = v
case <-time.After(TIMEOUT * time.Second):
wg.Done()
return
}
memberJobResp, err := s.aCRpc.GetMemberJobs(ctx, nil)
if err != nil {
wg.Done()
return
}
var cpuCoreAvail int64
var memAvail float64
if len(memberJobResp.Data) == 0 {
// no member jobs running: the full quota is available
cpuCoreAvail = totalCpu
memAvail = memSize
} else {
var cpuCoreUsed int64
var memUsed float64
for _, datum := range memberJobResp.Data {
cpuCoreUsed += datum.CpuCore
}
memUsed = float64(cpuCoreUsed * 2 * KB * KB * KB) // 2 GB per cpu core
if cpuCoreUsed > totalCpu {
cpuCoreAvail = 0
} else {
cpuCoreAvail = totalCpu - cpuCoreUsed
}
if memUsed > memSize {
memAvail = 0
} else {
memAvail = memSize - memUsed
}
}
resourceStats.CpuCoreAvail = cpuCoreAvail
resourceStats.MemAvail = memAvail
wg.Done()
}()
//usable hours
var balance float64
select {
case v := <-cBalance:
balance = v
case <-time.After(TIMEOUT * time.Second):
return nil, errors.New("get balance rpc call failed")
}
var cards []*collector.Card
cardHours := common.RoundFloat(balance/DCUPRICEPERHOUR, 3)
cpuHours := common.RoundFloat(balance/CPUCOREPRICEPERHOUR, 3)
dcu.CardHours = cardHours
resourceStats.CpuCoreHours = cpuHours
resourceStats.Balance = balance
wg.Wait()
cards = append(cards, dcu)
resourceStats.CardsAvail = cards
return resourceStats, nil
}
func (s *ShuguangAi) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) {
req := &hpcAC.GetFileListReq{Limit: 100, Path: DATASETS_DIR, Start: 0}
list, err := s.aCRpc.GetFileList(ctx, req)
if err != nil {
return nil, err
}
if list.Code != "0" {
return nil, errors.New(list.Msg)
}
specs := []*collector.DatasetsSpecs{}
for _, file := range list.Data.FileList {
spec := &collector.DatasetsSpecs{Name: file.Name, Size: strconv.FormatInt(file.Size, 10)}
specs = append(specs, spec)
}
return specs, nil
}
func (s *ShuguangAi) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) {
var algorithms []*collector.Algorithm
for _, t := range GetTaskTypes() {
taskType := t
req := &hpcAC.GetFileListReq{Limit: 100, Path: ALGORITHM_DIR + FORWARD_SLASH + taskType, Start: 0, Order: "asc", OrderBy: "name", KeyWord: ""}
list, err := s.aCRpc.GetFileList(ctx, req)
if err != nil {
return nil, err
}
if list.Code != "0" {
return nil, errors.New(list.Msg)
}
for _, file := range list.Data.FileList {
algorithm := &collector.Algorithm{Name: file.Name, Platform: SHUGUANGAI, TaskType: taskType}
algorithms = append(algorithms, algorithm)
}
}
return algorithms, nil
}
func (s *ShuguangAi) GetComputeCards(ctx context.Context) ([]string, error) {
var cards []string
cards = append(cards, DCU)
return cards, nil
}
func (s *ShuguangAi) GetUserBalance(ctx context.Context) (float64, error) {
userReq := &hpcAC.GetUserInfoReq{}
userinfo, err := s.aCRpc.GetUserInfo(ctx, userReq)
if err != nil {
return 0, err
}
balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64)
return balance, nil
}
func (s *ShuguangAi) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
algoName := dataset + DASH + algorithm
req := &hpcAC.GetFileReq{
Path: ALGORITHM_DIR + FORWARD_SLASH + taskType + FORWARD_SLASH + algoName + FORWARD_SLASH + TRAIN_FILE,
}
resp, err := s.aCRpc.GetFile(ctx, req)
if err != nil {
return "", err
}
return resp.Content, nil
}
func (s *ShuguangAi) UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error {
algoName := dataset + DASH + algorithm
req := &hpcAC.UploadFileReq{
Path: ALGORITHM_DIR + FORWARD_SLASH + taskType + FORWARD_SLASH + algoName + FORWARD_SLASH,
Cover: "cover",
File: code,
}
_, err := s.aCRpc.UploadFile(ctx, req)
if err != nil {
return err
}
return nil
}
func (s *ShuguangAi) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) {
req := &hpcAC.GetInstanceLogReq{
TaskId: taskId,
InstanceNum: instanceNum,
LineCount: 1000,
StartLineNum: -1,
}
resp, err := s.aCRpc.GetInstanceLog(ctx, req)
if err != nil {
return "", err
}
// the AC API returns a non-zero code while logs are not ready yet
if resp.Code != "0" || resp.Data == nil {
return "waiting for logs...", nil
}
return resp.Data.Content, nil
}
func (s *ShuguangAi) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
resp, err := s.QueryTask(ctx, taskId)
if err != nil {
return nil, err
}
jobresp := resp.(*hpcAC.GetPytorchTaskResp)
if jobresp.Code != "0" {
return nil, errors.New(jobresp.Msg)
}
var task collector.Task
task.Id = jobresp.Data.Id
if jobresp.Data.StartTime != "" {
task.Start = jobresp.Data.StartTime
}
if jobresp.Data.EndTime != "" {
task.End = jobresp.Data.EndTime
}
task.Status = jobresp.Data.Status
return &task, nil
}
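// Execute generates all submit parameters (resource, image, algorithm, cmd, env, params)
// for the given AI option and then submits the task.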
func (s *ShuguangAi) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {
err := s.GenerateSubmitParams(ctx, option)
if err != nil {
return nil, err
}
task, err := s.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
if err != nil {
return nil, err
}
return task, nil
}
func (s *ShuguangAi) GenerateSubmitParams(ctx context.Context, option *option.AiOption) error {
err := s.generateResourceId(option)
if err != nil {
return err
}
err = s.generateImageId(ctx, option)
if err != nil {
return err
}
err = s.generateAlgorithmId(ctx, option)
if err != nil {
return err
}
err = s.generateCmd(option)
if err != nil {
return err
}
err = s.generateEnv(option)
if err != nil {
return err
}
err = s.generateParams(option)
if err != nil {
return err
}
return nil
}
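// generateResourceId picks a predefined resource spec id for the option. CPU-only requests
// map to the smallest spec; DCU requests are matched against the requested Tops using
// DCU_TOPS per card, falling back to the largest (5-card) spec when Tops exceeds the range.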
func (s *ShuguangAi) generateResourceId(option *option.AiOption) error {
if option.ResourceType == "" {
return errors.New("ResourceType not set")
}
if option.ResourceType == CPU {
option.ResourceId = "WodTB2rJ8SobMgQ1nrtR245jxOrsovFi"
option.ComputeCard = CPU
return nil
}
if option.ResourceType == CARD {
if option.ComputeCard == "" {
option.ComputeCard = DCU
}
if strings.ToUpper(option.ComputeCard) != DCU {
return errors.New("computeCard not found")
}
option.ComputeCard = DCU
if 0 <= option.Tops && option.Tops <= DCU_TOPS {
option.ResourceId = "WodTB2rJ8SobMgQ1nrtR245jxOrsovFi"
return nil
}
cardNum := 5
for k, v := range RESOURCESGAIMAP {
for i := 1; i <= cardNum; i++ {
if float64(i)*DCU_TOPS <= option.Tops && option.Tops <= float64(v.GPU)*DCU_TOPS {
option.ResourceId = k
return nil
}
}
}
if option.Tops > float64(cardNum)*DCU_TOPS {
option.ResourceId = "jeYBVPwyIALjVYNzHvysh2o5CsBpBLp2"
return nil
}
}
return errors.New("failed to get ResourceId")
}
func (s *ShuguangAi) generateImageId(ctx context.Context, option *option.AiOption) error {
if option.TaskType == "" {
return errors.New("TaskType not set")
}
taskType := strings.Title(option.TaskType)
req := &hpcAC.GetImageListAiReq{
AcceleratorType: DCU,
TaskType: taskType,
}
resp, err := s.aCRpc.GetImageListAi(ctx, req)
if err != nil {
return errors.New("generateImageId / GetImageListAi: " + err.Error())
}
if resp.Code != "0" {
return errors.New("failed to get imageId")
}
for _, datum := range resp.Data {
ns := strings.Split(datum.Version, COLON)
if ns[0] == "jupyterlab-pytorch" {
option.ImageId = datum.ImageId
return nil
}
}
return errors.New("failed to get ImageId")
}
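// generateAlgorithmId resolves the algorithm id ("<taskType>-<dataset>-<algo>") by listing
// the algorithm directory for the task type and matching the dataset name; when no
// algorithm name is given, a default is used ("cnn" for cifar10, "fcn" for mnist).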
func (s *ShuguangAi) generateAlgorithmId(ctx context.Context, option *option.AiOption) error {
if option.DatasetsName == "" {
return errors.New("DatasetsName not set")
}
req := &hpcAC.GetFileListReq{Limit: 100, Path: ALGORITHM_DIR + FORWARD_SLASH + option.TaskType, Start: 0}
list, err := s.aCRpc.GetFileList(ctx, req)
if err != nil {
return errors.New("generateAlgorithmId / GetFileListReq: " + err.Error())
}
if list.Code != "0" {
return errors.New(list.Msg)
}
var algorithmId string
for _, file := range list.Data.FileList {
ns := strings.Split(file.Name, DASH)
if ns[0] == option.DatasetsName {
algoName := ns[1]
if option.AlgorithmName == "" {
switch option.DatasetsName {
case "cifar10":
algorithmId = option.TaskType + DASH + option.DatasetsName + DASH + "cnn"
option.AlgorithmId = algorithmId
option.AlgorithmName = algoName
return nil
case "mnist":
algorithmId = option.TaskType + DASH + option.DatasetsName + DASH + "fcn"
option.AlgorithmId = algorithmId
option.AlgorithmName = algoName
return nil
}
} else {
if algoName == option.AlgorithmName {
algorithmId = option.TaskType + DASH + option.DatasetsName + DASH + algoName
option.AlgorithmId = algorithmId
return nil
}
}
}
}
// no matching dataset/algorithm entry was found in the algorithm directory
return errors.New("Algorithm does not exist")
}
func (s *ShuguangAi) generateCmd(option *option.AiOption) error {
return nil
}
func (s *ShuguangAi) generateEnv(option *option.AiOption) error {
return nil
}
func (s *ShuguangAi) generateParams(option *option.AiOption) error {
if option.ResourceType == "" {
return errors.New("ResourceType not set")
}
if len(option.Params) == 0 {
epoch := "epoch" + COMMA + "1"
option.Params = append(option.Params, epoch)
}
switch option.ResourceType {
case CPU:
card := "card" + COMMA + CPU
option.Params = append(option.Params, card)
return nil
case CARD:
card := "card" + COMMA + "cuda:0"
option.Params = append(option.Params, card)
return nil
}
return errors.New("failed to set params")
}
func (s *ShuguangAi) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) {
var imageUrls []*inference.InferUrl
urlReq := &hpcAC.GetInferUrlReq{
ModelName: option.ModelName,
Type: option.ModelType,
Card: "dcu",
}
urlResp, err := s.aCRpc.GetInferUrl(ctx, urlReq)
if err != nil {
return nil, err
}
imageUrl := &inference.InferUrl{
Url: urlResp.Url,
Card: "dcu",
}
imageUrls = append(imageUrls, imageUrl)
clusterWithUrl := &inference.ClusterInferUrl{
ClusterName: s.platform,
ClusterType: TYPE_SHUGUANGAI,
InferUrls: imageUrls,
}
return clusterWithUrl, nil
}
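// GetInferDeployInstanceList lists instance services whose names match
// DEPLOY_INSTANCE_PREFIEX (up to DEPLOY_INSTANCE_LIMIT entries) and maps them into the
// coordinator's DeployInstance view.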
func (s *ShuguangAi) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) {
var insList []*inference.DeployInstance
params := &hpcAC.GetInstanceServiceListReqParam{
InstanceServiceName: DEPLOY_INSTANCE_PREFIEX,
Status: "",
TaskType: "",
Start: 0,
Limit: DEPLOY_INSTANCE_LIMIT,
Sort: "desc",
}
req := &hpcacclient.GetInstanceServiceListReq{
Param: params,
}
list, err := s.aCRpc.GetInstanceServiceList(ctx, req)
if err != nil {
return nil, err
}
if list.Code != "0" {
return nil, errors.New(list.Msg)
}
for _, datum := range list.Data {
ins := &inference.DeployInstance{}
ins.InstanceName = datum.InstanceServiceName
ins.InstanceId = datum.Id
ins.ClusterName = s.platform
ins.Status = datum.Status
ins.InferCard = DCU
ins.CreatedTime = datum.CreateTime
ins.ClusterType = TYPE_SHUGUANGAI
insList = append(insList, ins)
}
return insList, nil
}
func (s *ShuguangAi) StartInferDeployInstance(ctx context.Context, id string) bool {
req := &hpcAC.StartInstanceServiceReq{
InstanceServiceId: id,
}
resp, err := s.aCRpc.StartInstanceService(ctx, req)
if err != nil || resp.Code != "0" {
return false
}
return resp.Data == id
}
func (s *ShuguangAi) StopInferDeployInstance(ctx context.Context, id string) bool {
ids := []string{id}
req := &hpcAC.StopInstanceServiceReq{
Ids: ids,
}
resp, err := s.aCRpc.StopInstanceService(ctx, req)
if err != nil || resp.Code != "0" {
return false
}
return true
}
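// GetInferDeployInstance fetches one instance by id; the model type, name and card are
// decoded from the instance description ("type/name/card") and the access URL is exposed
// only while the instance is running.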
func (s *ShuguangAi) GetInferDeployInstance(ctx context.Context, id string) (*inference.DeployInstance, error) {
ins := &inference.DeployInstance{}
req := &hpcAC.GetInstanceServiceDetailReq{
Id: id,
}
resp, err := s.aCRpc.GetInstanceServiceDetail(ctx, req)
if err != nil {
return nil, err
}
if resp.Code != "0" || resp.Data == nil {
return nil, errors.New("GetInferDeployInstance empty")
}
var url string
if resp.Data.Status == constants.Running {
url = resp.Data.ContainerPortInfoList[0].AccessUrl
}
var modelType string
var modelName string
var card string
if resp.Data.Description != "" {
str := strings.Split(resp.Data.Description, FORWARD_SLASH)
if len(str) == 3 {
modelType = str[0]
modelName = str[1]
card = str[2]
}
}
ins.InstanceName = resp.Data.InstanceServiceName
ins.InstanceId = resp.Data.Id
ins.ClusterName = s.platform
ins.Status = resp.Data.Status
ins.InferCard = DCU
ins.CreatedTime = resp.Data.CreateTime
ins.ClusterType = TYPE_SHUGUANGAI
ins.ModelType = modelType
ins.ModelName = modelName
ins.InferUrl = url
ins.InferCard = card
return ins, nil
}
func (s *ShuguangAi) GetImageInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
return "", nil
}
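// CreateInferDeployInstance creates a jupyter-type instance service that runs the
// model-specific start script from ModelNameCmdMap inside the fixed jupyterlab-pytorch
// DCU image, exposing ContainerPort over HTTP.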
func (s *ShuguangAi) CreateInferDeployInstance(ctx context.Context, option *option.InferOption) (string, error) {
containerPortInfoList := []*hpcAC.ContainerPortInfoList{
{
ProtocolType: ProtocolType,
ContainerPort: ContainerPort,
},
}
desc := option.ModelType + FORWARD_SLASH + option.ModelName + FORWARD_SLASH + strings.ToLower(DCU)
instanceServiceName := "infer_instance" + UNDERSCORE + utils.TimeString()
resourceGroup := RESOURCE_GROUP
script, ok := ModelNameCmdMap[option.ModelName]
if !ok {
return "", errors.New("failed to set cmd, ModelName not exist")
}
param := &hpcAC.CreateParams{
AcceleratorType: strings.ToLower(DCU),
ContainerPortInfoList: containerPortInfoList,
CpuNumber: 8,
Description: desc,
//env
GpuNumber: 1,
ImagePath: "11.11.100.6:5000/dcu/admin/base/jupyterlab-pytorch:1.13.1-py3.7-dtk23.04-centos7.6",
InstanceServiceName: instanceServiceName,
MountInfoList: make([]*hpcAC.MountInfoList, 0),
//originalVersion
RamSize: 10 * RAM_SIZE_1G,
//rdma
ResourceGroup: resourceGroup,
StartScriptActionScope: "all",
StartScriptContent: script,
//startServiceCommand
//taskClassification: "interactive"
TaskNumber: 1,
TaskType: JUPYTER,
TimeoutLimit: "01:00:00",
UseStartScript: true,
//useStartServiceCommand: false
Version: "jupyterlab-pytorch:1.13.1-py3.7-dtk23.04-centos7.6",
}
req := &hpcacclient.CreateInstanceServiceReq{
Data: param,
}
resp, err := s.aCRpc.CreateInstanceService(ctx, req)
if err != nil {
return "", err
}
if resp.Code != "0" {
return "", errors.New(resp.Msg)
}
return resp.Data, nil
}
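// CheckModelExistence reports whether "model/<name>" exists under KUNSHAN_DIR on the
// cluster; the mtype argument is currently unused.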
func (s *ShuguangAi) CheckModelExistence(ctx context.Context, name string, mtype string) bool {
modelPath := "model" + FORWARD_SLASH + name
req := &hpcAC.IsExistFileReq{
Path: KUNSHAN_DIR + FORWARD_SLASH + modelPath,
}
resp, err := s.aCRpc.IsExistFile(ctx, req)
if err != nil {
return false
}
if resp.Code != "0" || resp.Data == nil {
return false
}
return resp.Data.Exist
}