pcm-modelarts/internal/pkg/cron/taskcron.go

301 lines
9.9 KiB
Go

package cron
import (
"bytes"
"fmt"
"github.com/go-resty/resty/v2"
coreClient "gitlink.org.cn/JointCloud/pcm-coordinator/client"
"gitlink.org.cn/JointCloud/pcm-modelarts/internal/svc"
"gitlink.org.cn/JointCloud/pcm-modelarts/internal/util"
"k8s.io/apimachinery/pkg/util/json"
"log"
"net/http"
"strconv"
)
// AiJobInfo is a flat record describing a single AI job; every field is
// serialized under the JSON key given in its tag and dropped when it holds the
// zero value (omitempty).
// It is not referenced by any function in the visible part of this file.
// NOTE(review): the ",optional" tag option on the adapter/cluster fields is not
// an encoding/json option — presumably it is consumed by another framework;
// confirm before removing it.
type AiJobInfo struct {
TaskId int64 `json:"taskId,omitempty"`
ProjectId string `json:"project_id,omitempty"`
AdapterId int64 `json:"adapterId,omitempty,optional"`
AdapterName string `json:"adapterName,omitempty,optional"`
ClusterId int64 `json:"clusterId,omitempty,optional"`
ClusterName string `json:"clusterName,omitempty,optional"`
Name string `json:"name,omitempty"`
Status string `json:"status,omitempty"`
StartTime string `json:"startTime,omitempty"`
RunningTime int64 `json:"runningTime,omitempty"`
Result string `json:"result,omitempty"`
JobId string `json:"jobId,omitempty"`
CreateTime string `json:"createTime,omitempty"`
ImageUrl string `json:"imageUrl,omitempty"`
Command string `json:"command,omitempty"`
FlavorId string `json:"flavorId,omitempty"`
SubscriptionId string `json:"subscriptionId,omitempty"`
ItemVersionId string `json:"itemVersionId,omitempty"`
}
// CreateTrainingJobReq is the request body that PullTaskInfo fills in and
// CreateTrainingJob marshals to JSON and POSTs to the ".../training-jobs"
// endpoint.
// Platform is neither set by PullTaskInfo nor read by CreateTrainingJob in this
// file; if a caller sets it, it is sent as the "platform" key of the body.
// NOTE(review): under encoding/json semantics omitempty never omits a
// struct-typed value, so "metadata", "algorithm" and "spec" are presumably
// always emitted — confirm this is intended.
type CreateTrainingJobReq struct {
Kind string `json:"kind,omitempty"`
MetadataS MetadataS `json:"metadata,omitempty"`
Algorithms Algorithms `json:"algorithm,omitempty"`
SpecsC SpecsC `json:"spec,omitempty"`
Platform string `json:"platform,omitempty"`
}
// MetadataS is the "metadata" section shared by the training-job request and
// response types in this file.
// PullTaskInfo sets Name and WorkspaceId when creating a job and reads Id from
// the create response to use as the task's job id.
type MetadataS struct {
Id string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
WorkspaceId string `json:"workspace_id,omitempty"`
}
// Algorithms is the "algorithm" section shared by the training-job request and
// response types in this file.
// When creating a job, PullTaskInfo populates only Id, Engine.ImageUrl,
// Command, Environments and Parameters; the remaining fields keep their zero
// values.
// NOTE(review): Engine and Policies are struct-typed, so under encoding/json
// semantics omitempty presumably does not suppress them ("engine" and
// "policies" are emitted even when empty) — confirm.
type Algorithms struct {
Id string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
CodeDir string `json:"code_dir,omitempty"`
BootFile string `json:"boot_file,omitempty"`
Engine EngineCreateTraining `json:"engine,omitempty"`
Parameters []string `json:"parameters,omitempty"`
Policies PoliciesCreateTraining `json:"policies,omitempty"`
Command string `json:"command,omitempty"`
SubscriptionId string `json:"subscription_id,omitempty"`
ItemVersionId string `json:"item_version_id,omitempty"`
InputTra []InputTraining `json:"inputs,omitempty"`
OutputTra []OutputTraining `json:"outputs,omitempty"`
Environments []string `json:"environments,omitempty"`
LocalCodeDir string `json:"local_code_dir,omitempty"`
WorkingDir string `json:"working_dir,omitempty"`
}
// SpecsC is the "spec" section shared by the training-job request and response
// types in this file.
// PullTaskInfo fills only Resource (flavor id, node count and policy);
// LogExportPath and Volumes are left at their zero values.
type SpecsC struct {
Resource ResourceCreateTraining `json:"resource,omitempty"`
LogExportPath LogExportPath `json:"log_export_path,omitempty"`
Volumes []Volumes `json:"volumes,omitempty"`
}
// EngineCreateTraining is the "engine" object nested in Algorithms.
// PullTaskInfo sets ImageUrl from the pulled task's ImageId.
type EngineCreateTraining struct {
ImageUrl string `json:"image_url,omitempty"`
}
// CreateTrainingJobResp is the structure CreateTrainingJob decodes the
// create-job response body into.
// Besides the echoed kind/metadata/algorithm/spec sections and the job status,
// it carries the "error_msg", "error_code" and "error_solution" keys.
// PullTaskInfo reads only MetadataS.Id from it.
type CreateTrainingJobResp struct {
Kind string `json:"kind,omitempty"`
MetadataS MetadataS `json:"metadata,omitempty"`
Algorithms Algorithms `json:"algorithm,omitempty"`
SpecsC SpecsC `json:"spec,omitempty"`
Status Status `json:"status,omitempty"`
ErrorMsg string `json:"error_msg,omitempty"`
ErrorCode string `json:"error_code,omitempty"`
ErrorSolution string `json:"error_solution,omitempty"`
}
// Status is the "status" section of a training-job response.
// PullTaskInfo copies Phase into the task's Status field when polling a
// pending or running job; the other fields are not read in this file.
type Status struct {
Phase string `json:"phase,omitempty"`
SecondaryPhase string `json:"secondary_phase,omitempty"`
Duration string `json:"duration,omitempty"`
Tasks []string `json:"tasks,omitempty"`
StartTime uint64 `json:"start_time,omitempty"`
Task_statuses []Task_statuses `json:"task_statuses,omitempty"`
}
// Task_statuses is the element type of Status.Task_statuses, decoded from the
// "task_statuses" array of a job status.
// NOTE(review): the name does not follow Go's MixedCaps convention; it is
// exported, so renaming it would break any external users.
type Task_statuses struct {
Task string `json:"task,omitempty"`
ExitCode string `json:"exit_code,omitempty"`
Message string `json:"message,omitempty"`
}
// ParametersTrainJob is an empty placeholder type with no fields.
// It is not referenced anywhere in the visible part of this file
// (Algorithms.Parameters is a []string).
type ParametersTrainJob struct {
}
// PoliciesCreateTraining is the type of Algorithms.Policies ("policies" key).
// It declares no fields, so it marshals to an empty JSON object.
type PoliciesCreateTraining struct {
}
// InputTraining is one element of Algorithms.InputTra (the "inputs" array).
// Its remote location is carried under the "remote" key as a RemoteTra.
// PullTaskInfo does not populate any inputs.
type InputTraining struct {
Name string `json:"name,omitempty"`
AccessMethod string `json:"access_method,omitempty"`
RemoteTra RemoteTra `json:"remote,omitempty"`
}
// OutputTraining is one element of Algorithms.OutputTra (the "outputs" array).
// Its remote location is carried under the "remote" key as a RemoteOut.
// PullTaskInfo does not populate any outputs.
type OutputTraining struct {
Name string `json:"name,omitempty"`
AccessMethod string `json:"access_method,omitempty"`
PrefetchToLocal bool `json:"prefetch_to_local,omitempty"`
RemoteOut RemoteOut `json:"remote,omitempty"`
}
// ResourceCreateTraining is the "resource" object nested in SpecsC.
// PullTaskInfo sets FlavorId from the pulled task, NodeCount to 1 and Policy to
// "regular"; FlavorLabel is left empty.
type ResourceCreateTraining struct {
FlavorId string `json:"flavor_id,omitempty"`
NodeCount int32 `json:"node_count,omitempty"`
Policy string `json:"policy,omitempty"`
FlavorLabel string `json:"flavor_label,omitempty"`
}
// LogExportPath is the type of SpecsC.LogExportPath ("log_export_path" key).
// It declares no fields, so it marshals to an empty JSON object.
type LogExportPath struct {
}
// Volumes is the element type of SpecsC.Volumes ("volumes" array).
// It declares no fields.
type Volumes struct {
}
// RemoteOut is the "remote" object of an OutputTraining; it wraps a single
// ObsTra under the "obs" key.
type RemoteOut struct {
ObsTra ObsTra `json:"obs,omitempty"`
}
// ObsTra is the "obs" object used by RemoteOut and RemoteTra; it holds one URL
// string under the "obs_url" key.
type ObsTra struct {
ObsUrl string `json:"obs_url,omitempty"`
}
// RemoteTra is the "remote" object of an InputTraining; it wraps a single
// ObsTra under the "obs" key.
type RemoteTra struct {
Obs ObsTra `json:"obs,omitempty"`
}
// DatasetTra identifies a dataset by id, name and version (name and id).
// It is not referenced by any other declaration in the visible part of this
// file.
type DatasetTra struct {
Id string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
VersionName string `json:"version_name,omitempty"`
VersionId string `json:"version_id,omitempty"`
}
// GetTrainingJobIdByIdReq is the argument of GetTrainingJob.
// GetTrainingJob reads only TrainingJobId (appended to the request URL);
// Platform and ProjectId are not read there — the project id in the URL comes
// from the platform configuration instead, even though PullTaskInfo sets
// ProjectId on the request.
type GetTrainingJobIdByIdReq struct {
TrainingJobId string `json:"training_job_id,omitempty"`
Platform string `json:"platform,omitempty"`
ProjectId string `json:"project_id,omitempty"`
}
// GetTrainingJobIdByIdResp is the structure GetTrainingJob decodes the
// job-detail response body into.
// PullTaskInfo reads only Status.Phase from it.
type GetTrainingJobIdByIdResp struct {
Kind string `json:"kind,omitempty"`
MetadataS MetadataS `json:"metadata,omitempty"`
Algorithms Algorithms `json:"algorithm,omitempty"`
SpecsC SpecsC `json:"spec,omitempty"`
Status Status `json:"status,omitempty"`
}
// PullTaskInfo runs one synchronisation pass between the coordinator and the
// training platform.
//
// It GETs <CoreUrl>/pcm/v1/core/pullTaskInfo for svc.Config.AdapterId and walks
// the returned AiInfoList:
//
//   - tasks in status "Saved" are submitted with CreateTrainingJob; on success
//     the task receives the returned job id and the status "Running";
//   - tasks in status "Pending" or "Running" are looked up with GetTrainingJob
//     and take over the reported phase as their status;
//   - tasks in any other status are left untouched.
//
// After each task that was updated, the whole list is POSTed back to
// <CoreUrl>/pcm/v1/core/pushTaskInfo so the coordinator sees the change right
// away. A task whose create/lookup call fails is logged and skipped, so one bad
// task does not block the others; a failed push aborts the pass.
//
// The function returns nothing; all failures are reported through the log
// package.
func PullTaskInfo(svc *svc.ServiceContext) {
	adapterID := svc.Config.AdapterId
	coreURL := svc.Config.CoreUrl
	fmt.Println("AdapterId:", adapterID)
	fmt.Println("CoreUrl:", coreURL)

	result, err := resty.New().R().
		SetHeader("Content-Type", "application/json").
		SetQueryParam("adapterId", strconv.FormatInt(adapterID, 10)).
		Get(coreURL + "/pcm/v1/core/pullTaskInfo")
	if err != nil {
		log.Printf("pulling task info from %s: %v", coreURL, err)
		return
	}
	if result.IsError() {
		log.Printf("pulling task info from %s: unexpected status %s", coreURL, result.Status())
		return
	}

	var resp coreClient.PullTaskInfoResp
	if err := json.Unmarshal(result.Body(), &resp); err != nil {
		log.Printf("decoding pullTaskInfo response: %v", err)
		return
	}
	if len(resp.AiInfoList) == 0 {
		return
	}

	// push sends the current state of the complete task list back to the
	// coordinator. The list elements are pointers, so updates made in the loop
	// below are visible here.
	// NOTE(review): http.DefaultClient has no timeout; consider a client with an
	// explicit Timeout so a stalled coordinator cannot block the cron job.
	push := func() error {
		payload, err := json.Marshal(coreClient.PushTaskInfoReq{
			AdapterId:  adapterID,
			AiInfoList: resp.AiInfoList,
		})
		if err != nil {
			return fmt.Errorf("encoding pushTaskInfo request: %w", err)
		}
		req, err := http.NewRequest(http.MethodPost, coreURL+"/pcm/v1/core/pushTaskInfo", bytes.NewBuffer(payload))
		if err != nil {
			return fmt.Errorf("building pushTaskInfo request: %w", err)
		}
		req.Header.Add("Content-Type", "application/json")
		pushResp, err := http.DefaultClient.Do(req)
		if err != nil {
			return fmt.Errorf("sending pushTaskInfo request: %w", err)
		}
		// The body must be closed or the underlying connection leaks.
		defer pushResp.Body.Close()
		log.Printf("pushTaskInfo: coordinator replied %s", pushResp.Status)
		return nil
	}

	for _, aiInfo := range resp.AiInfoList {
		if aiInfo == nil {
			continue
		}

		switch aiInfo.Status {
		case "Saved":
			// Build a fresh request per task so nothing carries over between tasks.
			var createReq CreateTrainingJobReq
			createReq.Kind = "job"
			createReq.MetadataS.Name = aiInfo.Name
			createReq.MetadataS.WorkspaceId = "0"
			createReq.Algorithms.Id = aiInfo.AlgorithmId
			createReq.Algorithms.Engine.ImageUrl = aiInfo.ImageId
			createReq.Algorithms.Command = aiInfo.Command
			createReq.Algorithms.Environments = aiInfo.Environments
			createReq.Algorithms.Parameters = aiInfo.Parameters
			createReq.SpecsC.Resource.FlavorId = aiInfo.FlavorId
			createReq.SpecsC.Resource.NodeCount = 1
			createReq.SpecsC.Resource.Policy = "regular"

			created, err := CreateTrainingJob(createReq)
			if err != nil {
				log.Printf("creating training job for task %q: %v", aiInfo.Name, err)
				continue
			}
			// Without a job id there is nothing to poll later, so do not report
			// the task as running; it stays "Saved" and is retried next pass.
			if created.MetadataS.Id == "" {
				log.Printf("creating training job for task %q: no job id returned (error_code=%q, error_msg=%q)",
					aiInfo.Name, created.ErrorCode, created.ErrorMsg)
				continue
			}
			aiInfo.JobId = created.MetadataS.Id
			aiInfo.Status = "Running"

		case "Pending", "Running":
			job, err := GetTrainingJob(GetTrainingJobIdByIdReq{
				TrainingJobId: aiInfo.JobId,
				ProjectId:     aiInfo.ProjectId,
			})
			if err != nil {
				log.Printf("querying training job %q: %v", aiInfo.JobId, err)
				continue
			}
			// An empty phase means the lookup yielded no usable status; keep the
			// last known status instead of blanking it.
			if job.Status.Phase == "" {
				log.Printf("querying training job %q: response carries no phase", aiInfo.JobId)
				continue
			}
			aiInfo.Status = job.Status.Phase

		default:
			continue
		}

		// push submitted mark to coordinator
		if err := push(); err != nil {
			log.Printf("pushing task info to %s: %v", coreURL, err)
			return
		}
	}
}
// CreateTrainingJob submits req as a new training job on the
// "modelarts-CloudBrain2" platform.
//
// It loads the platform configuration, marshals req to JSON, POSTs it to
// <Endpoint>v2/<ProjectId>/training-jobs and decodes the response body into a
// CreateTrainingJobResp.
//
// A non-nil error is returned when the configuration cannot be loaded, the
// request cannot be encoded or sent, the response is empty, or the response
// body cannot be decoded; the returned response is the zero value in that case.
// A successfully decoded response is returned as-is, so callers should still
// inspect its error_* fields and job id.
func CreateTrainingJob(req CreateTrainingJobReq) (CreateTrainingJobResp, error) {
	const platformName = "modelarts-CloudBrain2"

	platform, err := util.GetModelArtsConfWithPlatform(platformName)
	if err != nil {
		// Continuing without a valid configuration would build a bogus URL.
		return CreateTrainingJobResp{}, fmt.Errorf("loading %s platform config: %w", platformName, err)
	}

	payload, err := json.Marshal(req)
	if err != nil {
		return CreateTrainingJobResp{}, fmt.Errorf("encoding create-training-job request: %w", err)
	}

	url := platform.Endpoint + "v2/" + platform.ProjectId + "/training-jobs"
	body, err := util.SendRequest("POST", url, bytes.NewBuffer(payload), platformName)
	if err != nil {
		return CreateTrainingJobResp{}, fmt.Errorf("sending create-training-job request: %w", err)
	}
	// body is dereferenced below; guard against a nil pointer.
	if body == nil {
		return CreateTrainingJobResp{}, fmt.Errorf("sending create-training-job request: empty response from %s", url)
	}

	var resp CreateTrainingJobResp
	if err := json.Unmarshal(*body, &resp); err != nil {
		return CreateTrainingJobResp{}, fmt.Errorf("decoding create-training-job response: %w", err)
	}
	return resp, nil
}
// GetTrainingJob fetches the details of one training job from the
// "modelarts-CloudBrain2" platform.
//
// It loads the platform configuration, GETs
// <Endpoint>v2/<ProjectId>/training-jobs/<req.TrainingJobId> and decodes the
// response body into a GetTrainingJobIdByIdResp. Only req.TrainingJobId is
// read; the project id in the URL comes from the platform configuration.
//
// A non-nil error is returned when the configuration cannot be loaded, the
// request fails, the response is empty, or the body cannot be decoded; the
// returned response is the zero value in that case.
func GetTrainingJob(req GetTrainingJobIdByIdReq) (GetTrainingJobIdByIdResp, error) {
	const platformName = "modelarts-CloudBrain2"

	platform, err := util.GetModelArtsConfWithPlatform(platformName)
	if err != nil {
		// Continuing without a valid configuration would build a bogus URL.
		return GetTrainingJobIdByIdResp{}, fmt.Errorf("loading %s platform config: %w", platformName, err)
	}

	url := platform.Endpoint + "v2/" + platform.ProjectId + "/training-jobs/" + req.TrainingJobId
	body, err := util.SendRequest("GET", url, nil, platformName)
	if err != nil {
		return GetTrainingJobIdByIdResp{}, fmt.Errorf("requesting training job %q: %w", req.TrainingJobId, err)
	}
	// body is dereferenced below; guard against a nil pointer.
	if body == nil {
		return GetTrainingJobIdByIdResp{}, fmt.Errorf("requesting training job %q: empty response from %s", req.TrainingJobId, url)
	}

	var resp GetTrainingJobIdByIdResp
	if err := json.Unmarshal(*body, &resp); err != nil {
		return GetTrainingJobIdByIdResp{}, fmt.Errorf("decoding training job %q response: %w", req.TrainingJobId, err)
	}
	return resp, nil
}