forked from JointCloud/pcm-coordinator
存算联动修改
This commit is contained in:
parent
a25b621bf5
commit
68915cf43a
|
@ -6,6 +6,7 @@ import (
|
|||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/storeLink"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
|
||||
)
|
||||
|
||||
type GetLinkImageListLogic struct {
|
||||
|
@ -24,6 +25,14 @@ func NewGetLinkImageListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *
|
|||
|
||||
func (l *GetLinkImageListLogic) GetLinkImageList(req *types.GetLinkImageListReq) (resp *types.GetLinkImageListResp, err error) {
|
||||
participant := storeLink.GetParticipantById(req.PartId, l.svcCtx.DbEngin)
|
||||
if *participant == (models.StorelinkCenter{}) {
|
||||
resp = &types.GetLinkImageListResp{}
|
||||
resp.Success = false
|
||||
resp.Images = nil
|
||||
resp.ErrorMsg = "partId不存在"
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
storelink := storeLink.NewStoreLink(l.ctx, l.svcCtx, participant)
|
||||
list, err := storelink.ILinkage.QueryImageList()
|
||||
if err != nil {
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/storeLink"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
|
||||
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
)
|
||||
|
@ -25,6 +26,14 @@ func NewGetLinkTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetLi
|
|||
|
||||
func (l *GetLinkTaskLogic) GetLinkTask(req *types.GetLinkTaskReq) (resp *types.GetLinkTaskResp, err error) {
|
||||
participant := storeLink.GetParticipantById(req.PartId, l.svcCtx.DbEngin)
|
||||
if *participant == (models.StorelinkCenter{}) {
|
||||
resp = &types.GetLinkTaskResp{}
|
||||
resp.Success = false
|
||||
resp.Task = nil
|
||||
resp.ErrorMsg = "partId不存在"
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
storelink := storeLink.NewStoreLink(l.ctx, l.svcCtx, participant)
|
||||
task, err := storelink.ILinkage.QueryTask(req.TaskId)
|
||||
if err != nil {
|
||||
|
|
|
@ -2,12 +2,10 @@ package storelink
|
|||
|
||||
import (
|
||||
"context"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/storeLink"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
|
||||
"strconv"
|
||||
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
)
|
||||
|
||||
type GetParticipantsLogic struct {
|
||||
|
@ -26,22 +24,23 @@ func NewGetParticipantsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *G
|
|||
|
||||
func (l *GetParticipantsLogic) GetParticipants(req *types.GetParticipantsReq) (resp *types.GetParticipantsResp, err error) {
|
||||
participants := storeLink.GetParticipants(l.svcCtx.DbEngin)
|
||||
var res types.GetParticipantsResp
|
||||
resp = &types.GetParticipantsResp{}
|
||||
|
||||
if len(participants) == 0 {
|
||||
res.Success = false
|
||||
return &res, nil
|
||||
if participants == nil {
|
||||
resp.Success = false
|
||||
resp.Participants = nil
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
for _, participant := range participants {
|
||||
var p types.ParticipantSl
|
||||
p.ParticipantId = strconv.FormatInt(participant.Id, 10)
|
||||
p.ParticipantId = participant.Id
|
||||
p.ParticipantType = storeLink.AITYPE[participant.Type]
|
||||
p.ParticipantName = participant.Name
|
||||
res.Participants = append(res.Participants, &p)
|
||||
resp.Participants = append(resp.Participants, &p)
|
||||
}
|
||||
|
||||
res.Success = true
|
||||
return &res, nil
|
||||
resp.Success = true
|
||||
return resp, nil
|
||||
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/storeLink"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
|
||||
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
)
|
||||
|
@ -25,6 +26,14 @@ func NewSubmitLinkTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Su
|
|||
|
||||
func (l *SubmitLinkTaskLogic) SubmitLinkTask(req *types.SubmitLinkTaskReq) (resp *types.SubmitLinkTaskResp, err error) {
|
||||
participant := storeLink.GetParticipantById(req.PartId, l.svcCtx.DbEngin)
|
||||
if *participant == (models.StorelinkCenter{}) {
|
||||
resp = &types.SubmitLinkTaskResp{}
|
||||
resp.Success = false
|
||||
resp.TaskId = ""
|
||||
resp.ErrorMsg = "partId不存在"
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
storelink := storeLink.NewStoreLink(l.ctx, l.svcCtx, participant)
|
||||
var params []string
|
||||
if len(req.Params) != 0 {
|
||||
|
@ -33,7 +42,14 @@ func (l *SubmitLinkTaskLogic) SubmitLinkTask(req *types.SubmitLinkTaskReq) (resp
|
|||
params = append(params, param)
|
||||
}
|
||||
}
|
||||
task, err := storelink.ILinkage.SubmitTask(req.ImageId, req.Cmd, params, req.ResourceId)
|
||||
var envs []string
|
||||
if len(req.Envs) != 0 {
|
||||
for _, v := range req.Envs {
|
||||
env := v.Key + storeLink.COMMA + v.Val
|
||||
envs = append(envs, env)
|
||||
}
|
||||
}
|
||||
task, err := storelink.ILinkage.SubmitTask(req.ImageId, req.Cmd, envs, params, req.ResourceId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/storeLink"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
|
||||
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
)
|
||||
|
@ -25,6 +26,14 @@ func NewUploadLinkImageLogic(ctx context.Context, svcCtx *svc.ServiceContext) *U
|
|||
|
||||
func (l *UploadLinkImageLogic) UploadLinkImage(req *types.UploadLinkImageReq) (resp *types.UploadLinkImageResp, err error) {
|
||||
participant := storeLink.GetParticipantById(req.PartId, l.svcCtx.DbEngin)
|
||||
if *participant == (models.StorelinkCenter{}) {
|
||||
resp = &types.UploadLinkImageResp{}
|
||||
resp.Success = false
|
||||
resp.Image = nil
|
||||
resp.ErrorMsg = "partId不存在"
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
storelink := storeLink.NewStoreLink(l.ctx, l.svcCtx, participant)
|
||||
img, err := storelink.ILinkage.UploadImage(req.FilePath)
|
||||
if err != nil {
|
||||
|
|
|
@ -62,7 +62,7 @@ func (o *ModelArtsLink) QueryImageList() (interface{}, error) {
|
|||
return imgListResp, nil
|
||||
}
|
||||
|
||||
func (o *ModelArtsLink) SubmitTask(imageId string, cmd string, params []string, resourceId string) (interface{}, error) {
|
||||
func (o *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) {
|
||||
// modelArts提交任务
|
||||
environments := make(map[string]string)
|
||||
|
||||
|
|
|
@ -109,8 +109,10 @@ func (o *OctopusLink) QueryImageList() (interface{}, error) {
|
|||
return imgListResp, nil
|
||||
}
|
||||
|
||||
func (o *OctopusLink) SubmitTask(imageId string, cmd string, params []string, resourceId string) (interface{}, error) {
|
||||
func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) {
|
||||
// octopus提交任务
|
||||
|
||||
// python参数
|
||||
var prms []*octopus.Parameters
|
||||
for _, param := range params {
|
||||
var p octopus.Parameters
|
||||
|
@ -120,7 +122,13 @@ func (o *OctopusLink) SubmitTask(imageId string, cmd string, params []string, re
|
|||
prms = append(prms, &p)
|
||||
}
|
||||
|
||||
var str string
|
||||
//环境变量
|
||||
var envMap map[string]string
|
||||
for _, env := range envs {
|
||||
s := strings.Split(env, COMMA)
|
||||
envMap[s[0]] = s[1]
|
||||
}
|
||||
|
||||
req := &octopus.CreateTrainJobReq{
|
||||
Platform: o.participant.Name,
|
||||
Params: &octopus.CreateTrainJobParam{
|
||||
|
@ -135,7 +143,7 @@ func (o *OctopusLink) SubmitTask(imageId string, cmd string, params []string, re
|
|||
MinSucceededTaskCount: 1,
|
||||
TaskNumber: 1,
|
||||
Parameters: prms,
|
||||
Envs: str,
|
||||
Envs: envMap,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
|
@ -57,7 +57,7 @@ func (s *ShuguangAi) QueryImageList() (interface{}, error) {
|
|||
return imgListResp, nil
|
||||
}
|
||||
|
||||
func (s *ShuguangAi) SubmitTask(imageId string, cmd string, params []string, resourceId string) (interface{}, error) {
|
||||
func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) {
|
||||
// shuguangAi提交任务
|
||||
|
||||
//判断是否resourceId匹配自定义资源Id
|
||||
|
@ -79,13 +79,20 @@ func (s *ShuguangAi) SubmitTask(imageId string, cmd string, params []string, res
|
|||
pythonArg += PY_PARAM_PREFIX + s[0] + "=" + s[1] + SPACE
|
||||
}
|
||||
|
||||
//环境变量
|
||||
var env string
|
||||
for _, e := range envs {
|
||||
s := strings.Split(e, COMMA)
|
||||
env += s[0] + "=" + s[1] + SPACE
|
||||
}
|
||||
|
||||
req := &hpcAC.SubmitPytorchTaskReq{
|
||||
Params: &hpcAC.SubmitPytorchTaskParams{
|
||||
TaskName: TASK_PYTORCH_PREFIX + UNDERSCORE + utils.RandomString(10),
|
||||
WorkPath: WorkPath,
|
||||
IsDistributed: false,
|
||||
IsHvd: false,
|
||||
//Env:
|
||||
TaskName: TASK_PYTORCH_PREFIX + UNDERSCORE + utils.RandomString(10),
|
||||
WorkPath: WorkPath,
|
||||
IsDistributed: false,
|
||||
IsHvd: false,
|
||||
Env: env,
|
||||
AcceleratorType: DCU,
|
||||
Version: imageResp.Image.Version,
|
||||
ImagePath: imageResp.Image.Path,
|
||||
|
|
|
@ -11,14 +11,13 @@ import (
|
|||
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus"
|
||||
"gorm.io/gorm"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
type Linkage interface {
|
||||
UploadImage(path string) (interface{}, error)
|
||||
DeleteImage(imageId string) (interface{}, error)
|
||||
QueryImageList() (interface{}, error)
|
||||
SubmitTask(imageId string, cmd string, params []string, resourceId string) (interface{}, error)
|
||||
SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error)
|
||||
QueryTask(taskId string) (interface{}, error)
|
||||
QuerySpecs() (interface{}, error)
|
||||
DeleteTask(taskId string) (interface{}, error)
|
||||
|
@ -308,7 +307,7 @@ func ConvertType[T any](in *T, participant *models.StorelinkCenter) (interface{}
|
|||
var respec types.ResourceSpecSl
|
||||
respec.SpecId = spec.Id
|
||||
respec.SpecName = spec.Name
|
||||
respec.ParticipantId = strconv.FormatInt(participant.Id, 10)
|
||||
respec.ParticipantId = participant.Id
|
||||
respec.ParticipantName = participant.Name
|
||||
respec.SpecPrice = spec.Price
|
||||
resp.ResourceSpecs = append(resp.ResourceSpecs, &respec)
|
||||
|
@ -326,7 +325,7 @@ func ConvertType[T any](in *T, participant *models.StorelinkCenter) (interface{}
|
|||
var spec types.ResourceSpecSl
|
||||
resp.Success = true
|
||||
spec.ParticipantName = participant.Name
|
||||
spec.ParticipantId = strconv.FormatInt(participant.Id, 10)
|
||||
spec.ParticipantId = participant.Id
|
||||
spec.SpecName = SHUGUANGAI_CUSTOM_RESOURCE_NAME
|
||||
spec.SpecId = SHUGUANGAI_CUSTOM_RESOURCE_ID
|
||||
resp.ResourceSpecs = append(resp.ResourceSpecs, &spec)
|
||||
|
@ -346,7 +345,7 @@ func ConvertType[T any](in *T, participant *models.StorelinkCenter) (interface{}
|
|||
var respec types.ResourceSpecSl
|
||||
respec.SpecId = spec.FlavorId
|
||||
respec.SpecName = spec.FlavorName
|
||||
respec.ParticipantId = strconv.FormatInt(participant.Id, 10)
|
||||
respec.ParticipantId = participant.Id
|
||||
respec.ParticipantName = participant.Name
|
||||
respec.SpecPrice = 0
|
||||
resp.ResourceSpecs = append(resp.ResourceSpecs, &respec)
|
||||
|
|
Loading…
Reference in New Issue