update octopushttp submit #516

Merged
tzwang merged 1 commits from tzwang/pcm-coordinator:master into master 2025-06-26 17:31:56 +08:00
2 changed files with 112 additions and 36 deletions

View File

@ -25,3 +25,9 @@ type OctResourceSpecs struct {
type OctTrainJob struct { type OctTrainJob struct {
TrainJob *omodel.TrainJob `json:"trainJob"` TrainJob *omodel.TrainJob `json:"trainJob"`
} }
type OctCreateAlgorithm struct {
AlgorithmId string `json:"algorithmId"`
Version string `json:"version"`
CreatedAt int `json:"createdAt"`
}

View File

@ -25,33 +25,34 @@ import (
) )
const ( const (
RESOURCE_POOL = "grampus-pool" RESOURCE_POOL = "grampus-pool"
Param_Token = "token" Param_Token = "token"
Param_Addr = "addr" Param_Addr = "addr"
Forward_Slash = "/" Forward_Slash = "/"
COMMA = "," COMMA = ","
UNDERSCORE = "_" UNDERSCORE = "_"
TASK_NAME_PREFIX = "trainJob" TASK_NAME_PREFIX = "trainJob"
Python = "python " Python = "python "
SemiColon = ";" SemiColon = ";"
BALANCE = "balance" BALANCE = "balance"
RATE = "rate" RATE = "rate"
PERHOUR = "per-hour" PERHOUR = "per-hour"
NUMBER = "number" NUMBER = "number"
KILOBYTE = "kb" KILOBYTE = "kb"
GIGABYTE = "gb" GIGABYTE = "gb"
CPUCORE = "core" CPUCORE = "core"
STORAGE = "STORAGE" STORAGE = "STORAGE"
DISK = "disk" DISK = "disk"
MEMORY = "memory" MEMORY = "memory"
RAM = "ram" RAM = "ram"
VRAM = "vram" VRAM = "vram"
RMB = "rmb" RMB = "rmb"
POINT = "point" POINT = "point"
RUNNINGTASK = "RUNNING_TASK" RUNNINGTASK = "RUNNING_TASK"
RUNNING = "RUNNING" RUNNING = "RUNNING"
CPU = "cpu" CPU = "cpu"
Gi = "Gi" Gi = "Gi"
AlgorithmRecordOnlyVersion = "V1"
) )
const ( const (
@ -60,6 +61,7 @@ const (
const ( const (
MyAlgorithmListUrl = "api/v1/algorithm/myAlgorithmList" MyAlgorithmListUrl = "api/v1/algorithm/myAlgorithmList"
CreateAlgorithm = "api/v1/algorithm/create"
ResourcespecsUrl = "api/v1/resource/specs" ResourcespecsUrl = "api/v1/resource/specs"
CreateTrainJobUrl = "api/v1/job/create" CreateTrainJobUrl = "api/v1/job/create"
TrainJobDetail = "api/v1/job/detail" TrainJobDetail = "api/v1/job/detail"
@ -120,7 +122,53 @@ func (o *OctopusHttp) Execute(ctx context.Context, option *option.AiOption, mode
option.Cmd = Python + option.AlgorithmId option.Cmd = Python + option.AlgorithmId
} }
// algorithm
param := &omodel.CreateMyAlgorithmParam{
AlgorithmName: option.AlgorithmId,
ModelName: option.AlgorithmId,
}
algorithm, err := o.createAlgorithm(ctx, param)
if err != nil {
return nil, err
}
if algorithm.Code != http.StatusOK {
if algorithm.Data != nil {
marshal, err := json.Marshal(algorithm.Data)
if err != nil {
return nil, err
}
errormdl := &omodel.Error{}
err = json.Unmarshal(marshal, errormdl)
if err != nil {
return nil, err
}
return nil, errors.New(errormdl.Message)
}
} else {
if algorithm.Data != nil {
result := &entity.OctCreateAlgorithm{}
marshal, err := json.Marshal(algorithm.Data)
if err != nil {
return nil, err
}
err = json.Unmarshal(marshal, result)
if err != nil {
return nil, err
}
if result.AlgorithmId == "" {
return nil, errors.New("createAlgorithm failed")
}
option.AlgorithmId = result.AlgorithmId
} else {
return nil, errors.New("createAlgorithm failed")
}
}
// resource
option.ResourceId = "964fdee2db544928bfea74dac12a924f" option.ResourceId = "964fdee2db544928bfea74dac12a924f"
// submit
task, err := o.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType) task, err := o.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
if err != nil { if err != nil {
return nil, err return nil, err
@ -169,12 +217,12 @@ func (o *OctopusHttp) SubmitTask(ctx context.Context, imageId string, cmd string
param := &omodel.CreateTrainJobParam{ param := &omodel.CreateTrainJobParam{
//DataSetId: datasetsId, //DataSetId: datasetsId,
//DataSetVersion: VERSION, //DataSetVersion: VERSION,
//AlgorithmId: algorithmId, AlgorithmId: algorithmId,
//AlgorithmVersion: VERSION, AlgorithmVersion: AlgorithmRecordOnlyVersion,
Name: TASK_NAME_PREFIX + UNDERSCORE + utils.RandomString(10), Name: TASK_NAME_PREFIX + UNDERSCORE + utils.RandomString(10),
ImageId: imageId, ImageId: imageId,
IsDistributed: false, IsDistributed: false,
ResourcePool: RESOURCE_POOL, ResourcePool: RESOURCE_POOL,
Config: []*omodel.CreateTrainJobConf{ Config: []*omodel.CreateTrainJobConf{
{ {
Command: cmd, Command: cmd,
@ -182,8 +230,8 @@ func (o *OctopusHttp) SubmitTask(ctx context.Context, imageId string, cmd string
MinFailedTaskCount: 1, MinFailedTaskCount: 1,
MinSucceededTaskCount: 1, MinSucceededTaskCount: 1,
TaskNumber: 1, TaskNumber: 1,
//Parameters: prms, Parameters: prms,
Envs: envMap, Envs: envMap,
}, },
}, },
} }
@ -206,6 +254,28 @@ func (o *OctopusHttp) SubmitTask(ctx context.Context, imageId string, cmd string
} }
func (o *OctopusHttp) createAlgorithm(ctx context.Context, param *omodel.CreateMyAlgorithmParam) (*entity.OctResp, error) {
createAlgorithmUrl := o.server + CreateAlgorithm
token, err := o.token.Get()
if err != nil {
return nil, err
}
resp := &entity.OctResp{}
req := common.GetRestyRequest(common.TIMEOUT)
_, err = req.
SetHeader("Authorization", "Bearer "+token).
SetBody(param).
SetResult(resp).
Post(createAlgorithmUrl)
if err != nil {
return nil, err
}
return resp, nil
}
// collector // collector
func (o *OctopusHttp) resourceSpecs(ctx context.Context) (*entity.OctResp, error) { func (o *OctopusHttp) resourceSpecs(ctx context.Context) (*entity.OctResp, error) {
resourcespecsUrl := o.server + ResourcespecsUrl resourcespecsUrl := o.server + ResourcespecsUrl
@ -447,7 +517,7 @@ func (o *OctopusHttp) UploadAlgorithmCode(ctx context.Context, resourceType stri
return nil return nil
} }
func (o OctopusHttp) GetComputeCards(ctx context.Context) ([]string, error) { func (o *OctopusHttp) GetComputeCards(ctx context.Context) ([]string, error) {
return nil, errors.New(NotImplementError) return nil, errors.New(NotImplementError)
} }