pcm-coordinator/internal/storeLink/openi.go

1122 lines
28 KiB
Go

package storeLink
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/json-iterator/go"
"github.com/rs/zerolog/log"
openIcom "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-openi/common"
"gitlink.org.cn/JointCloud/pcm-openi/model"
"mime/multipart"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)
const (
DEBUG = "DEBUG"
TRAIN = "TRAIN"
INFERENCE = "INFERENCE"
C2NET = "C2Net"
TESTREPO = "testrepo"
ONLINEINFERENCE = "ONLINEINFERENCE" //online inference
)
const (
CreationRequirelUrl = "/api/v1/task/creationRequired"
TaskCreatelUrl = "/api/v1/task/create"
ReposUrl = "/api/v1/user/repos"
TaskListUrl = "/api/v1/task/list"
TaskDetailsUrl = "/api/v1/task/detail"
TaskLogUrl = "/api/v1/task/log"
TaskStopUrl = "/api/v1/task/stop"
TaskOnlineInferUrl = "/api/v1/task/onlineInferUrl"
)
// compute source
var (
ComputeSource = []string{"GPU", "NPU", "GCU", "MLU", "DCU", "CPU", "ILUVATAR-GPGPU", "METAX-GPGPU"}
)
type ResourceSpecOpenI struct {
ResType string
Name string
Number int64
}
type OpenI struct {
participantId int64
platform string
host string
userName string
accessToken string
}
func NewOpenI(host string, id int64, name string, token string, platform string) *OpenI {
return &OpenI{
host: host,
participantId: id,
userName: name,
accessToken: token,
platform: platform,
}
}
func (o *OpenI) Execute(ctx context.Context, option *option.AiOption, mode int) (interface{}, error) {
switch mode {
case executor.SUBMIT_MODE_JOINT_CLOUD:
case executor.SUBMIT_MODE_STORAGE_SCHEDULE:
var repoName string
codePaths := strings.SplitN(option.AlgorithmId, FORWARD_SLASH, 3)
if len(codePaths) != 3 {
return nil, fmt.Errorf("algorithmId %s format is incorrect", option.AlgorithmId)
}
repoName = codePaths[0]
spec := &ResourceSpecOpenI{}
for _, res := range option.ResourcesRequired {
typeName, ok := res["type"]
if !ok {
continue
}
name, ok := res["name"]
if !ok {
continue
}
for _, s := range ComputeSource {
switch typeName {
case s:
num, ok := res["number"]
if !ok {
continue
}
n := openIcom.ConvertTypeToString(num)
val, err := strconv.ParseInt(n, 10, 64)
if err != nil {
return nil, err
}
spec.ResType = s
spec.Name = openIcom.ConvertTypeToString(name)
spec.Number = val
break
}
}
}
if spec.ResType == "" || spec.Name == "" {
return nil, errors.New("resource spec not found")
}
creationRequirelUrl := o.host + CreationRequirelUrl
param := model.TaskCreationRequiredParam{
UserName: o.userName,
RepoName: repoName,
JobType: TRAIN,
ComputeSource: spec.ResType,
ClusterType: C2NET,
}
b, _ := json.Marshal(param)
byt := bytes.NewBuffer(b)
resp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data model.TaskCreationRequired `json:"data"`
}{}
req := common.GetRestyRequest(common.TIMEOUT)
r, _ := http.NewRequest("GET", creationRequirelUrl, byt)
req.RawRequest = r
req.URL = creationRequirelUrl
_, err := req.
SetHeader("Content-Type", "application/json").
SetQueryParam(common.ACCESSTOKEN, o.accessToken).
SetBody(byt).
SetResult(&resp).
Send()
if err != nil {
return nil, errors.New("failed to invoke TaskCreationRequired; " + err.Error())
}
if len(resp.Data.Data.Specs.All) == 0 {
return nil, errors.New("TaskCreationRequired specs are empty")
}
for _, s := range resp.Data.Data.Specs.All {
if spec.ResType == s.ComputeResource && spec.Name == s.AccCardType {
if int(spec.Number) == s.AccCardsNum {
option.ResourceId = strconv.Itoa(s.Id) + FORWARD_SLASH + spec.ResType
break
}
}
}
if option.ResourceId == "" {
return nil, errors.New("can not find spec Id")
}
option.ComputeCard = spec.Name
}
task, err := o.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
if err != nil {
return nil, err
}
return task, nil
}
func (o *OpenI) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
taskCreatelUrl := o.host + TaskCreatelUrl
var repoName string
var branchName string
var bootFile string
codePaths := strings.SplitN(algorithmId, FORWARD_SLASH, 3)
if len(codePaths) != 3 {
return nil, fmt.Errorf("algorithmId %s format is incorrect", algorithmId)
}
specs := strings.Split(resourceId, FORWARD_SLASH)
specId, err := strconv.ParseInt(specs[0], 10, 0)
if err != nil {
return nil, err
}
computeSource := specs[1]
repoName = codePaths[0]
branchName = codePaths[1]
bootFile = codePaths[2]
// params
var parameters struct {
Parameter []struct {
Label string `json:"label"`
Value string `json:"value"`
} `json:"parameter"`
}
for _, param := range params {
s := strings.Split(param, COMMA)
st := struct {
Label string `json:"label"`
Value string `json:"value"`
}{
Label: s[0],
Value: s[1],
}
parameters.Parameter = append(parameters.Parameter, st)
}
paramStr, _ := json.Marshal(parameters)
// choose imageId and imageUrl
imgId, imgUrl, err := swapImageIdAndImageUrl(imageId)
if err != nil {
return nil, err
}
taskParam := &model.CreateTaskParam{
Description: algorithmId, // temporarily set reponame contained in the algorithmId to desc for missing taskdetail's reponame
JobType: TRAIN,
Cluster: C2NET,
DisplayJobName: TRAIN + UNDERSCORE + utils.RandomString(10),
ComputeSource: computeSource,
SpecId: int(specId),
BranchName: branchName,
ImageId: imgId,
ImageUrl: imgUrl,
DatasetUuidStr: datasetsId,
Params: string(paramStr),
BootFile: bootFile,
HasInternet: 2, // 0 不限制;1 不需要互联网;2 需要互联网
WorkServerNumber: 1, // 运行节点数
}
param := model.CreateTaskReq{
UserName: o.userName,
RepoName: repoName,
CreateTaskParam: taskParam,
}
resp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data model.CreateTask `json:"data"`
}{}
req := common.GetRestyRequest(common.TIMEOUT)
_, err = req.
SetHeader("Content-Type", "application/json").
SetQueryParam(common.ACCESSTOKEN, o.accessToken).
SetBody(&param).
SetResult(&resp).
Post(taskCreatelUrl)
if err != nil {
return nil, err
}
if resp.Code != 200 {
return nil, errors.New(resp.Msg)
}
if resp.Data.Code != 0 {
return nil, errors.New(resp.Msg)
}
if (resp.Data == model.CreateTask{}) {
return nil, errors.New("failed to submit task, empty response")
}
return resp.Data, nil
}
func swapImageIdAndImageUrl(imageId string) (string, string, error) {
if imageId == "" {
return "", "", errors.New("imageId is empty")
}
var imgId string
var imgUrl string
parsedURL, err := url.Parse("http://" + imageId)
if err != nil {
return "", "", err
}
if utils.IsValidHostAddress(parsedURL.Host) {
imgId = ""
imgUrl = imageId
} else {
imgId = imageId
imgUrl = ""
}
return imgId, imgUrl, nil
}
func (o *OpenI) Stop(ctx context.Context, id string) error {
task, err := o.getTrainingTask(ctx, id)
if err != nil {
return err
}
codePaths := strings.SplitN(task.Data.Task.Description, FORWARD_SLASH, 3)
if len(codePaths) != 3 {
return errors.New("failed to stop, openI desc not set")
}
repoName := codePaths[0]
taskStopUrl := o.host + TaskStopUrl
param := model.StopTaskParam{
UserName: o.userName,
RepoName: repoName,
Id: id,
}
resp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data model.StopTask `json:"data"`
}{}
req := common.GetRestyRequest(common.TIMEOUT)
_, err = req.
SetHeader("Content-Type", "application/json").
SetQueryParam(common.ACCESSTOKEN, o.accessToken).
SetBody(&param).
SetResult(&resp).
Post(taskStopUrl)
if err != nil {
return err
}
if resp.Code != http.StatusOK {
return errors.New("failed to stop")
}
return nil
}
func (o *OpenI) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) {
return nil, errors.New("failed to implement")
}
func (o *OpenI) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) {
return nil, errors.New("failed to implement")
}
func (o *OpenI) StartInferDeployInstance(ctx context.Context, id string) bool {
return false
}
func (o *OpenI) StopInferDeployInstance(ctx context.Context, id string) bool {
err := o.Stop(ctx, id)
if err != nil {
return false
}
return true
}
func (o *OpenI) GetInferDeployInstance(ctx context.Context, id string) (*inference.DeployInstance, error) {
task, err := o.getTrainingTask(ctx, id)
if err != nil {
return nil, err
}
description := task.Data.Task.Description
//从描述中解析出repoName
codePaths := strings.SplitN(description, FORWARD_SLASH, 3)
if len(codePaths) != 3 {
return nil, fmt.Errorf("algorithmId %s format is incorrect", description)
}
repoName := codePaths[0]
var resp inference.DeployInstance
resp.InstanceId = id
resp.InstanceName = task.Data.Task.DisplayJobName
resp.ModelName = task.Data.Task.PretrainModelName
resp.ModelType = ""
resp.InferCard = task.Data.Task.Spec.ComputeResource + "_" + task.Data.Task.Spec.AccCardType
resp.ClusterName = o.platform
resp.ClusterType = TYPE_OPENI
//获取在线推理url
var inferUrl string
if task.Data.Task.Status == "RUNNING" {
inferUrl, err = o.getOnlineInferUrl(ctx, id, repoName)
if err != nil {
return nil, err
}
}
resp.InferUrl = inferUrl
resp.Status = task.Data.Task.Status
resp.CreatedTime = time.Unix(int64(task.Data.Task.CreatedUnix), 0).Format(constants.Layout)
log.Debug().Msgf("func GetInferDeployInstance, resp: %v", resp)
return &resp, nil
}
func (o *OpenI) CreateInferDeployInstance(ctx context.Context, option *option.InferOption) (string, error) {
var repoName string
codePaths := strings.SplitN(option.AlgorithmId, FORWARD_SLASH, 3)
if len(codePaths) != 3 {
return "", fmt.Errorf("algorithmId %s format is incorrect", option.AlgorithmId)
}
repoName = codePaths[0]
spec := &ResourceSpecOpenI{}
for _, res := range option.ResourcesRequired {
typeName, ok := res["type"]
if !ok {
continue
}
name, ok := res["name"]
if !ok {
continue
}
for _, s := range ComputeSource {
switch typeName {
case s:
num, ok := res["number"]
if !ok {
continue
}
n := openIcom.ConvertTypeToString(num)
val, err := strconv.ParseInt(n, 10, 64)
if err != nil {
return "", err
}
spec.ResType = s
spec.Name = openIcom.ConvertTypeToString(name)
spec.Number = val
break
}
}
}
if spec.ResType == "" || spec.Name == "" {
return "", errors.New("resource spec not found")
}
creationRequirelUrl := o.host + CreationRequirelUrl
param := model.TaskCreationRequiredParam{
UserName: o.userName,
RepoName: repoName,
JobType: ONLINEINFERENCE,
ComputeSource: spec.ResType,
ClusterType: C2NET,
}
b, _ := json.Marshal(param)
byt := bytes.NewBuffer(b)
resp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data model.TaskCreationRequired `json:"data"`
}{}
req := common.GetRestyRequest(common.TIMEOUT)
r, _ := http.NewRequest("GET", creationRequirelUrl, byt)
req.RawRequest = r
req.URL = creationRequirelUrl
_, err := req.
SetHeader("Content-Type", "application/json").
SetQueryParam(common.ACCESSTOKEN, o.accessToken).
SetBody(byt).
SetResult(&resp).
Send()
if err != nil {
return "", errors.New("failed to invoke TaskCreationRequired")
}
if len(resp.Data.Data.Specs.All) == 0 {
return "", errors.New("TaskCreationRequired specs are empty")
}
for _, s := range resp.Data.Data.Specs.All {
if spec.ResType == s.ComputeResource && spec.Name == s.AccCardType {
if int(spec.Number) == s.AccCardsNum {
option.ResourceId = strconv.Itoa(s.Id) + FORWARD_SLASH + spec.ResType
break
}
}
}
if option.ResourceId == "" {
return "", errors.New("can not find spec Id")
}
option.ComputeCard = spec.Name
task, err := o.SubmitInferTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AlgorithmId, option.ModelID)
if err != nil {
return "", err
}
ma, err := jsoniter.Marshal(task)
if err != nil {
return "", err
}
taskId := jsoniter.Get(ma, "data").Get("id").ToString()
return taskId, nil
}
func (o *OpenI) SubmitInferTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, algorithmId string, modelId string) (interface{}, error) {
taskCreatelUrl := o.host + TaskCreatelUrl
var repoName string
var branchName string
var bootFile string
//从描述中解析出repoName
codePaths := strings.SplitN(algorithmId, FORWARD_SLASH, 3)
if len(codePaths) != 3 {
return nil, fmt.Errorf("algorithmId %s format is incorrect", algorithmId)
}
specs := strings.Split(resourceId, FORWARD_SLASH)
specId, err := strconv.ParseInt(specs[0], 10, 0)
if err != nil {
return nil, err
}
computeSource := specs[1]
repoName = codePaths[0]
branchName = codePaths[1]
bootFile = strings.Join(codePaths[2:], "/")
log.Printf("repoName: %s, branchName: %s, bootFile: %s", repoName, branchName, bootFile)
//params := "{\"parameter\":[{\"label\":\"a\",\"value\":\"1\"},{\"label\":\"b\",\"value\":\"2\"}]}"
// choose imageId and imageUrl
imgId, imgUrl, err := swapImageIdAndImageUrl(imageId)
if err != nil {
return nil, err
}
taskParam := &model.CreateTaskParam{
Description: algorithmId, // temporarily set reponame contained in the algorithmId to desc for missing taskdetail's reponame
JobType: ONLINEINFERENCE,
Cluster: C2NET,
DisplayJobName: ONLINEINFERENCE + UNDERSCORE + utils.RandomString(10),
ComputeSource: computeSource,
SpecId: int(specId),
BranchName: branchName,
ImageId: imgId,
ImageUrl: imgUrl,
PretrainModelIdStr: modelId,
BootFile: bootFile,
HasInternet: 2, // 0 不限制;1 不需要互联网;2 需要互联网
WorkServerNumber: 1, // 运行节点数
}
param := model.CreateTaskReq{
UserName: o.userName,
RepoName: repoName,
CreateTaskParam: taskParam,
}
resp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data model.CreateTask `json:"data"`
}{}
req := common.GetRestyRequest(common.TIMEOUT)
_, err = req.
SetHeader("Content-Type", "application/json").
SetQueryParam(common.ACCESSTOKEN, o.accessToken).
SetBody(&param).
SetResult(&resp).
Post(taskCreatelUrl)
if err != nil {
return nil, err
}
if resp.Code != http.StatusOK {
return nil, errors.New(resp.Msg)
}
if resp.Data.Data.Id == 0 {
return nil, fmt.Errorf("failed to submit task, msg: [%s]", resp.Data.Msg)
}
return resp.Data, nil
}
func (o *OpenI) CheckModelExistence(ctx context.Context, modelName string, modelType string) bool {
return false
}
func (o *OpenI) GetImageInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
return "", errors.New("failed to implement")
}
func (o *OpenI) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
return nil, errors.New("failed to implement")
}
func (o *OpenI) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) {
return nil, errors.New("failed to implement")
}
func (o *OpenI) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) {
return nil, errors.New("failed to implement")
}
func (o *OpenI) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) {
task, err := o.getTrainingTask(ctx, taskId)
if err != nil {
return "", err
}
codePaths := strings.SplitN(task.Data.Task.Description, FORWARD_SLASH, 3)
if len(codePaths) != 3 {
return "", errors.New("failed to get log, openI desc not set")
}
repoName := codePaths[0]
tasklogurl := o.host + TaskLogUrl
param := model.GetLogParam{
UserName: o.userName,
RepoName: repoName,
Id: taskId,
}
b, _ := json.Marshal(param)
byt := bytes.NewBuffer(b)
resp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data string `json:"data"`
}{}
req := common.GetRestyRequest(common.TIMEOUT)
r, _ := http.NewRequest("GET", tasklogurl, byt)
req.RawRequest = r
req.URL = tasklogurl
_, err = req.
SetHeader("Content-Type", "application/json").
SetQueryParam(common.ACCESSTOKEN, o.accessToken).
SetBody(byt).
SetResult(&resp).
Send()
if err != nil {
return "", err
}
if resp.Data == "" {
return "waiting for logs", nil
}
return resp.Data, nil
}
func (o *OpenI) getTrainingTask(ctx context.Context, taskId string) (*model.TaskDetail, error) {
taskDetailsUrl := o.host + TaskDetailsUrl
param := model.TaskDetailParam{
UserName: o.userName,
RepoName: TESTREPO,
Id: taskId,
}
b, _ := json.Marshal(param)
byt := bytes.NewBuffer(b)
resp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data model.TaskDetail `json:"data"`
}{}
req := common.GetRestyRequest(common.TIMEOUT)
r, _ := http.NewRequest("GET", taskDetailsUrl, byt)
req.RawRequest = r
req.URL = taskDetailsUrl
_, err := req.
SetHeader("Content-Type", "application/json").
SetQueryParam(common.ACCESSTOKEN, o.accessToken).
SetBody(byt).
SetResult(&resp).
Send()
if err != nil {
return nil, errors.New("failed to invoke taskDetails")
}
if resp.Data.Code != 0 && resp.Data.Msg != "" {
return nil, errors.New(resp.Data.Msg)
}
return &resp.Data, nil
}
func (o *OpenI) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
task, err := o.getTrainingTask(ctx, taskId)
if err != nil {
return nil, err
}
var resp collector.Task
resp.Id = strconv.Itoa(task.Data.Task.Id)
if task.Data.Task.StartTime != 0 {
resp.Start = time.Unix(int64(task.Data.Task.StartTime), 0).Format(constants.Layout)
}
if task.Data.Task.EndTime != 0 {
resp.End = time.Unix(int64(task.Data.Task.EndTime), 0).Format(constants.Layout)
}
switch task.Data.Task.Status {
case "SUCCEEDED":
resp.Status = constants.Completed
case "FAILED":
resp.Status = constants.Failed
case "CREATED_FAILED":
resp.Status = constants.Failed
case "RUNNING":
resp.Status = constants.Running
case "STOPPED":
resp.Status = constants.Stopped
case "PENDING":
resp.Status = constants.Pending
case "WAITING":
resp.Status = constants.Waiting
default:
resp.Status = "undefined"
}
return &resp, nil
}
func (o *OpenI) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
return "", errors.New("failed to implement")
}
func (o *OpenI) UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error {
return errors.New("failed to implement")
}
func (o *OpenI) GetComputeCards(ctx context.Context) ([]string, error) {
return nil, errors.New("failed to implement")
}
func (o *OpenI) GetUserBalance(ctx context.Context) (float64, error) {
return 0, errors.New("failed to implement")
}
func (o *OpenI) GetResourceSpecs(ctx context.Context, resrcType string) (*collector.ResourceSpec, error) {
var jobType string
if resrcType == "Inference" {
jobType = ONLINEINFERENCE
} else if resrcType == "Train" {
jobType = TRAIN
}
var resources []interface{}
res := &collector.ResourceSpec{
ClusterId: strconv.FormatInt(o.participantId, 10),
Tag: resrcType,
}
//clres := &collector.ClusterResource{}
creationRequirelUrl := o.host + CreationRequirelUrl
reposUrl := o.host + ReposUrl
taskListUrl := o.host + TaskListUrl
var wg sync.WaitGroup
var ch = make(chan *collector.ClusterResource)
var once sync.Once
wg.Add(2)
go o.genComputeResources(&wg, ch, &once, jobType, creationRequirelUrl)
go o.genRunningTaskNum(&wg, ch, reposUrl, taskListUrl)
go func() {
wg.Wait()
close(ch)
}()
for v := range ch {
resources = append(resources, v)
}
res.Resources = resources
return res, nil
}
func (o *OpenI) genComputeResources(wg *sync.WaitGroup, ch chan *collector.ClusterResource, once *sync.Once, jobType string, creationRequirelUrl string) {
defer wg.Done()
for c := range ComputeSource {
wg.Add(1)
i := c
go func() {
defer wg.Done()
param := model.TaskCreationRequiredParam{
UserName: o.userName,
RepoName: TESTREPO,
JobType: jobType,
ComputeSource: ComputeSource[i],
ClusterType: C2NET,
}
b, _ := json.Marshal(param)
byt := bytes.NewBuffer(b)
resp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data model.TaskCreationRequired `json:"data"`
}{}
req := common.GetRestyRequest(common.TIMEOUT)
r, _ := http.NewRequest("GET", creationRequirelUrl, byt)
req.RawRequest = r
req.URL = creationRequirelUrl
_, err := req.
SetHeader("Content-Type", "application/json").
SetQueryParam(common.ACCESSTOKEN, o.accessToken).
SetBody(byt).
SetResult(&resp).
Send()
if err != nil {
return
}
if len(resp.Data.Data.Specs.All) == 0 {
return
}
// balance
var balanceCheck = func() {
balance := resp.Data.Data.PointAccount.Balance
bal := &collector.Usage{}
bal.Type = strings.ToUpper(BALANCE)
bal.Total = &collector.UnitValue{
Unit: POINT,
Value: balance,
}
ch <- &collector.ClusterResource{Resource: bal}
//rate
var v float64
v = 1
rate := &collector.Usage{
Type: strings.ToUpper(RATE),
Total: &collector.UnitValue{Unit: PERHOUR, Value: v},
}
ch <- &collector.ClusterResource{Resource: rate}
}
once.Do(balanceCheck)
m := make(map[string]struct {
Id int `json:"id"`
AccCardsNum int `json:"acc_cards_num"`
AccCardType string `json:"acc_card_type"`
CpuCores int `json:"cpu_cores"`
MemGiB int `json:"mem_gi_b"`
GpuMemGiB int `json:"gpu_mem_gi_b"`
ShareMemGiB int `json:"share_mem_gi_b"`
ComputeResource string `json:"compute_resource"`
UnitPrice int `json:"unit_price"`
SourceSpecId string `json:"source_spec_id"`
HasInternet int `json:"has_internet"`
EnableVisualization bool `json:"enable_visualization"`
})
for _, s := range resp.Data.Data.Specs.All {
e, ok := m[s.AccCardType]
if ok {
if s.AccCardsNum > e.AccCardsNum {
m[s.AccCardType] = s
}
} else {
m[s.AccCardType] = s
}
}
for k, v := range m {
bres := make([]*collector.Usage, 0)
cres := &collector.ClusterResource{}
card := &collector.Usage{
Type: ComputeSource[i],
Name: strings.ToUpper(k),
Total: &collector.UnitValue{Unit: NUMBER, Value: v.AccCardsNum},
Available: &collector.UnitValue{Unit: NUMBER, Value: v.AccCardsNum},
}
cpu := &collector.Usage{
Type: strings.ToUpper(CPU),
Name: strings.ToUpper(CPU),
Total: &collector.UnitValue{Unit: CPUCORE, Value: v.CpuCores},
Available: &collector.UnitValue{Unit: CPUCORE, Value: v.CpuCores},
}
mem := &collector.Usage{
Type: strings.ToUpper(MEMORY),
Name: strings.ToUpper(RAM),
Total: &collector.UnitValue{Unit: GIGABYTE, Value: v.MemGiB},
Available: &collector.UnitValue{Unit: GIGABYTE, Value: v.MemGiB},
}
vmem := &collector.Usage{
Type: strings.ToUpper(MEMORY),
Name: strings.ToUpper(VRAM),
Total: &collector.UnitValue{Unit: GIGABYTE, Value: v.GpuMemGiB},
Available: &collector.UnitValue{Unit: GIGABYTE, Value: v.GpuMemGiB},
}
//storage
var s float64
s = 1024
storage := &collector.Usage{}
storage.Type = STORAGE
storage.Name = DISK
storage.Total = &collector.UnitValue{
Unit: GIGABYTE,
Value: s,
}
storage.Available = &collector.UnitValue{
Unit: GIGABYTE,
Value: s,
}
bres = append(bres, storage)
bres = append(bres, cpu)
bres = append(bres, mem)
bres = append(bres, vmem)
cres.Resource = card
cres.BaseResources = bres
ch <- cres
}
}()
}
}
func (o *OpenI) genRunningTaskNum(wg *sync.WaitGroup, ch chan *collector.ClusterResource, reposUrl string, taskListUrl string) {
defer wg.Done()
reporesp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data []model.Repo `json:"data"`
}{}
reporeq := common.GetRestyRequest(common.TIMEOUT)
repor, _ := http.NewRequest("GET", reposUrl, nil)
reporeq.RawRequest = repor
reporeq.URL = reposUrl
_, err := reporeq.
SetHeader("Content-Type", "application/json").
SetQueryParam(common.ACCESSTOKEN, o.accessToken).
SetResult(&reporesp).
Send()
if err != nil {
return
}
if len(reporesp.Data) == 0 {
return
}
// tasklist
var runningJobs atomic.Int64
var jwg sync.WaitGroup
var errs []error
var ech = make(chan error)
jwg.Add(1)
go func() {
defer jwg.Done()
for _, datum := range reporesp.Data {
jwg.Add(1)
dat := datum
go func() {
defer jwg.Done()
param := model.TaskListParam{
UserName: o.userName,
RepoName: dat.Name,
}
b, _ := json.Marshal(param)
byt := bytes.NewBuffer(b)
resp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data model.TaskList `json:"data"`
}{}
req := common.GetRestyRequest(common.TIMEOUT)
r, _ := http.NewRequest("GET", taskListUrl, byt)
req.RawRequest = r
req.URL = taskListUrl
_, err := req.
SetHeader("Content-Type", "application/json").
SetQueryParam(common.ACCESSTOKEN, o.accessToken).
SetBody(byt).
SetResult(&resp).
Send()
if err != nil {
// assume occupied running tasks
ech <- err
return
}
if len(resp.Data.Data.Tasks) == 0 {
return
}
for _, task := range resp.Data.Data.Tasks {
if task.Task.Status == RUNNING {
runningJobs.Add(1)
}
}
}()
}
}()
go func() {
jwg.Wait()
close(ech)
}()
for v := range ech {
errs = append(errs, v)
}
// running tasks num
var runningNum int64
runningNum = runningJobs.Load()
run := &collector.Usage{}
run.Type = strings.ToUpper(RUNNINGTASK)
if len(errs) == 0 {
run.Total = &collector.UnitValue{
Unit: NUMBER,
Value: runningNum,
}
ch <- &collector.ClusterResource{Resource: run}
} else {
runningNum = int64(len(errs)) * 4
run.Total = &collector.UnitValue{
Unit: NUMBER,
Value: runningNum,
}
ch <- &collector.ClusterResource{Resource: run}
}
}
func (o *OpenI) getOnlineInferUrl(ctx context.Context, taskId string, repoName string) (string, error) {
taskDetailsUrl := o.host + TaskOnlineInferUrl
param := model.TaskDetailParam{
UserName: o.userName,
RepoName: repoName,
Id: taskId,
}
b, _ := json.Marshal(param)
byt := bytes.NewBuffer(b)
resp := model.SelfEndpointUrlResp{}
req := common.GetRestyRequest(common.TIMEOUT)
r, _ := http.NewRequest("GET", taskDetailsUrl, byt)
req.RawRequest = r
req.URL = taskDetailsUrl
_, err := req.
SetHeader("Content-Type", "application/json").
SetQueryParam(common.ACCESSTOKEN, o.accessToken).
SetBody(byt).
SetResult(&resp).
Send()
if err != nil {
return "", err
}
if resp.Code != http.StatusOK {
return "", errors.New(resp.Msg)
}
return resp.Data.Url, nil
}