forked from JointCloud/pcm-coordinator
787 lines
20 KiB
Go
787 lines
20 KiB
Go
package octopusHttp
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/zeromicro/go-zero/core/logx"
|
|
common2 "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/entity"
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
|
|
omodel "gitlink.org.cn/JointCloud/pcm-octopus/http/model"
|
|
"gitlink.org.cn/JointCloud/pcm-openi/common"
|
|
"mime/multipart"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// Request values and query-parameter names used when calling the Octopus API.
const (
	RESOURCE_POOL = "grampus-pool" // resource pool sent with job-creation and resource-spec requests
	Param_Token   = "token"        // query-parameter name carrying the access token
	Param_Addr    = "addr"         // query-parameter name carrying OctopusHttp.host
)

// Separators used when assembling names and commands or splitting "key,value" inputs.
const (
	Forward_Slash = "/"
	COMMA         = ","
	UNDERSCORE    = "_"
	SemiColon     = ";"
)

// Pieces of generated job names and commands.
const (
	TASK_NAME_PREFIX = "trainJob" // prefix of every generated training job name
	Python           = "python "  // trailing space separates the interpreter from its script argument
)

// Unit and resource labels. NUMBER, GIGABYTE, CPUCORE, MEMORY, RAM, CPU and Gi
// are used by genClusterResources; the rest are not referenced in this file
// and are presumably shared with sibling adapters.
const (
	BALANCE  = "balance"
	RATE     = "rate"
	PERHOUR  = "per-hour"
	NUMBER   = "number"
	KILOBYTE = "kb"
	GIGABYTE = "gb"
	CPUCORE  = "core"
	STORAGE  = "STORAGE"
	DISK     = "disk"
	MEMORY   = "memory"
	RAM      = "ram"
	VRAM     = "vram"
	RMB      = "rmb"
	POINT    = "point"
	CPU      = "cpu"
	Gi       = "Gi" // suffix expected in Octopus memory quantities, e.g. "16Gi"
)

// Running-state labels (not referenced in this file).
const (
	RUNNINGTASK = "RUNNING_TASK"
	RUNNING     = "RUNNING"
)
|
|
|
|
// NotImplementError is the message returned by adapter methods that the
// Octopus integration does not support.
const NotImplementError = "not implemented"

// Octopus API endpoint paths. They have no leading slash and are appended
// directly to OctopusHttp.server.
// NOTE(review): server is presumably configured with a trailing "/" — confirm.
const (
	MyAlgorithmListUrl = "api/v1/algorithm/myAlgorithmList"
	ResourcespecsUrl   = "api/v1/resource/specs"
	CreateTrainJobUrl  = "api/v1/job/create"
	TrainJobDetail     = "api/v1/job/detail"
	TrainJobLog        = "api/v1/job/log"
)
|
|
|
|
// ComputeSourceToCardType maps an accelerator identifier, i.e. the JSON tag of
// the matching Octopus ResourceQuantity field passed in by chooseResourceType,
// to the card type reported to the collector. genClusterResources indexes this
// map directly, so an identifier that is missing here yields an empty type.
//
// NOTE(review): chooseResourceType also handles NvidiaV100 and MetaxTechComGpu,
// whose JSON tags (defined in omodel) have no entry here — confirm whether
// they need one.
var ComputeSourceToCardType = map[string]string{
	// NVIDIA
	"nvidia-a100":     "GPU",
	"nvidia-a100-80g": "GPU",

	// Iluvatar (both lower- and upper-case spellings appear as keys)
	"mr-v100":        "ILUVATAR-GPGPU",
	"bi-v100":        "ILUVATAR-GPGPU",
	"MR-V50":         "ILUVATAR-GPGPU",
	"BI-V100":        "ILUVATAR-GPGPU",
	"BI-V150":        "ILUVATAR-GPGPU",
	"MR-V100":        "ILUVATAR-GPGPU",
	"ILUVATAR-GPGPU": "ILUVATAR-GPGPU",

	// Other vendors
	"cambricon.com/mlu":    "MLU",
	"hygon.com/dcu":        "DCU",
	"huawei.com/Ascend910": "NPU",
	"enflame.com/gcu":      "GCU",
	"MXN260":               "METAX-GPGPU",
}
|
|
|
|
type OctopusHttp struct {
|
|
server string
|
|
host string
|
|
platform string
|
|
participantId int64
|
|
token *Token
|
|
}
|
|
|
|
func NewOctopusHttp(id int64, name, server, host string, user string, pwd string) *OctopusHttp {
|
|
token, err := NewToken(server, host, user, pwd)
|
|
if err != nil {
|
|
logx.Infof("Init OctopusHttp, id: %d, host: %s, token error: %s \n", id, host, err)
|
|
}
|
|
return &OctopusHttp{platform: name, participantId: id, server: server, host: host, token: token}
|
|
}
|
|
|
|
// executor

// Execute submits work to Octopus according to the scheduler's submit mode.
//
//   - executor.SUBMIT_MODE_JOINT_CLOUD is not handled yet and returns (nil, nil).
//   - executor.SUBMIT_MODE_STORAGE_SCHEDULE requires option.AlgorithmId. It sets
//     option.Cmd to run "python <AlgorithmId>" (appended after any existing
//     command with ";"), overwrites option.ResourceId with a fixed spec id, and
//     submits through SubmitTask.
//   - Any other mode is rejected with an error instead of silently returning
//     (nil, nil), which callers could not distinguish from success.
//
// NOTE(review): option is mutated in place, so executing the same option twice
// appends the python command twice.
func (o *OctopusHttp) Execute(ctx context.Context, option *option.AiOption, mode int) (interface{}, error) {
	switch mode {
	case executor.SUBMIT_MODE_JOINT_CLOUD:
		// Not implemented for Octopus; kept as a no-op for existing callers.
		return nil, nil

	case executor.SUBMIT_MODE_STORAGE_SCHEDULE:
		if option.AlgorithmId == "" {
			return nil, errors.New("algorithmId is empty")
		}
		if option.Cmd != "" {
			option.Cmd = option.Cmd + SemiColon + Python + option.AlgorithmId
		} else {
			option.Cmd = Python + option.AlgorithmId
		}

		// NOTE(review): this hard-coded resource spec id overrides whatever the
		// caller supplied; presumably a spec valid on the target Octopus
		// deployment — confirm before relying on option.ResourceId.
		option.ResourceId = "964fdee2db544928bfea74dac12a924f"

		task, err := o.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
		if err != nil {
			return nil, err
		}
		return task, nil

	default:
		return nil, fmt.Errorf("unsupported submit mode: %d", mode)
	}
}
|
|
|
|
// Stop is a no-op for Octopus: id is ignored and nil is always returned.
// NOTE(review): callers cannot tell that the job was not actually stopped.
func (o *OctopusHttp) Stop(ctx context.Context, id string) error {
	return nil
}
|
|
|
|
func (o *OctopusHttp) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
|
|
// octopus提交任务
|
|
reqUrl := o.server + CreateTrainJobUrl
|
|
|
|
token, err := o.token.Get()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// python参数
|
|
var prms []struct {
|
|
Key string `json:"key"`
|
|
Value string `json:"value"`
|
|
}
|
|
for _, param := range params {
|
|
var p struct {
|
|
Key string `json:"key"`
|
|
Value string `json:"value"`
|
|
}
|
|
s := strings.Split(param, COMMA)
|
|
p.Key = s[0]
|
|
p.Value = s[1]
|
|
prms = append(prms, p)
|
|
}
|
|
|
|
//环境变量
|
|
envMap := make(map[string]string)
|
|
for _, env := range envs {
|
|
s := strings.Split(env, COMMA)
|
|
envMap[s[0]] = s[1]
|
|
}
|
|
|
|
param := &omodel.CreateTrainJobParam{
|
|
//DataSetId: datasetsId,
|
|
//DataSetVersion: VERSION,
|
|
//AlgorithmId: algorithmId,
|
|
//AlgorithmVersion: VERSION,
|
|
Name: TASK_NAME_PREFIX + UNDERSCORE + utils.RandomString(10),
|
|
ImageId: imageId,
|
|
IsDistributed: false,
|
|
ResourcePool: RESOURCE_POOL,
|
|
Config: []*omodel.CreateTrainJobConf{
|
|
{
|
|
Command: cmd,
|
|
ResourceSpecId: resourceId,
|
|
MinFailedTaskCount: 1,
|
|
MinSucceededTaskCount: 1,
|
|
TaskNumber: 1,
|
|
//Parameters: prms,
|
|
Envs: envMap,
|
|
},
|
|
},
|
|
}
|
|
|
|
resp := &entity.OctResp{}
|
|
|
|
req := common.GetRestyRequest(common.TIMEOUT)
|
|
_, err = req.
|
|
SetHeader("Authorization", "Bearer "+token).
|
|
SetQueryString("token=" + token).
|
|
SetQueryString("addr=" + o.host).
|
|
SetBody(param).
|
|
SetResult(resp).
|
|
Post(reqUrl)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resp, nil
|
|
|
|
}
|
|
|
|
// collector

// resourceSpecs fetches the resource specifications of RESOURCE_POOL from
// Octopus.
//
// The pool name is sent as a JSON body, together with the token and addr query
// parameters. Only request-building and transport errors are returned; the
// caller must inspect resp.Code. ctx is not attached to the request.
func (o *OctopusHttp) resourceSpecs(ctx context.Context) (*entity.OctResp, error) {
	resourcespecsUrl := o.server + ResourcespecsUrl
	token, err := o.token.Get()
	if err != nil {
		return nil, err
	}

	b, err := json.Marshal(omodel.ResourceSpecParam{ResourcePool: RESOURCE_POOL})
	if err != nil {
		return nil, fmt.Errorf("marshal resource spec param: %w", err)
	}
	byt := bytes.NewBuffer(b)

	resp := &entity.OctResp{}
	req := common.GetRestyRequest(common.TIMEOUT)
	// NOTE(review): the raw request is prepared by hand and the call goes through
	// Send() instead of Get(); presumably so the JSON body is transmitted with
	// this GET-style request — confirm before simplifying.
	r, err := http.NewRequest(http.MethodGet, resourcespecsUrl, byt)
	if err != nil {
		return nil, fmt.Errorf("build resource spec request: %w", err)
	}
	req.RawRequest = r
	req.URL = resourcespecsUrl

	_, err = req.
		SetHeader("Content-Type", "application/json").
		SetQueryParam(Param_Token, token).
		SetQueryParam(Param_Addr, o.host).
		SetBody(byt).
		SetResult(resp).
		Send()
	if err != nil {
		return nil, err
	}

	return resp, nil
}
|
|
|
|
// GetResourceStats queries the Octopus resource specs and reports failures,
// but does not yet build collector.ResourceStats: on success it returns
// (nil, nil).
//
// A non-200 response becomes an error. When the response carries data, the
// error is Octopus's own message; otherwise it is a generic error that includes
// the code (previously this case silently returned (nil, nil)).
func (o *OctopusHttp) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
	resp, err := o.resourceSpecs(ctx)
	if err != nil {
		return nil, err
	}

	if resp.Code != http.StatusOK {
		if resp.Data == nil {
			return nil, fmt.Errorf("octopus resource specs request failed with code %v", resp.Code)
		}
		marshal, err := json.Marshal(resp.Data)
		if err != nil {
			return nil, err
		}
		errormdl := &omodel.Error{}
		if err := json.Unmarshal(marshal, errormdl); err != nil {
			return nil, err
		}
		return nil, errors.New(errormdl.Message)
	}

	if resp.Data != nil {
		// The specs are decoded only so that a malformed response surfaces as an
		// error; they are not converted into stats yet.
		spec := &entity.OctResourceSpecs{}
		marshal, err := json.Marshal(resp.Data)
		if err != nil {
			return nil, err
		}
		if err := json.Unmarshal(marshal, spec); err != nil {
			return nil, err
		}
	}

	// NOTE(review): stats aggregation is not implemented for Octopus.
	return nil, nil
}
|
|
|
|
// GetDatasetsSpecs is not implemented for Octopus; it always returns
// (nil, nil), i.e. "no datasets" rather than an error.
func (o *OctopusHttp) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) {
	return nil, nil
}

// GetAlgorithms is not supported by the Octopus adapter and always fails with
// NotImplementError.
func (o *OctopusHttp) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) {
	return nil, errors.New(NotImplementError)
}
|
|
|
|
// GetTrainingTaskLog fetches the log of Octopus training job taskId.
//
// String log data is returned verbatim; other data is returned as its JSON
// encoding. A missing or empty log, or one containing "404 Not Found", is
// reported as "waiting for logs...". A non-200 response or a transport failure
// is returned as an error that names the log endpoint and wraps the cause.
// instanceNum and ctx are currently unused.
func (o *OctopusHttp) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) {
	taskLogUrl := o.server + TrainJobLog
	token, err := o.token.Get()
	if err != nil {
		return "", err
	}

	b, err := json.Marshal(omodel.TrainJobLog{JobId: taskId})
	if err != nil {
		return "", fmt.Errorf("marshal train job log param: %w", err)
	}
	byt := bytes.NewBuffer(b)

	resp := &entity.OctResp{}
	req := common.GetRestyRequest(common.TIMEOUT)
	// NOTE(review): the raw request is prepared by hand and the call goes through
	// Send() instead of Get(); presumably so the JSON body is transmitted with
	// this GET-style request — confirm before simplifying.
	r, err := http.NewRequest(http.MethodGet, taskLogUrl, byt)
	if err != nil {
		return "", fmt.Errorf("build train job log request: %w", err)
	}
	req.RawRequest = r
	req.URL = taskLogUrl

	_, err = req.
		SetHeader("Content-Type", "application/json").
		SetQueryParam(Param_Token, token).
		SetQueryParam(Param_Addr, o.host).
		SetBody(byt).
		SetResult(resp).
		Send()
	if err != nil {
		return "", fmt.Errorf("failed to invoke trainJobLog: %w", err)
	}
	if resp.Code != http.StatusOK {
		return "", fmt.Errorf("failed to invoke trainJobLog: unexpected code %v", resp.Code)
	}

	// Re-marshalling a string produced a quoted, escaped log (literal "\n"), and
	// nil data produced "null", which bypassed the "waiting" fallback below.
	var logText string
	switch data := interface{}(resp.Data).(type) {
	case nil:
		logText = ""
	case string:
		logText = data
	default:
		marshal, err := json.Marshal(data)
		if err != nil {
			return "", err
		}
		logText = string(marshal)
	}

	if logText == "" || strings.Contains(logText, "404 Not Found") {
		logText = "waiting for logs..."
	}
	return logText, nil
}
|
|
|
|
func (o *OctopusHttp) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
|
|
if taskId == "" {
|
|
return nil, errors.New("empty taskId")
|
|
}
|
|
|
|
resp, err := o.getTrainingTask(ctx, taskId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if resp.Code != http.StatusOK {
|
|
if resp.Data != nil {
|
|
marshal, err := json.Marshal(resp.Data)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
errormdl := &omodel.Error{}
|
|
err = json.Unmarshal(marshal, errormdl)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return nil, errors.New(errormdl.Message)
|
|
}
|
|
} else {
|
|
if resp.Data != nil {
|
|
job := &entity.OctTrainJob{}
|
|
marshal, err := json.Marshal(resp.Data)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = json.Unmarshal(marshal, job)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var task collector.Task
|
|
task.Id = job.TrainJob.Id
|
|
if job.TrainJob.StartedAt != 0 {
|
|
task.Start = time.Unix(int64(job.TrainJob.StartedAt), 0).Format(constants.Layout)
|
|
}
|
|
if job.TrainJob.CompletedAt != 0 {
|
|
task.End = time.Unix(int64(job.TrainJob.CompletedAt), 0).Format(constants.Layout)
|
|
}
|
|
switch job.TrainJob.Status {
|
|
case "succeeded":
|
|
task.Status = constants.Completed
|
|
case "failed":
|
|
task.Status = constants.Failed
|
|
case "running":
|
|
task.Status = constants.Running
|
|
case "stopped":
|
|
task.Status = constants.Stopped
|
|
case "pending":
|
|
task.Status = constants.Pending
|
|
default:
|
|
task.Status = "undefined"
|
|
}
|
|
|
|
return &task, nil
|
|
}
|
|
}
|
|
return nil, errors.New("failed to get trainjob")
|
|
}
|
|
|
|
// getTrainingTask requests the details of Octopus training job taskId.
//
// The job id is sent as a JSON body, together with the token and addr query
// parameters. Only request-building and transport errors are returned, and the
// transport error is now wrapped rather than discarded. The caller must check
// resp.Code. ctx is not attached to the request.
func (o *OctopusHttp) getTrainingTask(ctx context.Context, taskId string) (*entity.OctResp, error) {
	taskDetailsUrl := o.server + TrainJobDetail
	token, err := o.token.Get()
	if err != nil {
		return nil, err
	}

	b, err := json.Marshal(omodel.TrainJobDetailParam{JobId: taskId})
	if err != nil {
		return nil, fmt.Errorf("marshal train job detail param: %w", err)
	}
	byt := bytes.NewBuffer(b)

	resp := &entity.OctResp{}
	req := common.GetRestyRequest(common.TIMEOUT)
	// NOTE(review): the raw request is prepared by hand and the call goes through
	// Send() instead of Get(); presumably so the JSON body is transmitted with
	// this GET-style request — confirm before simplifying.
	r, err := http.NewRequest(http.MethodGet, taskDetailsUrl, byt)
	if err != nil {
		return nil, fmt.Errorf("build train job detail request: %w", err)
	}
	req.RawRequest = r
	req.URL = taskDetailsUrl

	_, err = req.
		SetHeader("Content-Type", "application/json").
		SetQueryParam(Param_Token, token).
		SetQueryParam(Param_Addr, o.host).
		SetBody(byt).
		SetResult(resp).
		Send()
	if err != nil {
		return nil, fmt.Errorf("failed to invoke taskDetails: %w", err)
	}

	return resp, nil
}
|
|
|
|
// DownloadAlgorithmCode is not supported by the Octopus adapter and always
// fails with NotImplementError.
func (o *OctopusHttp) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
	return "", errors.New(NotImplementError)
}

// UploadAlgorithmCode is a no-op for Octopus and always reports success.
// NOTE(review): unlike DownloadAlgorithmCode it does not return
// NotImplementError, so callers cannot tell that nothing was uploaded.
func (o *OctopusHttp) UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error {
	return nil
}

// GetComputeCards is not supported by the Octopus adapter and always fails
// with NotImplementError. It uses a pointer receiver like every other
// OctopusHttp method; the previous value receiver copied the adapter on each
// call and made the method set inconsistent.
func (o *OctopusHttp) GetComputeCards(ctx context.Context) ([]string, error) {
	return nil, errors.New(NotImplementError)
}

// GetUserBalance is not supported by the Octopus adapter and always fails with
// NotImplementError.
func (o *OctopusHttp) GetUserBalance(ctx context.Context) (float64, error) {
	return 0, errors.New(NotImplementError)
}
|
|
|
|
// GetResourceSpecs returns this cluster's Octopus resource specs for
// resrcType, tagged with the participant id as ClusterId and resrcType as Tag.
//
// A non-200 response becomes an error. When the response carries data, the
// error is Octopus's own message; otherwise it is a generic error that includes
// the code (previously an empty spec was returned as success). On success,
// Resources is filled by genSpecs; it stays unset when the response has no
// data.
func (o *OctopusHttp) GetResourceSpecs(ctx context.Context, resrcType string) (*collector.ResourceSpec, error) {
	resp, err := o.resourceSpecs(ctx)
	if err != nil {
		return nil, err
	}

	res := &collector.ResourceSpec{
		ClusterId: strconv.FormatInt(o.participantId, 10),
		Tag:       resrcType,
	}

	if resp.Code != http.StatusOK {
		if resp.Data == nil {
			return nil, fmt.Errorf("octopus resource specs request failed with code %v", resp.Code)
		}
		marshal, err := json.Marshal(resp.Data)
		if err != nil {
			return nil, err
		}
		errormdl := &omodel.Error{}
		if err := json.Unmarshal(marshal, errormdl); err != nil {
			return nil, err
		}
		return nil, errors.New(errormdl.Message)
	}

	if resp.Data == nil {
		return res, nil
	}

	// Re-encode Data so it can be decoded into the concrete specs type.
	marshal, err := json.Marshal(resp.Data)
	if err != nil {
		return nil, err
	}
	specs := &entity.OctResourceSpecs{}
	if err := json.Unmarshal(marshal, specs); err != nil {
		return nil, err
	}

	clusterResources, err := genSpecs(specs, resrcType)
	if err != nil {
		return nil, err
	}
	res.Resources = clusterResources

	return res, nil
}
|
|
|
|
// genSpecs converts Octopus resource specs into collector cluster resources.
//
// Only resrcType "Train" produces entries: one per training spec for which
// chooseResourceType recognises an accelerator (specs without one are
// skipped). Every other resrcType, including "Inference", yields an empty,
// non-nil slice. The first decoding or conversion error aborts with a nil
// slice.
func genSpecs(specs *entity.OctResourceSpecs, resrcType string) ([]interface{}, error) {
	out := make([]interface{}, 0)
	if resrcType != "Train" {
		return out, nil
	}

	trainSpecs := specs.MapResourceSpecIdList.Train.ResourceSpecs
	if trainSpecs == nil {
		return out, nil
	}

	for _, item := range trainSpecs {
		// Each entry is re-encoded and decoded into the typed Octopus spec model.
		buf, err := json.Marshal(item)
		if err != nil {
			return nil, err
		}
		spec := &omodel.Spec{}
		if err := json.Unmarshal(buf, spec); err != nil {
			return nil, err
		}

		cres, err := chooseResourceType(spec)
		if err != nil {
			return nil, err
		}
		if cres == nil {
			continue
		}
		out = append(out, cres)
	}

	return out, nil
}
|
|
|
|
func chooseResourceType(spec *omodel.Spec) (*collector.ClusterResource, error) {
|
|
if spec.ResourceQuantity.NvidiaA100 != "" {
|
|
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "NvidiaA100")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA100, spec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return cres, nil
|
|
} else if spec.ResourceQuantity.NvidiaA10080G != "" {
|
|
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "NvidiaA10080G")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA10080G, spec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return cres, nil
|
|
} else if spec.ResourceQuantity.MrV100 != "" {
|
|
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "MrV100")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cres, err := genClusterResources(tag, spec.ResourceQuantity.MrV100, spec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return cres, nil
|
|
} else if spec.ResourceQuantity.BiV100 != "" {
|
|
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "BiV100")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cres, err := genClusterResources(tag, spec.ResourceQuantity.BiV100, spec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return cres, nil
|
|
} else if spec.ResourceQuantity.MRV50 != "" {
|
|
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "MRV50")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cres, err := genClusterResources(tag, spec.ResourceQuantity.MRV50, spec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return cres, nil
|
|
} else if spec.ResourceQuantity.BIV100 != "" {
|
|
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "NvidiaA100")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA100, spec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return cres, nil
|
|
} else if spec.ResourceQuantity.BIV150 != "" {
|
|
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "BIV150")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cres, err := genClusterResources(tag, spec.ResourceQuantity.BIV150, spec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return cres, nil
|
|
} else if spec.ResourceQuantity.MRV100 != "" {
|
|
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "MRV100")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cres, err := genClusterResources(tag, spec.ResourceQuantity.MRV100, spec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return cres, nil
|
|
} else if spec.ResourceQuantity.CambriconComMlu != "" {
|
|
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "CambriconComMlu")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cres, err := genClusterResources(tag, spec.ResourceQuantity.CambriconComMlu, spec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return cres, nil
|
|
} else if spec.ResourceQuantity.HygonComDcu != "" {
|
|
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "HygonComDcu")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cres, err := genClusterResources(tag, spec.ResourceQuantity.HygonComDcu, spec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return cres, nil
|
|
} else if spec.ResourceQuantity.HuaweiComAscend910 != "" {
|
|
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "HuaweiComAscend910")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cres, err := genClusterResources(tag, spec.ResourceQuantity.HuaweiComAscend910, spec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return cres, nil
|
|
} else if spec.ResourceQuantity.EnflameComGcu != "" {
|
|
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "EnflameComGcu")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cres, err := genClusterResources(tag, spec.ResourceQuantity.EnflameComGcu, spec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return cres, nil
|
|
} else if spec.ResourceQuantity.MXN260 != "" {
|
|
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "MXN260")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cres, err := genClusterResources(tag, spec.ResourceQuantity.MXN260, spec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return cres, nil
|
|
} else if spec.ResourceQuantity.NvidiaV100 != "" {
|
|
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "NvidiaV100")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaV100, spec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return cres, nil
|
|
} else if spec.ResourceQuantity.MetaxTechComGpu != "" {
|
|
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "MetaxTechComGpu")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cres, err := genClusterResources(tag, spec.ResourceQuantity.MetaxTechComGpu, spec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return cres, nil
|
|
}
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
// genClusterResources builds the ClusterResource for one Octopus spec.
//
// The main resource is the accelerator: cType selects the card type from
// ComputeSourceToCardType and is upper-cased as the name, and cNum is the card
// count. CPU cores and memory in GiB are attached as base resources, in that
// order. Total and Available carry the same value.
//
// Numeric quantities that fail to parse are treated as 0. Memory may be empty
// (0). Otherwise it must contain "Gi" exactly once, and the text before it is
// the GiB count (0 if unparsable); any other shape is an error.
func genClusterResources(cType string, cNum string, s *omodel.Spec) (*collector.ClusterResource, error) {
	// parseOrZero parses a base-10 integer, falling back to 0 on any error.
	parseOrZero := func(v string) int64 {
		n, err := strconv.ParseInt(v, 10, 64)
		if err != nil {
			return 0
		}
		return n
	}

	var memGi int64
	if mem := s.ResourceQuantity.Memory; mem != "" {
		parts := strings.Split(mem, Gi)
		if len(parts) != 2 {
			return nil, fmt.Errorf("s.ResourceQuantity.Memory convert error: %s", s.ResourceQuantity.Memory)
		}
		memGi = parseOrZero(parts[0])
	}
	cardNum := parseOrZero(cNum)
	cpuCore := parseOrZero(s.ResourceQuantity.Cpu)

	return &collector.ClusterResource{
		Resource: &collector.Usage{
			Type:      ComputeSourceToCardType[cType],
			Name:      strings.ToUpper(cType),
			Total:     &collector.UnitValue{Unit: NUMBER, Value: cardNum},
			Available: &collector.UnitValue{Unit: NUMBER, Value: cardNum},
		},
		BaseResources: []*collector.Usage{
			{
				Type:      strings.ToUpper(CPU),
				Name:      strings.ToUpper(CPU),
				Total:     &collector.UnitValue{Unit: CPUCORE, Value: cpuCore},
				Available: &collector.UnitValue{Unit: CPUCORE, Value: cpuCore},
			},
			{
				Type:      strings.ToUpper(MEMORY),
				Name:      strings.ToUpper(RAM),
				Total:     &collector.UnitValue{Unit: GIGABYTE, Value: memGi},
				Available: &collector.UnitValue{Unit: GIGABYTE, Value: memGi},
			},
		},
	}, nil
}
|
|
|
|
// inference
//
// Inference is not supported by the Octopus adapter. Every method below is a
// stub: error-returning methods fail with NotImplementError, and
// boolean-returning methods report false.

// GetClusterInferUrl always fails with NotImplementError.
func (o *OctopusHttp) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) {
	return nil, errors.New(NotImplementError)
}

// GetInferDeployInstanceList always fails with NotImplementError.
func (o *OctopusHttp) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) {
	return nil, errors.New(NotImplementError)
}

// StartInferDeployInstance always reports false; id is ignored.
func (o *OctopusHttp) StartInferDeployInstance(ctx context.Context, id string) bool {
	return false
}

// StopInferDeployInstance always reports false; id is ignored.
func (o *OctopusHttp) StopInferDeployInstance(ctx context.Context, id string) bool {
	return false
}

// GetInferDeployInstance always fails with NotImplementError.
func (o *OctopusHttp) GetInferDeployInstance(ctx context.Context, id string) (*inference.DeployInstance, error) {
	return nil, errors.New(NotImplementError)
}

// CreateInferDeployInstance always fails with NotImplementError.
func (o *OctopusHttp) CreateInferDeployInstance(ctx context.Context, option *option.InferOption) (string, error) {
	return "", errors.New(NotImplementError)
}

// CheckModelExistence always reports false; its arguments are ignored.
func (o *OctopusHttp) CheckModelExistence(ctx context.Context, modelName string, modelType string) bool {
	return false
}

// GetImageInferResult always fails with NotImplementError.
func (o *OctopusHttp) GetImageInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
	return "", errors.New(NotImplementError)
}
|