存算联动修改

This commit is contained in:
tzwang 2023-10-26 20:21:10 +08:00
parent a2c53e24d1
commit ad11733867
5 changed files with 160 additions and 101 deletions

View File

@ -2,13 +2,10 @@ package storelink
import (
"context"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/storeLink"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus"
"strconv"
"github.com/zeromicro/go-zero/core/logx"
)
type GetAISpecsLogic struct {
@ -17,10 +14,6 @@ type GetAISpecsLogic struct {
svcCtx *svc.ServiceContext
}
const (
Wzhdtest = "wzhdtest"
)
func NewGetAISpecsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetAISpecsLogic {
return &GetAISpecsLogic{
Logger: logx.WithContext(ctx),
@ -30,39 +23,13 @@ func NewGetAISpecsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetAIS
}
func (l *GetAISpecsLogic) GetAISpecs(req *types.GetResourceSpecsReq) (resp *types.GetResourceSpecsResp, err error) {
var res types.GetResourceSpecsResp
participant := storeLink.GetParticipantById(req.PartId, l.svcCtx.DbEngin)
switch participant.Type {
case storeLink.TYPE_OCTOPUS:
req := &octopus.GetResourceSpecsReq{
Platform: participant.Name,
ResourcePool: "common-pool",
}
specs, err := l.svcCtx.OctopusRpc.GetResourceSpecs(l.ctx, req)
if err != nil || !specs.Success {
return nil, err
}
for _, spec := range specs.TrainResourceSpecs {
var respec types.ResourceSpecSl
respec.SpecId = spec.Id
respec.SpecName = spec.Name
respec.ParticipantId = strconv.FormatInt(participant.Id, 10)
respec.ParticipantName = participant.Name
respec.SpecPrice = spec.Price
res.ResourceSpecs = append(res.ResourceSpecs, &respec)
}
case storeLink.TYPE_SHUGUANGAI:
storelink := storeLink.NewStoreLink(l.ctx, l.svcCtx, participant)
specs, err := storelink.ILinkage.QuerySpecs()
if err != nil {
return nil, err
}
if len(res.ResourceSpecs) == 0 {
res.Success = false
return &res, nil
}
res.Success = true
return &res, nil
specsResp := specs.(types.GetResourceSpecsResp)
return &specsResp, nil
}

View File

@ -3,6 +3,7 @@ package storeLink
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"strings"
@ -23,8 +24,8 @@ type ModelArtsLink struct {
// RESOURCE_POOL = "common-pool"
//)
func NewModelArtsLink(ctx context.Context, svcCtx *svc.ServiceContext, platform string) *ModelArtsLink {
return &ModelArtsLink{ctx: ctx, svcCtx: svcCtx, platform: platform, pageIndex: 1, pageSize: 100}
func NewModelArtsLink(ctx context.Context, svcCtx *svc.ServiceContext, participant *models.ScParticipantPhyInfo) *ModelArtsLink {
return &ModelArtsLink{ctx: ctx, svcCtx: svcCtx, platform: participant.Name, pageIndex: 1, pageSize: 100}
}
func (o *ModelArtsLink) UploadImage(path string) (interface{}, error) {
@ -50,7 +51,7 @@ func (o *ModelArtsLink) QueryImageList() (interface{}, error) {
}
//转换成统一返回类型
imgListResp, err := ConvertType[modelarts.ListReposDetailsResp](resp)
imgListResp, err := ConvertType[modelarts.ListReposDetailsResp](resp, nil)
if err != nil {
return nil, err
}
@ -93,7 +94,7 @@ func (o *ModelArtsLink) SubmitTask(imageId string, cmd string, params []string,
}
//转换成统一返回类型
submitResp, err := ConvertType[modelarts.CreateTrainingJobResp](resp)
submitResp, err := ConvertType[modelarts.CreateTrainingJobResp](resp, nil)
if err != nil {
return nil, err
}
@ -112,7 +113,7 @@ func (o *ModelArtsLink) QueryTask(taskId string) (interface{}, error) {
}
//转换成统一返回类型
taskResp, err := ConvertType[modelarts.JobResponse](resp)
taskResp, err := ConvertType[modelarts.JobResponse](resp, nil)
if err != nil {
return nil, err
}
@ -131,10 +132,14 @@ func (o *ModelArtsLink) DeleteTask(taskId string) (interface{}, error) {
}
//转换成统一返回类型
deleteResp, err := ConvertType[modelarts.DeleteTrainingJobResp](resp)
deleteResp, err := ConvertType[modelarts.DeleteTrainingJobResp](resp, nil)
if err != nil {
return nil, err
}
return deleteResp, nil
}
// QuerySpecs is the ModelArts side of the resource-spec query.
// It is currently a stub: no request is sent and both results are nil.
//
// NOTE(review): a caller that type-asserts the returned value without the
// comma-ok form will panic on this nil result — confirm before routing
// ModelArts participants through this method.
func (o *ModelArtsLink) QuerySpecs() (interface{}, error) {
	var (
		specs interface{}
		err   error
	)
	return specs, err
}

View File

@ -3,17 +3,18 @@ package storeLink
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus"
"strings"
)
type OctopusLink struct {
ctx context.Context
svcCtx *svc.ServiceContext
platform string
pageIndex int32
pageSize int32
ctx context.Context
svcCtx *svc.ServiceContext
pageIndex int32
pageSize int32
participant *models.ScParticipantPhyInfo
}
const (
@ -23,17 +24,17 @@ const (
RESOURCE_POOL = "common-pool"
)
func NewOctopusLink(ctx context.Context, svcCtx *svc.ServiceContext, platform string) *OctopusLink {
return &OctopusLink{ctx: ctx, svcCtx: svcCtx, platform: platform, pageIndex: 1, pageSize: 100}
func NewOctopusLink(ctx context.Context, svcCtx *svc.ServiceContext, participant *models.ScParticipantPhyInfo) *OctopusLink {
return &OctopusLink{ctx: ctx, svcCtx: svcCtx, participant: participant, pageIndex: 1, pageSize: 100}
}
func (o *OctopusLink) UploadImage(path string) (interface{}, error) {
// octopus创建镜像
createReq := &octopus.CreateImageReq{
Platform: o.platform,
Platform: o.participant.Name,
CreateImage: &octopus.CreateImage{
SourceType: 1,
ImageName: IMG_NAME_PREFIX + utils.RandomString(5),
ImageName: IMG_NAME_PREFIX + utils.RandomString(7),
ImageVersion: IMG_VERSION_PREFIX + utils.RandomString(7),
},
}
@ -44,7 +45,7 @@ func (o *OctopusLink) UploadImage(path string) (interface{}, error) {
// octopus上传镜像
uploadReq := &octopus.UploadImageReq{
Platform: o.platform,
Platform: o.participant.Name,
ImageId: createResp.Payload.ImageId,
Params: &octopus.UploadImageParam{
Domain: "",
@ -59,7 +60,7 @@ func (o *OctopusLink) UploadImage(path string) (interface{}, error) {
// Todo 实际上传
//转换成统一返回类型
resp, err := ConvertType[octopus.UploadImageResp](uploadResp)
resp, err := ConvertType[octopus.UploadImageResp](uploadResp, nil)
if err != nil {
return nil, err
}
@ -70,7 +71,7 @@ func (o *OctopusLink) UploadImage(path string) (interface{}, error) {
func (o *OctopusLink) DeleteImage(imageId string) (interface{}, error) {
// octopus删除镜像
req := &octopus.DeleteImageReq{
Platform: o.platform,
Platform: o.participant.Name,
ImageId: imageId,
}
resp, err := o.svcCtx.OctopusRpc.DeleteImage(o.ctx, req)
@ -79,7 +80,7 @@ func (o *OctopusLink) DeleteImage(imageId string) (interface{}, error) {
}
//转换成统一返回类型
deleteResp, err := ConvertType[octopus.DeleteImageResp](resp)
deleteResp, err := ConvertType[octopus.DeleteImageResp](resp, nil)
if err != nil {
return nil, err
}
@ -90,7 +91,7 @@ func (o *OctopusLink) DeleteImage(imageId string) (interface{}, error) {
func (o *OctopusLink) QueryImageList() (interface{}, error) {
// octopus获取镜像列表
req := &octopus.GetUserImageListReq{
Platform: o.platform,
Platform: o.participant.Name,
PageIndex: o.pageIndex,
PageSize: o.pageSize,
}
@ -100,7 +101,7 @@ func (o *OctopusLink) QueryImageList() (interface{}, error) {
}
//转换成统一返回类型
imgListResp, err := ConvertType[octopus.GetUserImageListResp](resp)
imgListResp, err := ConvertType[octopus.GetUserImageListResp](resp, nil)
if err != nil {
return nil, err
}
@ -120,7 +121,7 @@ func (o *OctopusLink) SubmitTask(imageId string, cmd string, params []string, re
}
req := &octopus.CreateTrainJobReq{
Platform: o.platform,
Platform: o.participant.Name,
Params: &octopus.CreateTrainJobParam{
ImageId: imageId,
Name: TASK_NAME_PREFIX + utils.RandomString(7),
@ -143,7 +144,7 @@ func (o *OctopusLink) SubmitTask(imageId string, cmd string, params []string, re
}
//转换成统一返回类型
submitResp, err := ConvertType[octopus.CreateTrainJobResp](resp)
submitResp, err := ConvertType[octopus.CreateTrainJobResp](resp, nil)
if err != nil {
return nil, err
}
@ -154,7 +155,7 @@ func (o *OctopusLink) SubmitTask(imageId string, cmd string, params []string, re
func (o *OctopusLink) QueryTask(taskId string) (interface{}, error) {
// octopus获取任务
req := &octopus.GetTrainJobReq{
Platform: o.platform,
Platform: o.participant.Name,
Id: taskId,
}
resp, err := o.svcCtx.OctopusRpc.GetTrainJob(o.ctx, req)
@ -163,7 +164,7 @@ func (o *OctopusLink) QueryTask(taskId string) (interface{}, error) {
}
//转换成统一返回类型
taskResp, err := ConvertType[octopus.GetTrainJobResp](resp)
taskResp, err := ConvertType[octopus.GetTrainJobResp](resp, nil)
if err != nil {
return nil, err
}
@ -174,7 +175,7 @@ func (o *OctopusLink) QueryTask(taskId string) (interface{}, error) {
func (o *OctopusLink) DeleteTask(taskId string) (interface{}, error) {
// octopus删除任务
req := &octopus.DeleteTrainJobReq{
Platform: o.platform,
Platform: o.participant.Name,
JobIds: []string{taskId},
}
resp, err := o.svcCtx.OctopusRpc.DeleteTrainJob(o.ctx, req)
@ -183,10 +184,30 @@ func (o *OctopusLink) DeleteTask(taskId string) (interface{}, error) {
}
//转换成统一返回类型
deleteResp, err := ConvertType[octopus.DeleteTrainJobResp](resp)
deleteResp, err := ConvertType[octopus.DeleteTrainJobResp](resp, nil)
if err != nil {
return nil, err
}
return deleteResp, nil
}
// QuerySpecs asks the Octopus RPC service for the resource specs of this
// link's participant and converts the reply via ConvertType.
//
// On an RPC or conversion failure the error is returned unchanged together
// with a nil result.
func (o *OctopusLink) QuerySpecs() (interface{}, error) {
	// Build the request for the participant's platform and the common pool.
	specsReq := &octopus.GetResourceSpecsReq{
		Platform:     o.participant.Name,
		ResourcePool: "common-pool",
	}

	rpcResp, rpcErr := o.svcCtx.OctopusRpc.GetResourceSpecs(o.ctx, specsReq)
	if rpcErr != nil {
		return nil, rpcErr
	}

	// Convert to the unified response type; the participant is passed along
	// so its id and name can be attached to each spec.
	unified, convErr := ConvertType[octopus.GetResourceSpecsResp](rpcResp, o.participant)
	if convErr != nil {
		return nil, convErr
	}
	return unified, nil
}

View File

@ -3,6 +3,7 @@ package storeLink
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils/timeutils"
"gitlink.org.cn/jcce-pcm/pcm-participant-ac/hpcAC"
@ -10,23 +11,13 @@ import (
)
type ShuguangAi struct {
ctx context.Context
svcCtx *svc.ServiceContext
ctx context.Context
svcCtx *svc.ServiceContext
participant *models.ScParticipantPhyInfo
}
const (
DCU = "dcu"
PYTORCH = "Pytorch"
TASK_PYTORCH_PREFIX = "PytorchTask"
TENSORFLOW = "Tensorflow"
RESOURCE_GROUP = "wzhdtest"
WorkPath = "/work/home/acgnnmfbwo/111111/py/"
TimeoutLimit = "10:00:00"
PythonCodePath = "/work/home/acgnnmfbwo/111111/py/test.py"
)
func NewShuguangAi(ctx context.Context, svcCtx *svc.ServiceContext) *ShuguangAi {
return &ShuguangAi{ctx: ctx, svcCtx: svcCtx}
func NewShuguangAi(ctx context.Context, svcCtx *svc.ServiceContext, participant *models.ScParticipantPhyInfo) *ShuguangAi {
return &ShuguangAi{ctx: ctx, svcCtx: svcCtx, participant: participant}
}
func (s *ShuguangAi) UploadImage(path string) (interface{}, error) {
@ -49,7 +40,7 @@ func (s *ShuguangAi) QueryImageList() (interface{}, error) {
}
//转换成统一返回类型
imgListResp, err := ConvertType[hpcAC.GetImageListAiResp](resp)
imgListResp, err := ConvertType[hpcAC.GetImageListAiResp](resp, nil)
if err != nil {
return nil, err
}
@ -94,7 +85,7 @@ func (s *ShuguangAi) SubmitTask(imageId string, cmd string, params []string, res
}
//转换成统一返回类型
submitResp, err := ConvertType[hpcAC.SubmitTaskAiResp](resp)
submitResp, err := ConvertType[hpcAC.SubmitTaskAiResp](resp, nil)
if err != nil {
return nil, err
}
@ -113,7 +104,7 @@ func (s *ShuguangAi) QueryTask(taskId string) (interface{}, error) {
}
//转换成统一返回类型
taskResp, err := ConvertType[hpcAC.GetPytorchTaskResp](resp)
taskResp, err := ConvertType[hpcAC.GetPytorchTaskResp](resp, nil)
if err != nil {
return nil, err
}
@ -132,10 +123,30 @@ func (s *ShuguangAi) DeleteTask(taskId string) (interface{}, error) {
}
//转换成统一返回类型
deleteResp, err := ConvertType[hpcAC.DeleteTaskAiResp](resp)
deleteResp, err := ConvertType[hpcAC.DeleteTaskAiResp](resp, nil)
if err != nil {
return nil, err
}
return deleteResp, nil
}
// QuerySpecs requests the resource spec for the DCU accelerator type in the
// RESOURCE_GROUP resource group from the AC RPC service and converts the
// reply via ConvertType.
//
// It returns the converted value, or a nil result together with the error
// produced by the RPC call or by the conversion.
func (s *ShuguangAi) QuerySpecs() (interface{}, error) {
	// Query the resource spec from ShuguangAi.
	req := &hpcAC.GetResourceSpecReq{
		AcceleratorType: DCU,
		ResourceGroup:   RESOURCE_GROUP,
	}
	specs, err := s.svcCtx.ACRpc.GetResourceSpec(s.ctx, req)
	if err != nil {
		return nil, err
	}

	// Convert to the unified response type; the participant is passed along
	// so its id and name can be attached to the result.
	specsResp, err := ConvertType[hpcAC.GetResourceSpecResp](specs, s.participant)
	if err != nil {
		return nil, err
	}

	return specsResp, nil
}

View File

@ -11,6 +11,7 @@ import (
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus"
"gorm.io/gorm"
"strconv"
)
type Linkage interface {
@ -19,16 +20,25 @@ type Linkage interface {
QueryImageList() (interface{}, error)
SubmitTask(imageId string, cmd string, params []string, resourceId string) (interface{}, error)
QueryTask(taskId string) (interface{}, error)
QuerySpecs() (interface{}, error)
DeleteTask(taskId string) (interface{}, error)
}
const (
COMMA = ","
TYPE_OCTOPUS = "1"
TYPE_MODELARTS = "2"
TYPE_SHUGUANGAI = "3"
OCTOPUS = "Octopus"
MODELARTS = "Modelarts"
COMMA = ","
TYPE_OCTOPUS = "1"
TYPE_MODELARTS = "2"
TYPE_SHUGUANGAI = "3"
OCTOPUS = "Octopus"
MODELARTS = "Modelarts"
DCU = "dcu"
PYTORCH = "Pytorch"
TASK_PYTORCH_PREFIX = "PytorchTask"
TENSORFLOW = "Tensorflow"
RESOURCE_GROUP = "wzhdtest"
WorkPath = "/work/home/acgnnmfbwo/111111/py/"
TimeoutLimit = "10:00:00"
PythonCodePath = "/work/home/acgnnmfbwo/111111/py/test.py"
)
var (
@ -47,25 +57,35 @@ type StoreLink struct {
ILinkage Linkage
}
func NewStoreLink(ctx context.Context, svcCtx *svc.ServiceContext, participant models.ScParticipantPhyInfo) *StoreLink {
//todo 创建modelarts client
linkStruct := NewOctopusLink(ctx, svcCtx, participant.Name)
return &StoreLink{ILinkage: linkStruct}
// NewStoreLink builds a StoreLink whose ILinkage is the implementation that
// matches participant.Type: Octopus, ModelArts or ShuguangAi.
//
// It returns nil when participant.Type is none of the known type constants,
// so callers must check the result before using ILinkage.
func NewStoreLink(ctx context.Context, svcCtx *svc.ServiceContext, participant *models.ScParticipantPhyInfo) *StoreLink {
	store := &StoreLink{}

	switch participant.Type {
	case TYPE_OCTOPUS:
		store.ILinkage = NewOctopusLink(ctx, svcCtx, participant)
	case TYPE_MODELARTS:
		store.ILinkage = NewModelArtsLink(ctx, svcCtx, participant)
	case TYPE_SHUGUANGAI:
		store.ILinkage = NewShuguangAi(ctx, svcCtx, participant)
	default:
		// Unknown participant type: no linkage implementation is available.
		return nil
	}

	return store
}
func GetParticipants(dbEngin *gorm.DB) []models.ScParticipantPhyInfo {
var participants []models.ScParticipantPhyInfo
func GetParticipants(dbEngin *gorm.DB) []*models.ScParticipantPhyInfo {
var participants []*models.ScParticipantPhyInfo
dbEngin.Raw("select * from sc_participant_phy_info where type = 1").Scan(&participants)
return participants
}
func GetParticipantById(partId int64, dbEngin *gorm.DB) models.ScParticipantPhyInfo {
func GetParticipantById(partId int64, dbEngin *gorm.DB) *models.ScParticipantPhyInfo {
var participant models.ScParticipantPhyInfo
dbEngin.Raw("select * from sc_participant_phy_info where id = ?", partId).Scan(&participant)
return participant
return &participant
}
func ConvertType[T any](in *T) (interface{}, error) {
func ConvertType[T any](in *T, participant *models.ScParticipantPhyInfo) (interface{}, error) {
switch (interface{})(in).(type) {
case *octopus.UploadImageResp:
@ -263,6 +283,41 @@ func ConvertType[T any](in *T) (interface{}, error) {
resp.ErrorMsg = inresp.Msg
}
return resp, nil
case *octopus.GetResourceSpecsResp:
var resp types.GetResourceSpecsResp
inresp := (interface{})(in).(*octopus.GetResourceSpecsResp)
resp.Success = inresp.Success
if !resp.Success {
resp.ResourceSpecs = nil
return resp, nil
}
for _, spec := range inresp.TrainResourceSpecs {
var respec types.ResourceSpecSl
respec.SpecId = spec.Id
respec.SpecName = spec.Name
respec.ParticipantId = strconv.FormatInt(participant.Id, 10)
respec.ParticipantName = participant.Name
respec.SpecPrice = spec.Price
resp.ResourceSpecs = append(resp.ResourceSpecs, &respec)
}
return resp, nil
case *hpcAC.GetResourceSpecResp:
var resp types.GetResourceSpecsResp
inresp := (interface{})(in).(*hpcAC.GetResourceSpecResp)
if inresp.Code != "0" {
resp.Success = false
resp.ResourceSpecs = nil
} else {
var spec types.ResourceSpecSl
resp.Success = true
spec.ParticipantName = participant.Name
spec.ParticipantId = strconv.FormatInt(participant.Id, 10)
resp.ResourceSpecs = append(resp.ResourceSpecs, &spec)
}
return resp, nil
default:
return nil, errors.New("type convert fail")
}