// pcm-coordinator/internal/storeLink/openi.go
//
// 372 lines
// 9.1 KiB
// Go

package storeLink
import (
"bytes"
"context"
"encoding/json"
"errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
"gitlink.org.cn/JointCloud/pcm-openi/common"
"gitlink.org.cn/JointCloud/pcm-openi/model"
"mime/multipart"
"net/http"
"strconv"
"strings"
"sync"
"sync/atomic"
)
// String constants used when building OpenI task requests.
const (
	// DEBUG is the literal "DEBUG"; presumably a job type alongside TRAIN — confirm against callers.
	DEBUG = "DEBUG"
	// TRAIN is sent as the JobType of task-creation-required requests.
	TRAIN = "TRAIN"
	// INFERENCE is the literal "INFERENCE"; presumably a job type alongside TRAIN — confirm against callers.
	INFERENCE = "INFERENCE"
	// C2NET is sent as the ClusterType of task-creation-required requests.
	C2NET = "C2Net"
	// TESTREPO is the repository name sent with task-creation-required requests.
	TESTREPO = "testrepo"
)
// ComputeSource lists the compute-source identifiers that GetResourceSpecs
// queries one by one; each value is sent as the ComputeSource request parameter
// and reused as the Type of the resulting usage entries.
var (
	ComputeSource = []string{"GPU", "NPU", "GCU", "MLU", "DCU", "CPU", "ILUVATAR-GPGPU", "METAX-GPGPU"}
)
// OpenI holds the settings needed to talk to one OpenI participant.
type OpenI struct {
	// participantId is reported (as a decimal string) in ResourceSpec.ClusterId.
	participantId int64
	// host is the prefix prepended to every API path, e.g. host + "/api/v1/user/repos".
	host string
	// userName is sent as UserName in request parameters.
	userName string
	// accessToken is sent as the common.ACCESSTOKEN query parameter.
	accessToken string
}
// NewOpenI returns a pointer to an OpenI populated with the given API host,
// participant id, user name and access token.
func NewOpenI(host string, id int64, name string, token string) *OpenI {
	client := new(OpenI)
	client.participantId = id
	client.host = host
	client.userName = name
	client.accessToken = token
	return client
}
// Execute is not implemented for OpenI: both arguments are ignored and the call
// always yields a nil result with a "failed to implement" error.
func (o OpenI) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {
	var result interface{}
	return result, errors.New("failed to implement")
}
// GetClusterInferUrl is not implemented for OpenI: it ignores its arguments and
// always returns a nil URL with a "failed to implement" error.
func (o OpenI) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) {
	var inferURL *inference.ClusterInferUrl
	return inferURL, errors.New("failed to implement")
}
// GetInferDeployInstanceList is not implemented for OpenI: it always returns a
// nil slice with a "failed to implement" error.
func (o OpenI) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) {
	var instances []*inference.DeployInstance
	return instances, errors.New("failed to implement")
}
// StartInferDeployInstance is a stub for OpenI: it ignores ctx and id and
// always reports false.
func (o OpenI) StartInferDeployInstance(ctx context.Context, id string) bool {
	const started = false
	return started
}
// StopInferDeployInstance is a stub for OpenI: it ignores ctx and id and always
// reports false.
func (o OpenI) StopInferDeployInstance(ctx context.Context, id string) bool {
	const stopped = false
	return stopped
}
// GetInferDeployInstance is not implemented for OpenI: it ignores ctx and id and
// always returns a nil instance with a "failed to implement" error.
func (o OpenI) GetInferDeployInstance(ctx context.Context, id string) (*inference.DeployInstance, error) {
	var instance *inference.DeployInstance
	return instance, errors.New("failed to implement")
}
// CreateInferDeployInstance is not implemented for OpenI: it ignores its
// arguments and always returns an empty id with a "failed to implement" error.
func (o OpenI) CreateInferDeployInstance(ctx context.Context, option *option.InferOption) (string, error) {
	var instanceID string
	return instanceID, errors.New("failed to implement")
}
// CheckModelExistence is a stub for OpenI: it performs no lookup and always
// reports false, whatever modelName and modelType are.
func (o OpenI) CheckModelExistence(ctx context.Context, modelName string, modelType string) bool {
	const exists = false
	return exists
}
// GetImageInferResult is not implemented for OpenI: the url, file and fileName
// arguments are ignored and the call always returns an empty string with a
// "failed to implement" error.
func (o OpenI) GetImageInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
	var result string
	return result, errors.New("failed to implement")
}
// GetResourceStats is not implemented for OpenI: it always returns nil stats
// with a "failed to implement" error.
func (o OpenI) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
	var stats *collector.ResourceStats
	return stats, errors.New("failed to implement")
}
// GetDatasetsSpecs is not implemented for OpenI: it always returns a nil slice
// with a "failed to implement" error.
func (o OpenI) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) {
	var specs []*collector.DatasetsSpecs
	return specs, errors.New("failed to implement")
}
// GetAlgorithms is not implemented for OpenI: it always returns a nil slice
// with a "failed to implement" error.
func (o OpenI) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) {
	var algorithms []*collector.Algorithm
	return algorithms, errors.New("failed to implement")
}
// GetTrainingTaskLog is not implemented for OpenI: taskId and instanceNum are
// ignored and the call always returns an empty log with a "failed to implement"
// error.
func (o OpenI) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) {
	var log string
	return log, errors.New("failed to implement")
}
// GetTrainingTask is not implemented for OpenI: taskId is ignored and the call
// always returns a nil task with a "failed to implement" error.
func (o OpenI) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
	var task *collector.Task
	return task, errors.New("failed to implement")
}
// DownloadAlgorithmCode is not implemented for OpenI: every argument is ignored
// and the call always returns an empty string with a "failed to implement"
// error.
func (o OpenI) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
	var code string
	return code, errors.New("failed to implement")
}
// UploadAlgorithmCode is not implemented for OpenI: nothing is uploaded and the
// call always returns a "failed to implement" error.
func (o OpenI) UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error {
	err := errors.New("failed to implement")
	return err
}
// GetComputeCards is not implemented for OpenI: it always returns a nil slice
// with a "failed to implement" error.
func (o OpenI) GetComputeCards(ctx context.Context) ([]string, error) {
	var cards []string
	return cards, errors.New("failed to implement")
}
// GetUserBalance is not implemented for OpenI: it always returns 0 with a
// "failed to implement" error.
func (o OpenI) GetUserBalance(ctx context.Context) (float64, error) {
	var balance float64
	return balance, errors.New("failed to implement")
}
// GetResourceSpecs gathers a best-effort snapshot of the resources available to
// this OpenI participant and returns it as a collector.ResourceSpec whose
// ClusterId is the decimal participant id.
//
// The following collector.Usage entries are collected concurrently and appended
// to Resources in no particular order:
//   - for every entry of ComputeSource, one usage per accelerator card type
//     found in the task-creation-required response, carrying the largest
//     AccCardsNum seen for that card type as both Total and Available;
//   - a single BALANCE usage, taken from the first such response that contains
//     any specs;
//   - a RUNNINGTASK usage with the number of tasks in RUNNING status across the
//     user's repositories (see the fallback below when task lists cannot be
//     fetched);
//   - a fixed RATE usage.
//
// Individual request failures are not surfaced: the affected entries are simply
// absent and the returned error is always nil. ctx is attached to every
// outgoing request so cancelling it aborts in-flight calls; a nil ctx is
// treated as context.Background().
func (o OpenI) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, error) {
	if ctx == nil {
		// ctx used to be ignored here, so tolerate callers that still pass nil.
		ctx = context.Background()
	}

	res := &collector.ResourceSpec{
		ClusterId: strconv.FormatInt(o.participantId, 10),
	}

	creationRequiredURL := o.host + "/api/v1/task/creationRequired"
	reposURL := o.host + "/api/v1/user/repos"
	taskListURL := o.host + "/api/v1/task/list"

	var (
		wg   sync.WaitGroup
		once sync.Once // ensures only one BALANCE entry is emitted
		ch   = make(chan *collector.Usage)
	)

	// Card specs (and the point balance) for every compute source.
	for i := range ComputeSource {
		source := ComputeSource[i]
		wg.Add(1)
		go func() {
			defer wg.Done()

			param := model.TaskCreationRequiredParam{
				UserName:      o.userName,
				RepoName:      TESTREPO,
				JobType:       TRAIN,
				ComputeSource: source,
				ClusterType:   C2NET,
			}
			b, err := json.Marshal(param)
			if err != nil {
				return
			}
			byt := bytes.NewBuffer(b)

			resp := struct {
				Code int                        `json:"code"`
				Msg  string                     `json:"msg"`
				Data model.TaskCreationRequired `json:"data"`
			}{}

			req := common.GetRestyRequest(common.TIMEOUT)
			r, err := http.NewRequestWithContext(ctx, http.MethodGet, creationRequiredURL, byt)
			if err != nil {
				return
			}
			// The request method is deliberately left unset on req, exactly as
			// before; only the raw request and URL are provided.
			req.RawRequest = r
			req.URL = creationRequiredURL
			_, err = req.
				SetContext(ctx).
				SetHeader("Content-Type", "application/json").
				SetQueryParam(common.ACCESSTOKEN, o.accessToken).
				SetBody(byt).
				SetResult(&resp).
				Send()
			if err != nil {
				return
			}
			if len(resp.Data.Data.Specs.All) == 0 {
				return
			}

			// The balance is account-wide, so report it only once no matter how
			// many compute sources answer.
			once.Do(func() {
				ch <- &collector.Usage{
					Type: strings.ToUpper(BALANCE),
					Total: &collector.UnitValue{
						Unit:  POINT,
						Value: resp.Data.Data.PointAccount.Balance,
					},
				}
			})

			// Keep, per card type, the largest card count offered by any spec.
			maxCards := make(map[string]int)
			for _, s := range resp.Data.Data.Specs.All {
				if n, ok := maxCards[s.AccCardType]; !ok || s.AccCardsNum > n {
					maxCards[s.AccCardType] = s.AccCardsNum
				}
			}
			for cardType, cards := range maxCards {
				ch <- &collector.Usage{
					Type:      source,
					Name:      strings.ToUpper(cardType),
					Total:     &collector.UnitValue{Unit: NUMBER, Value: cards},
					Available: &collector.UnitValue{Unit: NUMBER, Value: cards},
				}
			}
		}()
	}

	// Running tasks across all repositories of the user.
	wg.Add(1)
	go func() {
		defer wg.Done()

		reporesp := struct {
			Code int          `json:"code"`
			Msg  string       `json:"msg"`
			Data []model.Repo `json:"data"`
		}{}

		reporeq := common.GetRestyRequest(common.TIMEOUT)
		repor, err := http.NewRequestWithContext(ctx, http.MethodGet, reposURL, nil)
		if err != nil {
			return
		}
		reporeq.RawRequest = repor
		reporeq.URL = reposURL
		_, err = reporeq.
			SetContext(ctx).
			SetHeader("Content-Type", "application/json").
			SetQueryParam(common.ACCESSTOKEN, o.accessToken).
			SetResult(&reporesp).
			Send()
		if err != nil {
			return
		}
		if len(reporesp.Data) == 0 {
			return
		}

		var (
			runningJobs atomic.Int64
			jwg         sync.WaitGroup
			ech         = make(chan error)
		)
		for _, datum := range reporesp.Data {
			repoName := datum.Name
			jwg.Add(1)
			go func() {
				defer jwg.Done()

				param := model.TaskListParam{
					UserName: o.userName,
					RepoName: repoName,
				}
				b, err := json.Marshal(param)
				if err != nil {
					ech <- err
					return
				}
				byt := bytes.NewBuffer(b)

				resp := struct {
					Code int            `json:"code"`
					Msg  string         `json:"msg"`
					Data model.TaskList `json:"data"`
				}{}

				req := common.GetRestyRequest(common.TIMEOUT)
				r, err := http.NewRequestWithContext(ctx, http.MethodGet, taskListURL, byt)
				if err != nil {
					ech <- err
					return
				}
				req.RawRequest = r
				req.URL = taskListURL
				_, err = req.
					SetContext(ctx).
					SetHeader("Content-Type", "application/json").
					SetQueryParam(common.ACCESSTOKEN, o.accessToken).
					SetBody(byt).
					SetResult(&resp).
					Send()
				if err != nil {
					// The repository's tasks are unknown; the failure is counted
					// below as assumed-occupied capacity.
					ech <- err
					return
				}
				for _, task := range resp.Data.Data.Tasks {
					if task.Task.Status == RUNNING {
						runningJobs.Add(1)
					}
				}
			}()
		}
		go func() {
			jwg.Wait()
			close(ech)
		}()

		var errs []error
		for e := range ech {
			errs = append(errs, e)
		}

		// If any task list could not be fetched, the real count is unknown, so
		// report a pessimistic figure instead: a fixed number of running tasks
		// per failed repository (heuristic; rationale for the value is not
		// visible in this file).
		const assumedRunningPerFailure = 4
		running := runningJobs.Load()
		if len(errs) != 0 {
			running = int64(len(errs)) * assumedRunningPerFailure
		}
		ch <- &collector.Usage{
			Type:  strings.ToUpper(RUNNINGTASK),
			Total: &collector.UnitValue{Unit: NUMBER, Value: running},
		}
	}()

	// Fixed rate entry.
	wg.Add(1)
	go func() {
		defer wg.Done()
		ch <- &collector.Usage{
			Type:  strings.ToUpper(RATE),
			Total: &collector.UnitValue{Unit: PERHOUR, Value: 1},
		}
	}()

	// Close ch once every producer has finished so the drain loop terminates.
	go func() {
		wg.Wait()
		close(ch)
	}()

	var resources []interface{}
	for u := range ch {
		resources = append(resources, u)
	}
	res.Resources = resources

	return res, nil
}