/*
Copyright (c) [2023] [pcm]
[pcm-coordinator] is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
         http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/

package storeLink

import (
	"context"
	"errors"
	"mime/multipart"
	"strconv"
	"strings"
	"sync"
	"time"

	"gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
	hpcacclient "gitlink.org.cn/JointCloud/pcm-ac/hpcacclient"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
)

const (
	RAM_SIZE_1G           = 1024 // 1G
	WORKER_NUMBER         = 1
	DCU                   = "DCU"
	DCU_TOPS              = 24.5
	PYTORCH               = "Pytorch"
	TASK_PYTORCH_PREFIX   = "PytorchTask"
	TENSORFLOW            = "Tensorflow"
	RESOURCE_GROUP        = "wzhdtest"
	WorkPath              = "/work/home/acgnnmfbwo/pcmv1/"
	TimeoutLimit          = "10:00:00"
	PythonCodePath        = "/work/home/acgnnmfbwo/111111/py/test.py"
	DATASETS_DIR          = "/work/home/acgnnmfbwo/pcmv1/dataset"
	ALGORITHM_DIR         = "/work/home/acgnnmfbwo/pcmv1/algorithm"
	KUNSHAN_DIR           = "/public/home/acgnnmfbwo/pcmv1"
	TRAIN_FILE            = "train.py"
	CPUCOREPRICEPERHOUR   = 0.09
	DCUPRICEPERHOUR       = 2.0
	KB                    = 1024
	TIMEOUT               = 20
	DEPLOY_INSTANCE_LIMIT = 100
	ProtocolType          = "HTTP"
	ContainerPort         = 8881
	JUPYTER               = "jupyter"
)

var (
	RESOURCESGAIMAP = map[string]ResourceSpecSGAI{
		"WodTB2rJ8SobMgQ1nrtR245jxOrsovFi": {
			CPU: 1,
			GPU: 1,
			RAM: 2 * RAM_SIZE_1G,
		},
		"6d41v1XV53MQPmQOJ5kNatIck9yl8nWZ": {
			CPU: 1,
			GPU: 2,
			RAM: 2 * RAM_SIZE_1G,
		},
		"OBtVaaXAv9n9FbLR7pWAoa3yR13jXwNc": {
			CPU: 2,
			GPU: 3,
			RAM: 4 * RAM_SIZE_1G,
		},
		"sBWfpkntUzsWYly11kdwEHZOYYIsFmve": {
			CPU: 4,
			GPU: 4,
			RAM: 8 * RAM_SIZE_1G,
		},
		"jeYBVPwyIALjVYNzHvysh2o5CsBpBLp2": {
			CPU: 5,
			GPU: 5,
			RAM: 10 * RAM_SIZE_1G,
		},
	}

	RESOURCESPECSAI = map[string]string{
		"WodTB2rJ8SobMgQ1nrtR245jxOrsovFi": "CPU:1, DCU:1, RAM:2G",
		"6d41v1XV53MQPmQOJ5kNatIck9yl8nWZ": "CPU:1, DCU:2, RAM:2G",
		"OBtVaaXAv9n9FbLR7pWAoa3yR13jXwNc": "CPU:2, DCU:3, RAM:4G",
		"sBWfpkntUzsWYly11kdwEHZOYYIsFmve": "CPU:4, DCU:4, RAM:8G",
		"jeYBVPwyIALjVYNzHvysh2o5CsBpBLp2": "CPU:5, DCU:5, RAM:10G",
	}

	ModelNameCmdMap = map[string]string{
		"blip-image-captioning-base": "pip install transformers python-multipart fastapi uvicorn[standard]; python /public/home/acgnnmfbwo/pcmv1/inference/pytorch/blip_image_captioning_base/infer.py",
		"imagenet_resnet50":          "pip install fastapi uvicorn[standard] python-multipart; python /public/home/acgnnmfbwo/pcmv1/inference/pytorch/imagenet_resnet50/infer.py",
	}
)

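// Note: each ModelNameCmdMap entry is used verbatim as the StartScriptContent of an inference
// instance (see CreateInferDeployInstance below), so adding a servable model means adding its
// start script here.
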
// ResourceSpecSGAI describes one predefined ShuguangAi resource specification
// (CPU cores, DCU/GPU cards, RAM) keyed by resource Id in RESOURCESGAIMAP.
type ResourceSpecSGAI struct {
	CPU int64
	GPU int64
	RAM int64
}

// ShuguangAi is the storeLink adapter for the Shuguang (Sugon) AI platform; all operations
// are delegated to the hpcAC RPC client.
type ShuguangAi struct {
	aCRpc         hpcacclient.HpcAC
	platform      string
	participantId int64
}

// NewShuguangAi creates a ShuguangAi adapter bound to the given RPC client, platform name and participant id.
func NewShuguangAi(aCRpc hpcAC.HpcACClient, name string, id int64) *ShuguangAi {
	return &ShuguangAi{aCRpc: aCRpc, platform: name, participantId: id}
}

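// Usage sketch (the client wiring and the concrete ids below are illustrative assumptions, not
// the coordinator's actual configuration):
//
//	sl := NewShuguangAi(hpcACClient, "shuguangAi", 1)
//	resp, err := sl.SubmitTask(ctx, imageId, "", nil, nil, resourceId, "", "", PYTORCH_TASK)
//	if err != nil {
//		// handle submission failure
//	}
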
// UploadImage is currently a no-op stub for ShuguangAi.
func (s *ShuguangAi) UploadImage(ctx context.Context, path string) (interface{}, error) {
	return nil, nil
}

// DeleteImage is currently a no-op stub for ShuguangAi.
func (s *ShuguangAi) DeleteImage(ctx context.Context, imageId string) (interface{}, error) {
	return nil, nil
}

func (s *ShuguangAi) QueryImageList(ctx context.Context) (interface{}, error) {
	// query the DCU/Pytorch image list from ShuguangAi
	req := &hpcAC.GetImageListAiReq{
		AcceleratorType: DCU,
		TaskType:        PYTORCH,
	}
	resp, err := s.aCRpc.GetImageListAi(ctx, req)
	if err != nil {
		return nil, err
	}

	return resp, nil
}

func (s *ShuguangAi) SubmitPytorchTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string) (interface{}, error) {
	// check that resourceId matches one of the predefined resource specs
	_, isMapContainsKey := RESOURCESPECSAI[resourceId]
	if !isMapContainsKey {
		return nil, errors.New("shuguangAi resourceId does not exist")
	}

	// resolve imagePath and version from imageId
	imageReq := &hpcAC.GetImageAiByIdReq{ImageId: imageId}
	imageResp, err := s.aCRpc.GetImageAiById(ctx, imageReq)
	if err != nil {
		return nil, err
	}

	// python arguments; each param is expected as a "name,value" pair
	var pythonArg string
	for _, param := range params {
		kv := strings.Split(param, COMMA)
		pythonArg += PY_PARAM_PREFIX + kv[0] + "=" + kv[1] + SPACE
	}

	// environment variables; each env is expected as a "name,value" pair
	var env string
	for _, e := range envs {
		kv := strings.Split(e, COMMA)
		env += kv[0] + "=" + kv[1] + SPACE
	}

	// set paths; algorithmId follows the "<taskType>-<dataset>-<algorithm>" convention
	paths := strings.Split(algorithmId, DASH)
	workPath := ALGORITHM_DIR + FORWARD_SLASH + paths[0] + FORWARD_SLASH + paths[1] + DASH + paths[2]
	codePath := workPath + FORWARD_SLASH + TRAIN_FILE

	req := &hpcAC.SubmitPytorchTaskReq{
		Params: &hpcAC.SubmitPytorchTaskParams{
			TaskName:        TASK_PYTORCH_PREFIX + UNDERSCORE + utils.RandomString(10),
			WorkPath:        workPath,
			IsDistributed:   false,
			IsHvd:           false,
			Env:             env,
			AcceleratorType: DCU,
			Version:         imageResp.Image.Version,
			ImagePath:       imageResp.Image.Path,
			WorkerNumber:    WORKER_NUMBER,
			ResourceGroup:   RESOURCE_GROUP,
			TimeoutLimit:    TimeoutLimit,
			PythonCodePath:  codePath,
			PythonArg:       pythonArg,
		},
	}

	updateSGAIRequestByResourceId(resourceId, req)

	resp, err := s.aCRpc.SubmitPytorchTask(ctx, req)
	if err != nil {
		return nil, err
	}

	return resp, nil
}

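// Illustrative call (a sketch: the image id and dataset id are placeholders, not real ShuguangAi
// identifiers; params and envs are "name,value" pairs and algorithmId is "<taskType>-<dataset>-<algorithm>"):
//
//	_, err := sl.SubmitPytorchTask(ctx, "imageId", "", []string{"LOG_LEVEL,info"},
//		[]string{"epoch,1", "card,cuda:0"}, "WodTB2rJ8SobMgQ1nrtR245jxOrsovFi",
//		"datasetsId", "pytorch-mnist-fcn")
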
// updateSGAIRequestByResourceId fills the worker CPU/GPU/RAM fields of the request
// from the resource spec registered for resourceId.
func updateSGAIRequestByResourceId(resourceId string, req *hpcAC.SubmitPytorchTaskReq) {
	spec := RESOURCESGAIMAP[resourceId]
	req.Params.WorkerCpuNumber = spec.CPU
	req.Params.WorkerGpuNumber = spec.GPU
	req.Params.WorkerRamSize = spec.RAM
}

// SubmitTensorflowTask is not implemented yet; the request construction is left as a placeholder.
func (s *ShuguangAi) SubmitTensorflowTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string) (interface{}, error) {
	//req := &hpcAC.SubmitTensorflowTaskReq{
	//	Params: &hpcAC.SubmitTensorflowTaskParams{
	//
	//	}
	//}
	return nil, nil
}

func (s *ShuguangAi) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
	// set algorithmId temporarily for storelink submit
	if algorithmId == "" {
		algorithmId = "pytorch-mnist-fcn"
	}

	// dispatch the ShuguangAi submission by task type
	switch aiType {
	case PYTORCH_TASK:
		task, err := s.SubmitPytorchTask(ctx, imageId, cmd, envs, params, resourceId, datasetsId, algorithmId)
		if err != nil {
			return nil, err
		}
		return task, nil
	case TENSORFLOW_TASK:
		task, err := s.SubmitTensorflowTask(ctx, imageId, cmd, envs, params, resourceId, datasetsId, algorithmId)
		if err != nil {
			return nil, err
		}
		return task, nil
	}
	return nil, errors.New("shuguangAi: unsupported task type")
}

func (s *ShuguangAi) QueryTask(ctx context.Context, taskId string) (interface{}, error) {
	// query the ShuguangAi task by id
	req := &hpcAC.GetPytorchTaskReq{
		Id: taskId,
	}
	resp, err := s.aCRpc.GetPytorchTask(ctx, req)
	if err != nil {
		return nil, err
	}

	return resp, nil
}

func (s *ShuguangAi) DeleteTask(ctx context.Context, taskId string) (interface{}, error) {
	// delete the ShuguangAi task by id
	req := &hpcAC.DeleteTaskAiReq{
		Ids: taskId,
	}
	resp, err := s.aCRpc.DeleteTaskAi(ctx, req)
	if err != nil {
		return nil, err
	}

	return resp, nil
}

func (s *ShuguangAi) QuerySpecs(ctx context.Context) (interface{}, error) {
	resp := &types.GetResourceSpecsResp{}

	for k, v := range RESOURCESPECSAI {
		var respec types.ResourceSpecSl
		respec.SpecId = k
		respec.SpecName = v
		respec.ParticipantId = s.participantId
		respec.ParticipantName = s.platform
		resp.ResourceSpecs = append(resp.ResourceSpecs, &respec)
	}

	resp.Success = true
	return resp, nil
}

func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
	var wg sync.WaitGroup
	// five of the six goroutines below signal completion via wg;
	// the balance goroutine is awaited through cBalance instead
	wg.Add(5)
	var cBalance = make(chan float64)
	var cMemTotal = make(chan float64)
	var cTotalCpu = make(chan int64)

	resourceStats := &collector.ResourceStats{
		ClusterId: strconv.FormatInt(s.participantId, 10),
		Name:      s.platform,
	}

	dcu := &collector.Card{
		Platform:   SHUGUANGAI,
		Type:       CARD,
		Name:       DCU,
		TOpsAtFp16: DCU_TOPS,
	}

	//history jobs
	go func() {
		hReq := &hpcAC.ListHistoryJobReq{}
		hReq.Start = 0
		hReq.Limit = 1
		hReq.IsQueryByQueueTime = "false"
		hReq.TimeType = "CUSTOM"
		hReq.StartTime = "2024-01-01 01:01:01"
		endTime := time.Now().Format("2006-01-02 15:04:05")
		hReq.EndTime = endTime
		hResp, err := s.aCRpc.ListHistoryJob(ctx, hReq)
		if err != nil || hResp.Code != "0" {
			wg.Done()
			return
		}
		resourceStats.TaskCompleted = int64(hResp.Data.Total)

		wg.Done()
	}()

	//balance (not tracked by wg; the main goroutine waits on cBalance with a timeout)
	go func() {
		userReq := &hpcAC.GetUserInfoReq{}
		userinfo, err := s.aCRpc.GetUserInfo(ctx, userReq)
		if err != nil {
			return
		}
		balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64)
		resourceStats.Balance = balance

		cBalance <- balance
	}()

	//resource limit
	go func() {
		limitReq := &hpcAC.QueueReq{}
		limitResp, err := s.aCRpc.QueryUserQuotasLimit(ctx, limitReq)
		if err != nil || limitResp.Code != "0" {
			wg.Done()
			return
		}
		totalCpu := limitResp.Data.AccountMaxCpu
		totalDcu := limitResp.Data.AccountMaxDcu

		dcu.CardNum = int32(totalDcu)
		resourceStats.CpuCoreTotal = totalCpu

		cTotalCpu <- totalCpu
		wg.Done()
	}()

	//disk
	go func() {
		diskReq := &hpcAC.ParaStorQuotaReq{}
		diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq)
		if err != nil {
			wg.Done()
			return
		}

		totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3)
		availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3)

		resourceStats.DiskTotal = totalDisk
		resourceStats.DiskAvail = availDisk
		wg.Done()
	}()

	//memory
	go func() {
		nodeResp, err := s.aCRpc.GetNodeResources(ctx, nil)
		if err != nil {
			wg.Done()
			return
		}
		memSize := common.RoundFloat(float64(nodeResp.Data.MemorySize)*KB*KB, 3) // MB to BYTES

		resourceStats.MemTotal = memSize
		cMemTotal <- memSize
		wg.Done()
	}()

	//resources being occupied
	go func() {
		var memSize float64
		var totalCpu int64
		select {
		case v := <-cMemTotal:
			memSize = v
		case <-time.After(TIMEOUT * time.Second):
			wg.Done()
			return
		}
		select {
		case v := <-cTotalCpu:
			totalCpu = v
		case <-time.After(TIMEOUT * time.Second):
			wg.Done()
			return
		}
		memberJobResp, err := s.aCRpc.GetMemberJobs(ctx, nil)
		if err != nil {
			wg.Done()
			return
		}
		var cpuCoreAvail int64
		var memAvail float64
		if len(memberJobResp.Data) == 0 {
			// no running member jobs: the whole quota is available
			cpuCoreAvail = totalCpu
			memAvail = memSize
		} else {
			var cpuCoreUsed int64
			var memUsed float64
			for _, datum := range memberJobResp.Data {
				cpuCoreUsed += datum.CpuCore
			}
			memUsed = float64(cpuCoreUsed * 2 * KB * KB * KB) // 2 GB per cpu core
			if cpuCoreUsed > totalCpu {
				cpuCoreAvail = 0
			} else {
				cpuCoreAvail = totalCpu - cpuCoreUsed
			}
			if memUsed > memSize {
				memAvail = 0
			} else {
				memAvail = memSize - memUsed
			}
		}
		resourceStats.CpuCoreAvail = cpuCoreAvail
		resourceStats.MemAvail = memAvail
		wg.Done()
	}()

	//usable hours
	var balance float64

	select {
	case v := <-cBalance:
		balance = v
	case <-time.After(TIMEOUT * time.Second):
		return nil, errors.New("get balance rpc call failed")
	}

	var cards []*collector.Card
	cardHours := common.RoundFloat(balance/DCUPRICEPERHOUR, 3)
	cpuHours := common.RoundFloat(balance/CPUCOREPRICEPERHOUR, 3)

	dcu.CardHours = cardHours
	resourceStats.CpuCoreHours = cpuHours
	resourceStats.Balance = balance

	wg.Wait()

	cards = append(cards, dcu)
	resourceStats.CardsAvail = cards

	return resourceStats, nil
}

func (s *ShuguangAi) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) {
	req := &hpcAC.GetFileListReq{Limit: 100, Path: DATASETS_DIR, Start: 0}
	list, err := s.aCRpc.GetFileList(ctx, req)
	if err != nil {
		return nil, err
	}
	if list.Code != "0" {
		return nil, errors.New(list.Msg)
	}
	specs := []*collector.DatasetsSpecs{}
	for _, file := range list.Data.FileList {
		spec := &collector.DatasetsSpecs{Name: file.Name, Size: strconv.FormatInt(file.Size, 10)}
		specs = append(specs, spec)
	}
	return specs, nil
}

func (s *ShuguangAi) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) {
	var algorithms []*collector.Algorithm
	for _, t := range GetTaskTypes() {
		taskType := t
		req := &hpcAC.GetFileListReq{Limit: 100, Path: ALGORITHM_DIR + FORWARD_SLASH + taskType, Start: 0, Order: "asc", OrderBy: "name", KeyWord: ""}
		list, err := s.aCRpc.GetFileList(ctx, req)
		if err != nil {
			return nil, err
		}
		if list.Code != "0" {
			return nil, errors.New(list.Msg)
		}
		for _, file := range list.Data.FileList {
			algorithm := &collector.Algorithm{Name: file.Name, Platform: SHUGUANGAI, TaskType: taskType}
			algorithms = append(algorithms, algorithm)
		}
	}
	return algorithms, nil
}

func (s *ShuguangAi) GetComputeCards(ctx context.Context) ([]string, error) {
	var cards []string
	cards = append(cards, DCU)
	return cards, nil
}

func (s *ShuguangAi) GetUserBalance(ctx context.Context) (float64, error) {
	userReq := &hpcAC.GetUserInfoReq{}
	userinfo, err := s.aCRpc.GetUserInfo(ctx, userReq)
	if err != nil {
		return 0, err
	}
	balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64)
	return balance, nil
}

func (s *ShuguangAi) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
	algoName := dataset + DASH + algorithm
	req := &hpcAC.GetFileReq{
		Path: ALGORITHM_DIR + FORWARD_SLASH + taskType + FORWARD_SLASH + algoName + FORWARD_SLASH + TRAIN_FILE,
	}
	resp, err := s.aCRpc.GetFile(ctx, req)
	if err != nil {
		return "", err
	}

	return resp.Content, nil
}

func (s *ShuguangAi) UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error {
	algoName := dataset + DASH + algorithm
	req := &hpcAC.UploadFileReq{
		Path:  ALGORITHM_DIR + FORWARD_SLASH + taskType + FORWARD_SLASH + algoName + FORWARD_SLASH,
		Cover: "cover",
		File:  code,
	}

	_, err := s.aCRpc.UploadFile(ctx, req)
	if err != nil {
		return err
	}

	return nil
}

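// Algorithm code layout assumed by DownloadAlgorithmCode/UploadAlgorithmCode above
// (the dataset/algorithm names are examples only):
//
//	ALGORITHM_DIR/<taskType>/<dataset>-<algorithm>/train.py
//	e.g. /work/home/acgnnmfbwo/pcmv1/algorithm/pytorch/mnist-fcn/train.py
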
func (s *ShuguangAi) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) {
	req := &hpcAC.GetInstanceLogReq{
		TaskId:       taskId,
		InstanceNum:  instanceNum,
		LineCount:    1000,
		StartLineNum: -1,
	}
	resp, err := s.aCRpc.GetInstanceLog(ctx, req)
	if err != nil {
		return "", err
	}

	if resp.Code != "0" {
		resp.Data.Content = "waiting for logs..."
	}

	return resp.Data.Content, nil
}

func (s *ShuguangAi) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
	resp, err := s.QueryTask(ctx, taskId)
	if err != nil {
		return nil, err
	}
	jobresp := (resp).(*hpcAC.GetPytorchTaskResp)
	if jobresp.Code != "0" {
		return nil, errors.New(jobresp.Msg)
	}
	var task collector.Task
	task.Id = jobresp.Data.Id
	if jobresp.Data.StartTime != "" {
		task.Start = jobresp.Data.StartTime
	}
	if jobresp.Data.EndTime != "" {
		task.End = jobresp.Data.EndTime
	}
	task.Status = jobresp.Data.Status

	return &task, nil
}

func (s *ShuguangAi) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {
	err := s.GenerateSubmitParams(ctx, option)
	if err != nil {
		return nil, err
	}
	task, err := s.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
	if err != nil {
		return nil, err
	}
	return task, nil
}

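// Sketch of how a scheduler might drive Execute; only the AiOption fields read in this file are
// shown, and the concrete values are assumptions (sl is a *ShuguangAi from NewShuguangAi):
//
//	opt := &option.AiOption{
//		TaskType:     PYTORCH_TASK, // assumed to be the lowercase framework name, e.g. "pytorch"
//		ResourceType: CARD,
//		ComputeCard:  DCU,
//		Tops:         24.5,
//		DatasetsName: "mnist",
//	}
//	task, err := sl.Execute(ctx, opt)
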
func (s *ShuguangAi) GenerateSubmitParams(ctx context.Context, option *option.AiOption) error {
	err := s.generateResourceId(option)
	if err != nil {
		return err
	}
	err = s.generateImageId(ctx, option)
	if err != nil {
		return err
	}
	err = s.generateAlgorithmId(ctx, option)
	if err != nil {
		return err
	}
	err = s.generateCmd(option)
	if err != nil {
		return err
	}
	err = s.generateEnv(option)
	if err != nil {
		return err
	}
	err = s.generateParams(option)
	if err != nil {
		return err
	}
	return nil
}

func (s *ShuguangAi) generateResourceId(option *option.AiOption) error {
	if option.ResourceType == "" {
		return errors.New("ResourceType not set")
	}

	if option.ResourceType == CPU {
		option.ResourceId = "WodTB2rJ8SobMgQ1nrtR245jxOrsovFi"
		option.ComputeCard = CPU
		return nil
	}

	if option.ResourceType == CARD {
		if option.ComputeCard == "" {
			option.ComputeCard = DCU
		}

		if strings.ToUpper(option.ComputeCard) != DCU {
			return errors.New("computeCard not found")
		}
		option.ComputeCard = DCU

		if 0 <= option.Tops && option.Tops <= DCU_TOPS {
			option.ResourceId = "WodTB2rJ8SobMgQ1nrtR245jxOrsovFi"
			return nil
		}

		cardNum := 5
		for k, v := range RESOURCESGAIMAP {
			for i := 1; i <= cardNum; i++ {
				if float64(i)*DCU_TOPS <= option.Tops && option.Tops <= float64(v.GPU)*DCU_TOPS {
					option.ResourceId = k
					return nil
				}
			}
		}

		if option.Tops > float64(cardNum)*DCU_TOPS {
			option.ResourceId = "jeYBVPwyIALjVYNzHvysh2o5CsBpBLp2"
			return nil
		}
	}

	return errors.New("failed to get ResourceId")
}

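// Informal reading of the Tops mapping above (not a spec): option.Tops is treated as the requested
// FP16 throughput and DCU_TOPS = 24.5 per card, so e.g. a 60-TOPS request resolves to a spec whose
// DCU count covers it (3 or more cards), while anything above 5 x 24.5 falls back to the largest
// 5-DCU spec. Because RESOURCESGAIMAP iteration order is not deterministic, the chosen spec may
// vary between runs for requests that several specs can satisfy.
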
func (s *ShuguangAi) generateImageId(ctx context.Context, option *option.AiOption) error {
	if option.TaskType == "" {
		return errors.New("TaskType not set")
	}
	taskType := strings.Title(option.TaskType)
	req := &hpcAC.GetImageListAiReq{
		AcceleratorType: DCU,
		TaskType:        taskType,
	}
	resp, err := s.aCRpc.GetImageListAi(ctx, req)
	if err != nil {
		return err
	}
	if resp.Code != "0" {
		return errors.New("failed to get imageId")
	}

	for _, datum := range resp.Data {
		ns := strings.Split(datum.Version, COLON)
		if ns[0] == "jupyterlab-pytorch" {
			option.ImageId = datum.ImageId
			return nil
		}
	}

	return errors.New("failed to get ImageId")
}

func (s *ShuguangAi) generateAlgorithmId(ctx context.Context, option *option.AiOption) error {
	if option.DatasetsName == "" {
		return errors.New("DatasetsName not set")
	}

	req := &hpcAC.GetFileListReq{Limit: 100, Path: ALGORITHM_DIR + FORWARD_SLASH + option.TaskType, Start: 0}
	list, err := s.aCRpc.GetFileList(ctx, req)
	if err != nil {
		return err
	}
	if list.Code != "0" {
		return errors.New(list.Msg)
	}

	var algorithmId string
	for _, file := range list.Data.FileList {
		ns := strings.Split(file.Name, DASH)
		if ns[0] == option.DatasetsName {
			algoName := ns[1]
			if option.AlgorithmName == "" {
				switch option.DatasetsName {
				case "cifar10":
					algorithmId = option.TaskType + DASH + option.DatasetsName + DASH + "cnn"
					option.AlgorithmId = algorithmId
					option.AlgorithmName = algoName
					return nil
				case "mnist":
					algorithmId = option.TaskType + DASH + option.DatasetsName + DASH + "fcn"
					option.AlgorithmId = algorithmId
					option.AlgorithmName = algoName
					return nil
				}
			} else {
				if algoName == option.AlgorithmName {
					algorithmId = option.TaskType + DASH + option.DatasetsName + DASH + algoName
					option.AlgorithmId = algorithmId
					return nil
				}
			}
		}
	}

	if algorithmId == "" {
		return errors.New("Algorithm does not exist")
	}

	return errors.New("failed to get AlgorithmId")
}

// generateCmd currently sets no extra launch command for ShuguangAi tasks.
func (s *ShuguangAi) generateCmd(option *option.AiOption) error {
	return nil
}

// generateEnv currently injects no extra environment variables for ShuguangAi tasks.
func (s *ShuguangAi) generateEnv(option *option.AiOption) error {
	return nil
}

func (s *ShuguangAi) generateParams(option *option.AiOption) error {
	if option.ResourceType == "" {
		return errors.New("ResourceType not set")
	}

	if len(option.Params) == 0 {
		epoch := "epoch" + COMMA + "1"
		option.Params = append(option.Params, epoch)
	}

	switch option.ResourceType {
	case CPU:
		card := "card" + COMMA + CPU
		option.Params = append(option.Params, card)
		return nil
	case CARD:
		card := "card" + COMMA + "cuda:0"
		option.Params = append(option.Params, card)
		return nil
	}

	return errors.New("failed to set params")
}

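// For reference (assuming COMMA is ","): with empty option.Params and ResourceType == CARD the
// generated slice is []string{"epoch,1", "card,cuda:0"}, which SubmitPytorchTask then joins into
// PythonArg as PY_PARAM_PREFIX + name + "=" + value pairs.
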
func (s *ShuguangAi) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) {
	var imageUrls []*inference.InferUrl

	urlReq := &hpcAC.GetInferUrlReq{
		ModelName: option.ModelName,
		Type:      option.ModelType,
		Card:      "dcu",
	}

	urlResp, err := s.aCRpc.GetInferUrl(ctx, urlReq)
	if err != nil {
		return nil, err
	}
	imageUrl := &inference.InferUrl{
		Url:  urlResp.Url,
		Card: "dcu",
	}
	imageUrls = append(imageUrls, imageUrl)

	clusterWithUrl := &inference.ClusterInferUrl{
		ClusterName: s.platform,
		ClusterType: TYPE_SHUGUANGAI,
		InferUrls:   imageUrls,
	}
	return clusterWithUrl, nil
}

func (s *ShuguangAi) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) {
	var insList []*inference.DeployInstance
	params := &hpcAC.GetInstanceServiceListReqParam{
		InstanceServiceName: DEPLOY_INSTANCE_PREFIEX,
		Status:              "",
		TaskType:            "",
		Start:               0,
		Limit:               DEPLOY_INSTANCE_LIMIT,
		Sort:                "desc",
	}
	req := &hpcacclient.GetInstanceServiceListReq{
		Param: params,
	}
	list, err := s.aCRpc.GetInstanceServiceList(ctx, req)
	if err != nil {
		return nil, err
	}
	if list.Code != "0" {
		return nil, errors.New(list.Msg)
	}
	for _, datum := range list.Data {
		ins := &inference.DeployInstance{}
		ins.InstanceName = datum.InstanceServiceName
		ins.InstanceId = datum.Id
		ins.ClusterName = s.platform
		ins.Status = datum.Status
		ins.InferCard = DCU
		ins.CreatedTime = datum.CreateTime
		ins.ClusterType = TYPE_SHUGUANGAI

		insList = append(insList, ins)
	}

	return insList, nil
}

func (s *ShuguangAi) StartInferDeployInstance(ctx context.Context, id string) bool {
	req := &hpcAC.StartInstanceServiceReq{
		InstanceServiceId: id,
	}
	resp, err := s.aCRpc.StartInstanceService(ctx, req)
	if err != nil || resp.Code != "0" {
		return false
	}
	return resp.Data == id
}

func (s *ShuguangAi) StopInferDeployInstance(ctx context.Context, id string) bool {
	ids := []string{id}
	req := &hpcAC.StopInstanceServiceReq{
		Ids: ids,
	}
	resp, err := s.aCRpc.StopInstanceService(ctx, req)
	if err != nil || resp.Code != "0" {
		return false
	}
	return true
}

func (s *ShuguangAi) GetInferDeployInstance(ctx context.Context, id string) (*inference.DeployInstance, error) {
	ins := &inference.DeployInstance{}
	req := &hpcAC.GetInstanceServiceDetailReq{
		Id: id,
	}
	resp, err := s.aCRpc.GetInstanceServiceDetail(ctx, req)
	if err != nil {
		return nil, err
	}
	if resp.Code != "0" {
		// previously this path fell through with a nil error and a nil instance
		return nil, errors.New("failed to get instance service detail")
	}

	var url string
	if resp.Data.Status == constants.Running {
		url = resp.Data.ContainerPortInfoList[0].AccessUrl
	}

	var modelType string
	var modelName string
	var card string

	// the description is written by CreateInferDeployInstance as "modelType/modelName/card"
	if resp.Data.Description != "" {
		str := strings.Split(resp.Data.Description, FORWARD_SLASH)
		if len(str) == 3 {
			modelType = str[0]
			modelName = str[1]
			card = str[2]
		}
	}

	ins.InstanceName = resp.Data.InstanceServiceName
	ins.InstanceId = resp.Data.Id
	ins.ClusterName = s.platform
	ins.Status = resp.Data.Status
	ins.CreatedTime = resp.Data.CreateTime
	ins.ClusterType = TYPE_SHUGUANGAI
	ins.ModelType = modelType
	ins.ModelName = modelName
	ins.InferUrl = url
	ins.InferCard = card

	return ins, nil
}

// GetImageInferResult is currently a no-op stub for ShuguangAi.
func (s *ShuguangAi) GetImageInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
	return "", nil
}

func (s *ShuguangAi) CreateInferDeployInstance(ctx context.Context, option *option.InferOption) (string, error) {
	containerPortInfoList := []*hpcAC.ContainerPortInfoList{
		{
			ProtocolType:  ProtocolType,
			ContainerPort: ContainerPort,
		},
	}

	desc := option.ModelType + FORWARD_SLASH + option.ModelName + FORWARD_SLASH + strings.ToLower(DCU)
	instanceServiceName := "infer_instance" + UNDERSCORE + utils.TimeString()
	resourceGroup := "kshdtest"

	script, ok := ModelNameCmdMap[option.ModelName]
	if !ok {
		return "", errors.New("failed to set cmd, ModelName does not exist")
	}

	param := &hpcAC.CreateParams{
		AcceleratorType:       strings.ToLower(DCU),
		ContainerPortInfoList: containerPortInfoList,
		CpuNumber:             8,
		Description:           desc,
		//env
		GpuNumber:           1,
		ImagePath:           "11.11.100.6:5000/dcu/admin/base/jupyterlab-pytorch:1.13.1-py3.7-dtk23.04-centos7.6",
		InstanceServiceName: instanceServiceName,
		MountInfoList:       make([]*hpcAC.MountInfoList, 0),
		//originalVersion
		RamSize: 10 * RAM_SIZE_1G,
		//rdma
		ResourceGroup:          resourceGroup,
		StartScriptActionScope: "all",
		StartScriptContent:     script,
		//startServiceCommand
		//taskClassification: "interactive"
		TaskNumber:     1,
		TaskType:       JUPYTER,
		TimeoutLimit:   "01:00:00",
		UseStartScript: true,
		//useStartServiceCommand: false
		Version: "jupyterlab-pytorch:1.13.1-py3.7-dtk23.04-centos7.6",
	}

	req := &hpcacclient.CreateInstanceServiceReq{
		Data: param,
	}

	resp, err := s.aCRpc.CreateInstanceService(ctx, req)
	if err != nil {
		return "", err
	}
	if resp.Code != "0" {
		return "", errors.New(resp.Msg)
	}

	return resp.Data, nil
}

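// Typical inference deployment flow against this adapter (a sketch only: error handling is elided,
// the model name must be a key of ModelNameCmdMap, and the explicit Start call may be unnecessary
// if the platform starts instances on creation):
//
//	id, _ := sl.CreateInferDeployInstance(ctx, &option.InferOption{ModelType: "pytorch", ModelName: "imagenet_resnet50"})
//	_ = sl.StartInferDeployInstance(ctx, id)
//	ins, _ := sl.GetInferDeployInstance(ctx, id) // ins.InferUrl is populated once the instance is Running
//	defer sl.StopInferDeployInstance(ctx, id)
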
func (s *ShuguangAi) CheckModelExistence(ctx context.Context, name string, mtype string) bool {
	modelPath := "model" + FORWARD_SLASH + name
	req := &hpcAC.IsExistFileReq{
		Path: KUNSHAN_DIR + FORWARD_SLASH + modelPath,
	}
	resp, err := s.aCRpc.IsExistFile(ctx, req)
	if err != nil {
		return false
	}

	if resp.Code != "0" || resp.Data == nil {
		return false
	}

	return resp.Data.Exist
}