createdataset&createmodel&tasksync #522

Merged
zhouqunjie merged 1 commits from zhouqj_dev into master 2025-07-15 16:31:07 +08:00
12 changed files with 494 additions and 44 deletions

View File

@ -1,18 +1,26 @@
package ai
import (
"github.com/zeromicro/go-zero/rest/httpx"
"encoding/json"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/ai"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
dataset "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types/ai"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"io"
"net/http"
)
func CreateDataSetHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.CreateDataSetReq
if err := httpx.Parse(r, &req); err != nil {
var req dataset.CreateDatasetReq
body, err := io.ReadAll(r.Body)
if err != nil {
result.ParamErrorResult(r, w, err)
return
}
if err = json.Unmarshal(body, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}

View File

@ -0,0 +1,32 @@
package ai
import (
"encoding/json"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/ai"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
Model "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types/ai"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"io"
"net/http"
)
func CreateModelHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req Model.CreateModelReq
body, err := io.ReadAll(r.Body)
if err != nil {
result.ParamErrorResult(r, w, err)
return
}
if err = json.Unmarshal(body, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}
l := ai.NewCreateModelLogic(r.Context(), svcCtx)
resp, err := l.CreateModel(&req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -0,0 +1,32 @@
package ai
import (
"encoding/json"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/ai"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
Model "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types/ai"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"io"
"net/http"
)
func TaskResultSyncHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req Model.ResultSyncReq
body, err := io.ReadAll(r.Body)
if err != nil {
result.ParamErrorResult(r, w, err)
return
}
if err = json.Unmarshal(body, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}
l := ai.NewTaskResultSyncLogic(r.Context(), svcCtx)
resp, err := l.TaskResultSync(&req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -218,9 +218,22 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
{
// 创建数据集
Method: http.MethodPost,
Path: "/ai/createDataSet/:projectId",
Path: "/ai/createDataSet",
Handler: ai.CreateDataSetHandler(serverCtx),
},
{
// 创建模型
Method: http.MethodPost,
Path: "/ai/createModel",
Handler: ai.CreateModelHandler(serverCtx),
},
{
// 创建模型
Method: http.MethodPost,
Path: "/ai/task/sync",
Handler: ai.TaskResultSyncHandler(serverCtx),
},
{
// 创建notebook
Method: http.MethodPost,

View File

@ -16,15 +16,12 @@ package ai
import (
"context"
"github.com/jinzhu/copier"
"github.com/go-resty/resty/v2"
"github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/helper/xerr"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"github.com/zeromicro/go-zero/core/logx"
dataset "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types/ai"
)
type CreateDataSetLogic struct {
@ -41,18 +38,22 @@ func NewCreateDataSetLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Cre
}
}
func (l *CreateDataSetLogic) CreateDataSet(req *types.CreateDataSetReq) (resp *types.CreateDataSetResp, err error) {
// todo: add your logic here and delete this line
func (l *CreateDataSetLogic) CreateDataSet(req *dataset.CreateDatasetReq) (resp *dataset.CreateDatasetResp, err error) {
modelartsReq := &modelarts.CreateDataSetReq{}
err = copier.CopyWithOption(modelartsReq, req, copier.Option{Converters: utils.Converters})
CreateDataSetResp, err := l.svcCtx.ModelArtsRpc.CreateDataSet(l.ctx, modelartsReq)
if err != nil {
return nil, errors.Wrapf(xerr.NewErrMsg("Failed to get db DataSet list"), "Failed to get db DataSet list err : %v ,req:%+v", err, req)
cluster := &types.GetClusterByIdResp{}
tx := l.svcCtx.DbEngin.Raw("select * from t_cluster where id = ?", req.ClusterId).Scan(&cluster.ClusterInfo)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return nil, errors.New("cluster create failed")
}
resp = &types.CreateDataSetResp{}
err = copier.CopyWithOption(&resp, &CreateDataSetResp, copier.Option{Converters: utils.Converters})
return resp, nil
httpClient := resty.New().R()
createDatasetResp := &dataset.CreateDatasetResp{}
_, err = httpClient.SetHeader("Content-Type", "application/json").
SetQueryParams(map[string]string{"pfId": cluster.ClusterInfo.Id}).
SetBody(req).
SetResult(&createDatasetResp).
Post(cluster.ClusterInfo.Server + "/ai/dataset/create")
return createDatasetResp, err
}

View File

@ -0,0 +1,59 @@
/*
Copyright (c) [2023] [pcm]
[pcm-coordinator] is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPaRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
package ai
import (
"context"
"github.com/go-resty/resty/v2"
"github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
model "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types/ai"
)
type CreateModelLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewCreateModelLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateModelLogic {
return &CreateModelLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *CreateModelLogic) CreateModel(req *model.CreateModelReq) (resp *model.CreateModelResp, err error) {
cluster := &types.GetClusterByIdResp{}
tx := l.svcCtx.DbEngin.Raw("select * from t_cluster where id = ?", req.ClusterId).Scan(&cluster.ClusterInfo)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return nil, errors.New("cluster create failed")
}
httpClient := resty.New().R()
createModelResp := &model.CreateModelResp{}
_, err = httpClient.SetHeader("Content-Type", "application/json").
SetQueryParams(map[string]string{"pfId": cluster.ClusterInfo.Id}).
SetBody(req).
SetResult(&createModelResp).
Post(cluster.ClusterInfo.Server + "/ai/model/create")
return createModelResp, err
}

View File

@ -0,0 +1,59 @@
/*
Copyright (c) [2023] [pcm]
[pcm-coordinator] is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPaRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
package ai
import (
"context"
"github.com/go-resty/resty/v2"
"github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
sync "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types/ai"
)
type TaskResultSyncLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewTaskResultSyncLogic(ctx context.Context, svcCtx *svc.ServiceContext) *TaskResultSyncLogic {
return &TaskResultSyncLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *TaskResultSyncLogic) TaskResultSync(req *sync.ResultSyncReq) (resp *sync.ResultSyncResp, err error) {
cluster := &types.GetClusterByIdResp{}
tx := l.svcCtx.DbEngin.Raw("select * from t_cluster where id = ?", req.ClusterId).Scan(&cluster.ClusterInfo)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return nil, errors.New("cluster create failed")
}
httpClient := resty.New().R()
createModelResp := &sync.ResultSyncResp{}
_, err = httpClient.SetHeader("Content-Type", "application/json").
SetQueryParams(map[string]string{"pfId": cluster.ClusterInfo.Id}).
SetBody(req).
SetResult(&createModelResp).
Post(cluster.ClusterInfo.Server + "/ai/task/sync")
return createModelResp, err
}

View File

@ -1,30 +1,20 @@
package algorithm
package ai
import (
"encoding/json"
"fmt"
)
type CreateParameter interface {
type CreateAlgorithmParameter interface {
AlgorithmCreateParam()
}
type Source struct {
Jcs JcsBase `json:"jcs,omitempty"`
}
type JcsBase struct {
UserID int `json:"userID" binding:"required"`
PackageId int `json:"packageId" binding:"required"`
BucketID int `json:"bucketID" binding:"required"`
}
type CreateAlgorithmReq struct {
Name string `json:"name" binding:"required"`
ClusterId string `json:"clusterId"`
Desc string `json:"desc"`
Src Source `json:"src,omitempty"`
Param CreateParameter `json:"param,omitempty"`
Name string `json:"name" binding:"required"`
ClusterId string `json:"clusterId"`
Desc string `json:"desc"`
Src Source `json:"src,omitempty"`
Param CreateAlgorithmParameter `json:"param,omitempty"`
}
type CreateAlgorithmResp struct {
@ -33,18 +23,18 @@ type CreateAlgorithmResp struct {
ErrorMsg string `json:"errorMsg,omitempty" copier:"ErrorMsg"`
}
type OpenI struct {
type OpenIAlgorithm struct {
BootFile string `json:"bootFile,omitempty"`
DefaultBranch string `json:"defaultBranch,omitempty"`
}
func (o *OpenI) AlgorithmCreateParam() {
func (o *OpenIAlgorithm) AlgorithmCreateParam() {
}
type Octopus struct {
type OctopusAlgorithm struct {
}
func (o *Octopus) AlgorithmCreateParam() {
func (o *OctopusAlgorithm) AlgorithmCreateParam() {
}
func (cp *CreateAlgorithmReq) UnmarshalJSON(data []byte) error {
@ -70,7 +60,7 @@ func (cp *CreateAlgorithmReq) UnmarshalJSON(data []byte) error {
// 解析 param 字段的原始数据为具体类型
if temp.Param != nil {
// 尝试解析为 OpenI 类型
var openi OpenI
var openi OpenIAlgorithm
if err := json.Unmarshal(temp.Param, &openi); err != nil {
// 打印详细错误(如字段不匹配、类型错误等)
fmt.Printf("解析 OpenI 失败: %v\n", err) // 关键调试日志
@ -80,7 +70,7 @@ func (cp *CreateAlgorithmReq) UnmarshalJSON(data []byte) error {
}
// 新增:尝试解析为 Octopus 类型
var octopus Octopus
var octopus OctopusAlgorithm
if err := json.Unmarshal(temp.Param, &octopus); err == nil {
cp.Param = &octopus
return nil

View File

@ -0,0 +1,11 @@
package ai
type Source struct {
Jcs JcsBase `json:"jcs,omitempty"`
}
type JcsBase struct {
UserID int `json:"userID" binding:"required"`
PackageId int `json:"packageId" binding:"required"`
BucketID int `json:"bucketID" binding:"required"`
}

View File

@ -0,0 +1,82 @@
package ai
import (
"encoding/json"
"fmt"
)
type CreateDatasetParameter interface {
DatasetCreateParam()
}
type CreateDatasetReq struct {
Name string `json:"name" binding:"required"`
ClusterId string `json:"clusterId"`
Desc string `json:"desc"`
Src Source `json:"src,omitempty"`
Param CreateDatasetParameter `json:"param,omitempty"`
}
type CreateDatasetResp struct {
Code int32 `json:"code,omitempty" copier:"Code"`
Msg string `json:"msg,omitempty" copier:"Msg"`
ErrorMsg string `json:"errorMsg,omitempty" copier:"ErrorMsg"`
}
type OpenIDataset struct {
Repo string `json:"repo,omitempty"`
}
func (o *OpenIDataset) DatasetCreateParam() {
}
type OctopusDataset struct {
}
func (o *OctopusDataset) DatasetCreateParam() {
}
func (cp *CreateDatasetReq) UnmarshalJSON(data []byte) error {
// 临时结构体:用于捕获原始 JSON 中的 param 字段数据
type TempCreateParam struct {
Name string `json:"name"`
ClusterId string `json:"clusterId"`
Desc string `json:"desc"`
Src Source `json:"src,omitempty"`
Param json.RawMessage `json:"param,omitempty"` // 捕获原始 JSON 数据
}
var temp TempCreateParam
if err := json.Unmarshal(data, &temp); err != nil {
return err
}
// 将临时结构体的字段赋值给原结构体(除 Param 外)
cp.Name = temp.Name
cp.ClusterId = temp.ClusterId
cp.Desc = temp.Desc
cp.Src = temp.Src
// 解析 param 字段的原始数据为具体类型
if temp.Param != nil {
// 尝试解析为 OpenI 类型
var openi OpenIDataset
if err := json.Unmarshal(temp.Param, &openi); err != nil {
// 打印详细错误(如字段不匹配、类型错误等)
fmt.Printf("解析 OpenI 失败: %v\n", err) // 关键调试日志
} else {
cp.Param = &openi
return nil
}
// 新增:尝试解析为 Octopus 类型
var octopus OctopusDataset
if err := json.Unmarshal(temp.Param, &octopus); err == nil {
cp.Param = &octopus
return nil
}
return fmt.Errorf("unsupported param type in CreateParam")
}
return nil
}

View File

@ -0,0 +1,82 @@
package ai
import (
"encoding/json"
"fmt"
)
type CreateModelParameter interface {
ModelCreateParam()
}
type CreateModelReq struct {
Name string `json:"name" binding:"required"`
ClusterId string `json:"clusterId"`
Desc string `json:"desc"`
Src Source `json:"src,omitempty"`
Param CreateModelParameter `json:"param,omitempty"`
}
type CreateModelResp struct {
Code int32 `json:"code,omitempty" copier:"Code"`
Msg string `json:"msg,omitempty" copier:"Msg"`
ErrorMsg string `json:"errorMsg,omitempty" copier:"ErrorMsg"`
}
type OpenIModel struct {
RepoName string `json:"repoName,omitempty"`
}
func (o *OpenIModel) ModelCreateParam() {
}
type OctopusModel struct {
}
func (o *OctopusModel) ModelCreateParam() {
}
func (cp *CreateModelReq) UnmarshalJSON(data []byte) error {
// 临时结构体:用于捕获原始 JSON 中的 param 字段数据
type TempCreateParam struct {
Name string `json:"name"`
ClusterId string `json:"clusterId"`
Desc string `json:"desc"`
Src Source `json:"src,omitempty"`
Param json.RawMessage `json:"param,omitempty"` // 捕获原始 JSON 数据
}
var temp TempCreateParam
if err := json.Unmarshal(data, &temp); err != nil {
return err
}
// 将临时结构体的字段赋值给原结构体(除 Param 外)
cp.Name = temp.Name
cp.ClusterId = temp.ClusterId
cp.Desc = temp.Desc
cp.Src = temp.Src
// 解析 param 字段的原始数据为具体类型
if temp.Param != nil {
// 尝试解析为 OpenI 类型
var openi OpenIModel
if err := json.Unmarshal(temp.Param, &openi); err != nil {
// 打印详细错误(如字段不匹配、类型错误等)
fmt.Printf("解析 OpenI 失败: %v\n", err) // 关键调试日志
} else {
cp.Param = &openi
return nil
}
// 新增:尝试解析为 Octopus 类型
var octopus OctopusModel
if err := json.Unmarshal(temp.Param, &octopus); err == nil {
cp.Param = &octopus
return nil
}
return fmt.Errorf("unsupported param type in CreateParam")
}
return nil
}

81
internal/types/ai/task.go Normal file
View File

@ -0,0 +1,81 @@
package ai
import (
"encoding/json"
"fmt"
)
type ResultSyncParameter interface {
ResultSyncParam()
}
type ResultSyncReq struct {
ClusterId string `json:"clusterId"`
Src Source `json:"src,omitempty"`
Param ResultSyncParameter `json:"param,omitempty"`
}
type ResultSyncResp struct {
Code int32 `json:"code,omitempty" copier:"Code"`
Msg string `json:"msg,omitempty" copier:"Msg"`
ErrorMsg string `json:"errorMsg,omitempty" copier:"ErrorMsg"`
}
type OpenISync struct {
Id string `json:"id,omitempty"`
RepoName string `json:"repoName,omitempty"`
ParentDir string `json:"parentDir,omitempty"`
BootFile string `json:"bootFile,omitempty"`
DefaultBranch string `json:"defaultBranch,omitempty"`
}
func (o *OpenISync) ResultSyncParam() {
}
type OctopusSync struct {
}
func (o *OctopusSync) ResultSyncParam() {
}
func (cp *ResultSyncReq) UnmarshalJSON(data []byte) error {
// 临时结构体:用于捕获原始 JSON 中的 param 字段数据
type TempCreateParam struct {
ClusterId string `json:"clusterId"`
Src Source `json:"src,omitempty"`
Param json.RawMessage `json:"param,omitempty"` // 捕获原始 JSON 数据
}
var temp TempCreateParam
if err := json.Unmarshal(data, &temp); err != nil {
return err
}
// 将临时结构体的字段赋值给原结构体(除 Param 外)
cp.ClusterId = temp.ClusterId
cp.Src = temp.Src
// 解析 param 字段的原始数据为具体类型
if temp.Param != nil {
// 尝试解析为 OpenI 类型
var openi OpenISync
if err := json.Unmarshal(temp.Param, &openi); err != nil {
// 打印详细错误(如字段不匹配、类型错误等)
fmt.Printf("解析 OpenI 失败: %v\n", err) // 关键调试日志
} else {
cp.Param = &openi
return nil
}
// 新增:尝试解析为 Octopus 类型
var octopus OctopusSync
if err := json.Unmarshal(temp.Param, &octopus); err == nil {
cp.Param = &octopus
return nil
}
return fmt.Errorf("unsupported param type in CreateParam")
}
return nil
}