This commit is contained in:
zhangwei 2025-07-19 15:41:09 +08:00
commit d73bdee020
25 changed files with 647 additions and 397 deletions

View File

@ -8,7 +8,7 @@ type IAlgorithm interface {
All(ctx context.Context) (Algorithms, error)
Train(ctx context.Context) (Algorithms, error)
Infer(ctx context.Context) (Algorithms, error)
Create(ctx context.Context, param *CreateParam) (interface{}, error)
Create(ctx context.Context, param *CreateParam) (*CreateResp, error)
Features() *Features
}

View File

@ -13,6 +13,11 @@ type CreateParam struct {
Param CreateParameter `json:"param,omitempty"`
}
type CreateResp struct {
Id string `json:"id"`
Name string `json:"name"`
}
type OpenI struct {
BootFile string `json:"bootFile,omitempty"`
DefaultBranch string `json:"defaultBranch,omitempty"`

View File

@ -6,7 +6,7 @@ type IDataset interface {
All(ctx context.Context) (Datasets, error)
Train(ctx context.Context) (Datasets, error)
Infer(ctx context.Context) (Datasets, error)
Create(ctx context.Context, param *CreateParam) (interface{}, error)
Create(ctx context.Context, param *CreateParam) (*CreateResp, error)
Features() *Features
}

View File

@ -1,6 +1,8 @@
package dataset
import (
"encoding/json"
"fmt"
"gitlink.org.cn/JointCloud/pcm-participant-ai/common"
)
@ -14,9 +16,47 @@ type CreateParam struct {
Param CreateParameter `json:"param,omitempty"`
}
type CreateResp struct {
Id string `json:"id"`
Name string `json:"name"`
}
type OpenI struct {
Repo string `json:"repo,omitempty"`
}
func (o OpenI) DatasetCreateParam() {
}
func (cp *CreateParam) UnmarshalJSON(data []byte) error {
// 临时结构体:用于捕获原始 JSON 中的 param 字段数据
type TempCreateParam struct {
Name string `json:"name"`
Desc string `json:"desc"`
Src *common.Source `json:"src,omitempty"`
Param json.RawMessage `json:"param,omitempty"` // 捕获原始 JSON 数据
}
var temp TempCreateParam
if err := json.Unmarshal(data, &temp); err != nil {
return err
}
// 将临时结构体的字段赋值给原结构体(除 Param 外)
cp.Name = temp.Name
cp.Desc = temp.Desc
cp.Src = temp.Src
// 解析 param 字段的原始数据为具体类型
if temp.Param != nil {
// 尝试解析为 OpenI 类型
var openi OpenI
if err := json.Unmarshal(temp.Param, &openi); err == nil {
cp.Param = &openi
return nil
}
return fmt.Errorf("unsupported param type in CreateParam")
}
return nil
}

View File

@ -6,7 +6,7 @@ type IModel interface {
All(ctx context.Context) (Models, error)
Train(ctx context.Context) (Models, error)
Infer(ctx context.Context) (Models, error)
Create(ctx context.Context, param *CreateParam) (interface{}, error)
Create(ctx context.Context, param *CreateParam) (*CreateResp, error)
Features() *Features
}

View File

@ -1,6 +1,10 @@
package model
import "gitlink.org.cn/JointCloud/pcm-participant-ai/common"
import (
"encoding/json"
"fmt"
"gitlink.org.cn/JointCloud/pcm-participant-ai/common"
)
type CreateParam struct {
Name string `json:"name" binding:"required"`
@ -9,6 +13,11 @@ type CreateParam struct {
Param CreateParameter `json:"param,omitempty"`
}
type CreateResp struct {
Id string `json:"id"`
Name string `json:"name"`
}
type Search struct {
}
@ -18,3 +27,36 @@ type OpenI struct {
func (o OpenI) ModelCreateParam() {
}
func (cp *CreateParam) UnmarshalJSON(data []byte) error {
// 临时结构体:用于捕获原始 JSON 中的 param 字段数据
type TempCreateParam struct {
Name string `json:"name"`
Desc string `json:"desc"`
Src *common.Source `json:"src,omitempty"`
Param json.RawMessage `json:"param,omitempty"` // 捕获原始 JSON 数据
}
var temp TempCreateParam
if err := json.Unmarshal(data, &temp); err != nil {
return err
}
// 将临时结构体的字段赋值给原结构体(除 Param 外)
cp.Name = temp.Name
cp.Desc = temp.Desc
cp.Src = temp.Src
// 解析 param 字段的原始数据为具体类型
if temp.Param != nil {
// 尝试解析为 OpenI 类型
var openi OpenI
if err := json.Unmarshal(temp.Param, &openi); err == nil {
cp.Param = &openi
return nil
}
return fmt.Errorf("unsupported param type in CreateParam")
}
return nil
}

View File

@ -12,4 +12,4 @@ type Platform struct {
}
type Type string
type Id int64
type Id string

View File

@ -74,7 +74,7 @@ func (a *Algorithm) Infer(ctx context.Context, filter *algorithm.Filter) (algori
return alg, nil
}
func (a *Algorithm) Create(ctx context.Context, param *algorithm.CreateParam) (interface{}, error) {
func (a *Algorithm) Create(ctx context.Context, param *algorithm.CreateParam) (*algorithm.CreateResp, error) {
alg, err := a.algo.Create(ctx, param)
if err != nil {
return nil, err

View File

@ -18,7 +18,7 @@ import (
func TestGet(t *testing.T) {
convey.Convey(aiconf.Octopus, t, func() {
o, err := octopus.New(aiconf.Cfg[aiconf.Octopus].URL, aiconf.Cfg[aiconf.Octopus].Username, aiconf.Cfg[aiconf.Octopus].Password, platform.Id(123))
o, err := octopus.New(aiconf.Cfg[aiconf.Octopus].URL, aiconf.Cfg[aiconf.Octopus].Username, aiconf.Cfg[aiconf.Octopus].Password, aiconf.Octopus, platform.Id(123))
if err != nil {
fmt.Println(err)
return
@ -41,7 +41,7 @@ func TestGet(t *testing.T) {
})
convey.Convey(aiconf.OpenI, t, func() {
o, _ := openI.New(aiconf.Cfg[aiconf.OpenI].Username, aiconf.Cfg[aiconf.OpenI].Password, aiconf.Cfg[aiconf.OpenI].APIKey, platform.Id(123), aiconf.Cfg[aiconf.OpenI].DataRepo)
o, _ := openI.New(aiconf.Cfg[aiconf.OpenI].Username, aiconf.Cfg[aiconf.OpenI].Password, aiconf.Cfg[aiconf.OpenI].APIKey, aiconf.OpenI, platform.Id(123), aiconf.Cfg[aiconf.OpenI].DataRepo)
common.InitClient()
alg := NewAlgorithm(o.Alg)
@ -92,7 +92,7 @@ func TestCreate(t *testing.T) {
//o, _ := openI.New("nudt-ysz", "", "8cff1d2db9171462c02901d086d13221389fd082", platform.Id(123), "data")
//common.InitClient()
oct, _ := octopus.New(aiconf.Cfg[aiconf.Octopus].URL, aiconf.Cfg[aiconf.Octopus].Username, aiconf.Cfg[aiconf.Octopus].Password, platform.Id(123))
oct, _ := octopus.New(aiconf.Cfg[aiconf.Octopus].URL, aiconf.Cfg[aiconf.Octopus].Username, aiconf.Cfg[aiconf.Octopus].Password, aiconf.Octopus, platform.Id(123))
alg := NewAlgorithm(oct.Alg)
@ -111,23 +111,23 @@ func TestCreate(t *testing.T) {
},
}
//convey.Convey("OpenI", func() {
// op := &algorithm.OpenI{
// BootFile: "123.txt",
// DefaultBranch: "master",
// }
// param := &algorithm.CreateParam{
// Name: name,
// Desc: desc,
// Src: src,
// Param: op,
// }
// _, err := alg.Create(ctx, param)
// if err != nil {
// fmt.Println(err.Error())
// }
// convey.So(err, convey.ShouldBeNil)
//})
convey.Convey("OpenI", func() {
op := &algorithm.OpenI{
BootFile: "123.txt",
DefaultBranch: "master",
}
param := &algorithm.CreateParam{
Name: name,
Desc: desc,
Src: src,
Param: op,
}
_, err := alg.Create(ctx, param)
if err != nil {
fmt.Println(err.Error())
}
convey.So(err, convey.ShouldBeNil)
})
convey.Convey("octopus", func() {
param := &algorithm.CreateParam{

View File

@ -18,7 +18,7 @@ import (
func TestDataset(t *testing.T) {
convey.Convey(aiconf.OpenI, t, func() {
o, _ := openI.New(aiconf.Cfg[aiconf.OpenI].Username, aiconf.Cfg[aiconf.OpenI].Password, aiconf.Cfg[aiconf.OpenI].APIKey, platform.Id(123), aiconf.Cfg[aiconf.OpenI].DataRepo)
o, _ := openI.New(aiconf.Cfg[aiconf.OpenI].Username, aiconf.Cfg[aiconf.OpenI].Password, aiconf.Cfg[aiconf.OpenI].APIKey, aiconf.OpenI, platform.Id(123), aiconf.Cfg[aiconf.OpenI].DataRepo)
common.InitClient()
ds := NewDataset(o.Ds)
@ -62,7 +62,7 @@ func TestDataset(t *testing.T) {
})
convey.Convey(aiconf.Octopus, t, func() {
oct, _ := octopus.New(aiconf.Cfg[aiconf.Octopus].URL, aiconf.Cfg[aiconf.Octopus].Username, aiconf.Cfg[aiconf.Octopus].Password, platform.Id(123))
oct, _ := octopus.New(aiconf.Cfg[aiconf.Octopus].URL, aiconf.Cfg[aiconf.Octopus].Username, aiconf.Cfg[aiconf.Octopus].Password, aiconf.Octopus, platform.Id(123))
ds := NewDataset(oct.Ds)
@ -87,10 +87,10 @@ func TestDataset(t *testing.T) {
func TestCreateDataset(t *testing.T) {
convey.Convey("Create Dataset", t, func() {
//o, _ := openI.New(aiconf.Cfg[aiconf.OpenI].Username, aiconf.Cfg[aiconf.OpenI].Password, aiconf.Cfg[aiconf.OpenI].APIKey, platform.Id(123), aiconf.Cfg[aiconf.OpenI].DataRepo)
//o, _ := openI.New(aiconf.Cfg[aiconf.OpenI].Username, aiconf.Cfg[aiconf.OpenI].Password, aiconf.Cfg[aiconf.OpenI].APIKey, aiconf.OpenI, platform.Id(123), aiconf.Cfg[aiconf.OpenI].DataRepo)
//common.InitClient()
oct, _ := octopus.New(aiconf.Cfg[aiconf.Octopus].URL, aiconf.Cfg[aiconf.Octopus].Username, aiconf.Cfg[aiconf.Octopus].Password, platform.Id(123))
oct, _ := octopus.New(aiconf.Cfg[aiconf.Octopus].URL, aiconf.Cfg[aiconf.Octopus].Username, aiconf.Cfg[aiconf.Octopus].Password, aiconf.Octopus, platform.Id(123))
ds := NewDataset(oct.Ds)

View File

@ -16,7 +16,7 @@ import (
func TestImage(t *testing.T) {
convey.Convey(aiconf.OpenI, t, func() {
o, _ := openI.New(aiconf.Cfg[aiconf.OpenI].Username, aiconf.Cfg[aiconf.OpenI].Password, aiconf.Cfg[aiconf.OpenI].APIKey, platform.Id(123), aiconf.Cfg[aiconf.OpenI].DataRepo)
o, _ := openI.New(aiconf.Cfg[aiconf.OpenI].Username, aiconf.Cfg[aiconf.OpenI].Password, aiconf.Cfg[aiconf.OpenI].APIKey, aiconf.OpenI, platform.Id(123), aiconf.Cfg[aiconf.OpenI].DataRepo)
common.InitClient()
i := NewImage(o.Img)
@ -83,7 +83,7 @@ func TestImage(t *testing.T) {
func TestOctopusImage(t *testing.T) {
convey.Convey(aiconf.Octopus, t, func() {
o, _ := octopus.New(aiconf.Cfg[aiconf.Octopus].URL, aiconf.Cfg[aiconf.Octopus].Username, aiconf.Cfg[aiconf.Octopus].Password, platform.Id(123))
o, _ := octopus.New(aiconf.Cfg[aiconf.Octopus].URL, aiconf.Cfg[aiconf.Octopus].Username, aiconf.Cfg[aiconf.Octopus].Password, aiconf.Octopus, platform.Id(123))
i := NewImage(o.Img)

View File

@ -20,7 +20,7 @@ import (
func TestCreateModel(t *testing.T) {
convey.Convey("Create Model", t, func() {
o, _ := openI.New(aiconf.Cfg[aiconf.OpenI].Username, aiconf.Cfg[aiconf.OpenI].Password, aiconf.Cfg[aiconf.OpenI].APIKey, platform.Id(123), aiconf.Cfg[aiconf.OpenI].DataRepo)
o, _ := openI.New(aiconf.Cfg[aiconf.OpenI].Username, aiconf.Cfg[aiconf.OpenI].Password, aiconf.Cfg[aiconf.OpenI].APIKey, aiconf.OpenI, platform.Id(123), aiconf.Cfg[aiconf.OpenI].DataRepo)
common.InitClient()
mdl := NewModel(o.Mdl)

View File

@ -16,7 +16,7 @@ import (
func TestResource(t *testing.T) {
convey.Convey(aiconf.OpenI, t, func() {
o, _ := openI.New(aiconf.Cfg[aiconf.OpenI].Username, aiconf.Cfg[aiconf.OpenI].Password, aiconf.Cfg[aiconf.OpenI].APIKey, platform.Id(123), aiconf.Cfg[aiconf.OpenI].DataRepo)
o, _ := openI.New(aiconf.Cfg[aiconf.OpenI].Username, aiconf.Cfg[aiconf.OpenI].Password, aiconf.Cfg[aiconf.OpenI].APIKey, aiconf.OpenI, platform.Id(123), aiconf.Cfg[aiconf.OpenI].DataRepo)
common.InitClient()
res := NewResource(o.Res)
@ -97,7 +97,7 @@ func TestResource(t *testing.T) {
func TestGetResourceSpecs(t *testing.T) {
convey.Convey(aiconf.OpenI, t, func() {
o, _ := openI.New(aiconf.Cfg[aiconf.OpenI].Username, aiconf.Cfg[aiconf.OpenI].Password, aiconf.Cfg[aiconf.OpenI].APIKey, platform.Id(123), aiconf.Cfg[aiconf.OpenI].DataRepo)
o, _ := openI.New(aiconf.Cfg[aiconf.OpenI].Username, aiconf.Cfg[aiconf.OpenI].Password, aiconf.Cfg[aiconf.OpenI].APIKey, aiconf.OpenI, platform.Id(123), aiconf.Cfg[aiconf.OpenI].DataRepo)
common.InitClient()
res := NewResource(o.Res)
@ -116,7 +116,7 @@ func TestGetResourceSpecs(t *testing.T) {
})
convey.Convey(aiconf.Octopus, t, func() {
oct, _ := octopus.New(aiconf.Cfg[aiconf.Octopus].URL, aiconf.Cfg[aiconf.Octopus].Username, aiconf.Cfg[aiconf.Octopus].Password, platform.Id(123))
oct, _ := octopus.New(aiconf.Cfg[aiconf.Octopus].URL, aiconf.Cfg[aiconf.Octopus].Username, aiconf.Cfg[aiconf.Octopus].Password, aiconf.Octopus, platform.Id(123))
res := NewResource(oct.Res)

View File

@ -2,8 +2,12 @@ package service
import (
"context"
"errors"
"fmt"
"gitlink.org.cn/JointCloud/pcm-participant-ai/image"
"sync"
"gitlink.org.cn/JointCloud/pcm-participant-ai/algorithm"
"gitlink.org.cn/JointCloud/pcm-participant-ai/common"
"gitlink.org.cn/JointCloud/pcm-participant-ai/dataset"
"gitlink.org.cn/JointCloud/pcm-participant-ai/model"
"gitlink.org.cn/JointCloud/pcm-participant-ai/platform"
@ -11,303 +15,322 @@ import (
"gitlink.org.cn/JointCloud/pcm-participant-ai/task"
"gitlink.org.cn/JointCloud/pcm-participant-octopus"
openI "gitlink.org.cn/JointCloud/pcm-participant-openi"
"sync"
)
type Service struct {
rmap map[platform.Id]*Resource
dmap map[platform.Id]*Dataset
tmap map[platform.Id]*Task
amap map[platform.Id]*Algorithm
imap map[platform.Id]*Image
mmap map[platform.Id]*Model
rlock sync.Mutex
dlock sync.Mutex
tlock sync.Mutex
resourceMap sync.Map
datasetMap sync.Map
taskMap sync.Map
algorithmMap sync.Map
imageMap sync.Map
modelMap sync.Map
}
func NewService(platforms ...platform.IPlatform) (*Service, error) {
rmap := make(map[platform.Id]*Resource)
amap := make(map[platform.Id]*Algorithm)
imap := make(map[platform.Id]*Image)
mmap := make(map[platform.Id]*Model)
s := &Service{}
for _, pf := range platforms {
switch pf.Type() {
case platform.OpenI:
openI, ok := pf.(*openI.OpenI)
if !ok {
}
if openI.Res == nil {
}
if openI.Img == nil {
}
resource := NewResource(openI.Res)
rmap[pf.Id()] = resource
alg := NewAlgorithm(openI.Alg)
amap[pf.Id()] = alg
img := NewImage(openI.Img)
imap[pf.Id()] = img
mdl := NewModel(openI.Mdl)
mmap[pf.Id()] = mdl
case platform.Octopus:
oct, ok := pf.(*octopus.Octopus)
if !ok {
}
alg := NewAlgorithm(oct.Alg)
amap[pf.Id()] = alg
if err := s.registerPlatform(pf); err != nil {
return nil, fmt.Errorf("failed to register platform %d: %w", pf.Id(), err)
}
}
return &Service{rmap: rmap, amap: amap, imap: imap, mmap: mmap}, nil
return s, nil
}
// resource
func (s *Service) GetResourceSpecs(ctx context.Context, pfId int64, rtype string) (interface{}, error) {
var pid = platform.Id(pfId)
res, found := s.rmap[pid]
if !found {
// registerPlatform 注册单个平台及其所有组件
func (s *Service) registerPlatform(pf platform.IPlatform) error {
// 内部注册函数
register := func(res resource.IResource, img image.IImage, task task.Task, ds dataset.IDataset, alg algorithm.IAlgorithm, mdl model.IModel, platformName string) error {
if res == nil || img == nil || task == nil || ds == nil || alg == nil || mdl == nil {
return fmt.Errorf("%s platform missing required components", platformName)
}
s.resourceMap.Store(pf.Id(), NewResource(res))
s.imageMap.Store(pf.Id(), NewImage(img))
s.taskMap.Store(pf.Id(), NewTask(task))
s.datasetMap.Store(pf.Id(), NewDataset(ds))
s.algorithmMap.Store(pf.Id(), NewAlgorithm(alg))
s.modelMap.Store(pf.Id(), NewModel(mdl))
return nil
}
specs, err := res.GetResourcespecs(ctx, rtype)
if err != nil {
return nil, err
switch pt := pf.(type) {
case *openI.OpenI:
return register(pt.Res, pt.Img, pt.Task, pt.Ds, pt.Alg, pt.Mdl, "OpenI")
case *octopus.Octopus:
return register(pt.Res, pt.Img, pt.Task, pt.Ds, pt.Alg, pt.Mdl, "Octopus")
default:
return fmt.Errorf("unsupported platform type: %T", pf)
}
return specs, nil
}
func (s *Service) TrainResources(ctx context.Context, pfId int64) ([]*resource.Spec, error) {
var pid = platform.Id(pfId)
res, found := s.rmap[pid]
if !found {
// Resource operations
func (s *Service) GetResourceSpecs(ctx context.Context, id string, rtype string) (interface{}, error) {
pid := platform.Id(id)
val, err := s.loadResource(pid)
if err != nil {
return nil, err
}
return val.GetResourcespecs(ctx, rtype)
}
}
resources, err := res.Train(ctx, nil)
func (s *Service) TrainResources(ctx context.Context, id string) ([]*resource.Spec, error) {
pid := platform.Id(id)
val, err := s.loadResource(pid)
if err != nil {
return nil, err
}
specs, err := resources.Specs()
resources, err := val.Train(ctx, nil)
if err != nil {
return nil, err
return nil, fmt.Errorf("resource training failed: %w", err)
}
return specs, nil
return resources.Specs()
}
func (s *Service) AllTrainResources(ctx context.Context) ([]*resource.Spec, error) {
all := make([]*resource.Spec, 0)
for id, _ := range s.rmap {
resources, err := s.TrainResources(ctx, int64(id))
if err != nil {
return nil, err
}
if len(resources) == 0 {
continue
}
all = common.ConcatMultipleSlices([][]*resource.Spec{all, resources})
}
return all, nil
var (
all []*resource.Spec
mu sync.Mutex
wg sync.WaitGroup
errList []error
)
s.resourceMap.Range(func(key, value interface{}) bool {
wg.Add(1)
go func(id platform.Id) {
defer wg.Done()
resources, err := s.TrainResources(ctx, string(id))
mu.Lock()
defer mu.Unlock()
if err != nil {
errList = append(errList, fmt.Errorf("platform %d: %w", id, err))
return
}
if len(resources) > 0 {
all = append(all, resources...)
}
}(key.(platform.Id))
return true
})
wg.Wait()
return all, errors.Join(errList...)
}
// algorithm
func (s *Service) TrainAlgorithms(ctx context.Context, pfId int64) ([]*algorithm.Algorithm, error) {
var pid = platform.Id(pfId)
a, found := s.amap[pid]
if !found {
// Algorithm operations
func (s *Service) TrainAlgorithms(ctx context.Context, id string) ([]*algorithm.Algorithm, error) {
pid := platform.Id(id)
val, err := s.loadAlgorithm(pid)
if err != nil {
return nil, err
}
}
alg, err := a.Train(ctx, nil)
alg, err := val.Train(ctx, nil)
if err != nil {
return nil, err
return nil, fmt.Errorf("algorithm training failed: %w", err)
}
algorithms, err := alg.Algorithms()
if err != nil {
return nil, err
}
return algorithms, nil
return alg.Algorithms()
}
func (s *Service) AllTrainAlgorithms(ctx context.Context) ([]*algorithm.Algorithm, error) {
all := make([]*algorithm.Algorithm, 0)
for id, _ := range s.amap {
algorithms, err := s.TrainAlgorithms(ctx, int64(id))
if err != nil {
return nil, err
}
if len(algorithms) == 0 {
continue
}
all = common.ConcatMultipleSlices([][]*algorithm.Algorithm{all, algorithms})
}
return all, nil
var (
all []*algorithm.Algorithm
mu sync.Mutex
wg sync.WaitGroup
errs []error
)
s.algorithmMap.Range(func(key, value interface{}) bool {
wg.Add(1)
go func(id platform.Id) {
defer wg.Done()
algorithms, err := s.TrainAlgorithms(ctx, string(id))
mu.Lock()
defer mu.Unlock()
if err != nil {
errs = append(errs, fmt.Errorf("platform %s: %w", id, err))
return
}
if len(algorithms) > 0 {
all = append(all, algorithms...)
}
}(key.(platform.Id))
return true
})
wg.Wait()
return all, errors.Join(errs...)
}
func (s *Service) CreateAlgorithm(ctx context.Context, pfId int64, param *algorithm.CreateParam) (interface{}, error) {
var pid = platform.Id(pfId)
alg, found := s.amap[pid]
if !found {
}
resp, err := alg.Create(ctx, param)
func (s *Service) CreateAlgorithm(ctx context.Context, id string, param *algorithm.CreateParam) (*algorithm.CreateResp, error) {
pid := platform.Id(id)
val, err := s.loadAlgorithm(pid)
if err != nil {
return nil, err
}
return resp, nil
return val.Create(ctx, param)
}
// task
func (s *Service) CreateTrainTask(ctx context.Context, param *CreateTrainTaskParam) (interface{}, error) {
trainParams, err := s.generateParamsForTrainTask(ctx, param.Id, param)
if err != nil {
return nil, err
if param == nil || param.Id == nil {
return nil, fmt.Errorf("invalid task parameters")
}
t, found := s.tmap[*param.Id]
if !found {
}
resp, err := t.createTrainingTask(ctx, trainParams)
trainParams, err := s.generateParamsForTrainTask(ctx, *param.Id, param)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to generate train params: %w", err)
}
return resp, nil
val, err := s.loadTask(*param.Id)
if err != nil {
return nil, fmt.Errorf("failed to load task: %w", err)
}
return val.createTrainingTask(ctx, trainParams)
}
func (s *Service) generateParamsForTrainTask(ctx context.Context, pid *platform.Id, cp *CreateTrainTaskParam) (*task.TrainParams, error) {
res, found := s.rmap[*pid]
if !found {
func (s *Service) generateParamsForTrainTask(ctx context.Context, id platform.Id, cp *CreateTrainTaskParam) (*task.TrainParams, error) {
res, err := s.loadResource(id)
if err != nil {
return nil, err
}
dat, found := s.dmap[*pid]
if !found {
dat, err := s.loadDataset(id)
if err != nil {
return nil, err
}
img, found := s.imap[*pid]
if !found {
img, err := s.loadImage(id)
if err != nil {
return nil, err
}
param := &task.TrainParams{}
resourceParam, err := res.TrainParam(ctx, cp.Resource)
if err != nil {
return nil, err
}
datasetParam, err := dat.TrainParam(ctx, cp.Dataset)
if err != nil {
return nil, err
}
imgParam, err := img.TrainParam(ctx, cp.Image)
if err != nil {
return nil, err
if resourceParam, err := res.TrainParam(ctx, cp.Resource); err != nil {
return nil, fmt.Errorf("resource param error: %w", err)
} else {
param.Resource = resourceParam
}
param.Resource = resourceParam
param.Dataset = datasetParam
param.Image = imgParam
if datasetParam, err := dat.TrainParam(ctx, cp.Dataset); err != nil {
return nil, fmt.Errorf("dataset param error: %w", err)
} else {
param.Dataset = datasetParam
}
if imgParam, err := img.TrainParam(ctx, cp.Image); err != nil {
return nil, fmt.Errorf("image param error: %w", err)
} else {
param.Image = imgParam
}
return param, nil
}
func (s *Service) TaskResultSync(ctx context.Context, pfId int64, param *task.ResultSyncParam) error {
var pid = platform.Id(pfId)
tsk, found := s.tmap[pid]
if !found {
}
err := tsk.resultSync(ctx, param)
func (s *Service) TaskResultSync(ctx context.Context, id string, param *task.ResultSyncParam) error {
pid := platform.Id(id)
val, err := s.loadTask(pid)
if err != nil {
return err
}
return nil
return val.resultSync(ctx, param)
}
func (s *Service) TaskLog(ctx context.Context, pfId int64, id string) (interface{}, error) {
var pid = platform.Id(pfId)
tsk, found := s.tmap[pid]
if !found {
}
resp, err := tsk.getLog(ctx, id)
func (s *Service) TaskLog(ctx context.Context, id string, taskId string) (interface{}, error) {
pid := platform.Id(id)
val, err := s.loadTask(pid)
if err != nil {
return nil, err
}
return resp, nil
return val.getLog(ctx, taskId)
}
func (s *Service) GetTrainingTask(ctx context.Context, pfId int64, id string) (interface{}, error) {
var pid = platform.Id(pfId)
tsk, found := s.tmap[pid]
if !found {
}
resp, err := tsk.getTrainingTask(ctx, id)
func (s *Service) GetTrainingTask(ctx context.Context, id string, taskId string) (interface{}, error) {
pid := platform.Id(id)
val, err := s.loadTask(pid)
if err != nil {
return nil, err
}
return resp, nil
return val.getTrainingTask(ctx, taskId)
}
func (s *Service) GetInferenceTask(ctx context.Context, pfId int64, id string) (interface{}, error) {
var pid = platform.Id(pfId)
tsk, found := s.tmap[pid]
if !found {
}
resp, err := tsk.getInferenceTask(ctx, id)
func (s *Service) GetInferenceTask(ctx context.Context, id string, taskId string) (interface{}, error) {
pid := platform.Id(id)
val, err := s.loadTask(pid)
if err != nil {
return nil, err
}
return resp, nil
return val.getInferenceTask(ctx, taskId)
}
// dataset
func (s *Service) CreateDataset(ctx context.Context, pfId int64, param *dataset.CreateParam) (interface{}, error) {
var pid = platform.Id(pfId)
ds, found := s.dmap[pid]
if !found {
}
resp, err := ds.Create(ctx, param)
// Dataset operations
func (s *Service) CreateDataset(ctx context.Context, id string, param *dataset.CreateParam) (interface{}, error) {
pid := platform.Id(id)
val, err := s.loadDataset(pid)
if err != nil {
return nil, err
}
return resp, nil
return val.Create(ctx, param)
}
// model
func (s *Service) CreateModel(ctx context.Context, pfId int64, param *model.CreateParam) (interface{}, error) {
var pid = platform.Id(pfId)
mdl, found := s.mmap[pid]
if !found {
}
resp, err := mdl.Create(ctx, param)
// Model operations
func (s *Service) CreateModel(ctx context.Context, id string, param *model.CreateParam) (interface{}, error) {
pid := platform.Id(id)
val, err := s.loadModel(pid)
if err != nil {
return nil, err
}
return resp, nil
return val.Create(ctx, param)
}
func (s *Service) TestFuncRes(ctx context.Context, pfId int64) {
var pid = platform.Id(pfId)
res, found := s.rmap[pid]
if !found {
// Helper methods for loading
func (s *Service) loadResource(id platform.Id) (*Resource, error) {
return loadFromSyncMap[*Resource](&s.resourceMap, id, "resource")
}
func (s *Service) loadDataset(id platform.Id) (*Dataset, error) {
return loadFromSyncMap[*Dataset](&s.datasetMap, id, "dataset")
}
func (s *Service) loadTask(id platform.Id) (*Task, error) {
return loadFromSyncMap[*Task](&s.taskMap, id, "task")
}
func (s *Service) loadAlgorithm(id platform.Id) (*Algorithm, error) {
return loadFromSyncMap[*Algorithm](&s.algorithmMap, id, "algorithm")
}
func (s *Service) loadImage(id platform.Id) (*Image, error) {
return loadFromSyncMap[*Image](&s.imageMap, id, "image")
}
func (s *Service) loadModel(id platform.Id) (*Model, error) {
return loadFromSyncMap[*Model](&s.modelMap, id, "model")
}
func loadFromSyncMap[T any](m *sync.Map, id platform.Id, resourceType string) (T, error) {
var zero T
val, ok := m.Load(id)
if !ok {
return zero, fmt.Errorf("%s for platform ID %d not found", resourceType, id)
}
res.TrainParam(context.Background(), nil)
result, ok := val.(T)
if !ok {
return zero, fmt.Errorf("invalid %s type for platform ID %d", resourceType, id)
}
return result, nil
}

View File

@ -7,21 +7,21 @@ import (
"github.com/smartystreets/goconvey/convey"
aiconf "gitlink.org.cn/JointCloud/pcm-participant-ai/config"
"gitlink.org.cn/JointCloud/pcm-participant-ai/platform"
"gitlink.org.cn/JointCloud/pcm-participant-octopus"
octopus "gitlink.org.cn/JointCloud/pcm-participant-octopus"
openI "gitlink.org.cn/JointCloud/pcm-participant-openi"
"gitlink.org.cn/JointCloud/pcm-participant-openi/common"
"testing"
"time"
)
func TestService(t *testing.T) {
func TestAlgorithm_Train(t *testing.T) {
convey.Convey("Test Service", t, func() {
o, _ := openI.New(aiconf.Cfg[aiconf.OpenI].Username, aiconf.Cfg[aiconf.OpenI].Password, aiconf.Cfg[aiconf.OpenI].APIKey, platform.Id(123), aiconf.Cfg[aiconf.OpenI].DataRepo)
oct, _ := octopus.New(aiconf.Cfg[aiconf.Octopus].URL, aiconf.Cfg[aiconf.Octopus].Username, aiconf.Cfg[aiconf.Octopus].Password, platform.Id(123))
o, _ := openI.New(aiconf.Cfg[aiconf.OpenI].Username, aiconf.Cfg[aiconf.OpenI].Password, aiconf.Cfg[aiconf.OpenI].APIKey, aiconf.OpenI, platform.Id(123), aiconf.Cfg[aiconf.OpenI].DataRepo)
//oct, _ := octopus.New(aiconf.Cfg[aiconf.Octopus].URL, aiconf.Cfg[aiconf.Octopus].Username, aiconf.Cfg[aiconf.Octopus].Password, platform.Id(456))
common.InitClient()
svc, err := NewService(o, oct)
svc, err := NewService(o)
if err != nil {
}
@ -29,6 +29,64 @@ func TestService(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
//convey.Convey("all train algorithms", func() {
// algorithms, err := svc.AllTrainAlgorithms(ctx)
// fmt.Println(len(algorithms))
// if err != nil {
// fmt.Println(err)
// }
//
// convey.So(err, convey.ShouldBeNil)
// convey.So(algorithms, convey.ShouldNotBeEmpty)
// for _, algorithm := range algorithms {
// marshal, err := json.Marshal(algorithm)
// if err != nil {
// return
// }
// fmt.Println(string(marshal))
// }
//
//})
convey.Convey("all train algorithms", func() {
algorithms, err := svc.TrainAlgorithms(ctx, "123")
if err != nil {
fmt.Println(err)
}
convey.So(err, convey.ShouldBeNil)
convey.So(algorithms, convey.ShouldNotBeEmpty)
for _, algorithm := range algorithms {
marshal, err := json.Marshal(algorithm)
if err != nil {
return
}
fmt.Println(string(marshal))
}
})
})
}
func TestService(t *testing.T) {
convey.Convey("Test Service", t, func() {
o, _ := openI.New(aiconf.Cfg[aiconf.OpenI].Username, aiconf.Cfg[aiconf.OpenI].Password, aiconf.Cfg[aiconf.OpenI].APIKey, aiconf.OpenI, platform.Id(123), aiconf.Cfg[aiconf.OpenI].DataRepo)
common.InitClient()
o1, err := octopus.New(aiconf.Cfg[aiconf.Octopus].URL, aiconf.Cfg[aiconf.Octopus].Username, aiconf.Cfg[aiconf.Octopus].Password, aiconf.Octopus, platform.Id(456))
if err != nil {
fmt.Println(err)
}
svc, err := NewService(o, o1)
if err != nil {
fmt.Println(err)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
convey.Convey("all train algorithms", func() {
algorithms, err := svc.AllTrainAlgorithms(ctx)
if err != nil {

View File

@ -18,7 +18,7 @@ import (
func TestOpenITask(t *testing.T) {
convey.Convey(aiconf.OpenI, t, func() {
o, _ := openI.New(aiconf.Cfg[aiconf.OpenI].Username, aiconf.Cfg[aiconf.OpenI].Password, aiconf.Cfg[aiconf.OpenI].APIKey, platform.Id(123), aiconf.Cfg[aiconf.OpenI].DataRepo)
o, _ := openI.New(aiconf.Cfg[aiconf.OpenI].Username, aiconf.Cfg[aiconf.OpenI].Password, aiconf.Cfg[aiconf.OpenI].APIKey, aiconf.OpenI, platform.Id(123), aiconf.Cfg[aiconf.OpenI].DataRepo)
common.InitClient()
tps := &task.TrainParams{
@ -64,7 +64,7 @@ func TestOpenITask(t *testing.T) {
func TestResultSync(t *testing.T) {
convey.Convey(aiconf.OpenI, t, func() {
o, _ := openI.New(aiconf.Cfg[aiconf.OpenI].Username, aiconf.Cfg[aiconf.OpenI].Password, aiconf.Cfg[aiconf.OpenI].APIKey, platform.Id(123), aiconf.Cfg[aiconf.OpenI].DataRepo)
o, _ := openI.New(aiconf.Cfg[aiconf.OpenI].Username, aiconf.Cfg[aiconf.OpenI].Password, aiconf.Cfg[aiconf.OpenI].APIKey, aiconf.OpenI, platform.Id(123), aiconf.Cfg[aiconf.OpenI].DataRepo)
common.InitClient()
src := &aicom.Source{
@ -99,7 +99,7 @@ func TestResultSync(t *testing.T) {
func TestOctopusTask(t *testing.T) {
convey.Convey(aiconf.Octopus, t, func() {
o, _ := octopus.New(aiconf.Cfg[aiconf.Octopus].URL, aiconf.Cfg[aiconf.Octopus].Username, aiconf.Cfg[aiconf.Octopus].Password, platform.Id(123))
o, _ := octopus.New(aiconf.Cfg[aiconf.Octopus].URL, aiconf.Cfg[aiconf.Octopus].Username, aiconf.Cfg[aiconf.Octopus].Password, aiconf.Octopus, platform.Id(123))
tps := &task.TrainParams{
TaskName: "testTask123",

View File

@ -1,6 +1,10 @@
package task
import "gitlink.org.cn/JointCloud/pcm-participant-ai/common"
import (
"encoding/json"
"fmt"
"gitlink.org.cn/JointCloud/pcm-participant-ai/common"
)
type ResultSyncParam struct {
Src *common.Source `json:"src,omitempty"`
@ -17,3 +21,31 @@ type OpenI struct {
func (o OpenI) ResultSyncParam() {
}
func (cp *ResultSyncParam) UnmarshalJSON(data []byte) error {
// 临时结构体:用于捕获原始 JSON 中的 param 字段数据
type TempCreateParam struct {
Src *common.Source `json:"src,omitempty"`
Param json.RawMessage `json:"param,omitempty"` // 捕获原始 JSON 数据
}
var temp TempCreateParam
if err := json.Unmarshal(data, &temp); err != nil {
return err
}
// 将临时结构体的字段赋值给原结构体(除 Param 外)
cp.Src = temp.Src
// 解析 param 字段的原始数据为具体类型
if temp.Param != nil {
// 尝试解析为 OpenI 类型
var openi OpenI
if err := json.Unmarshal(temp.Param, &openi); err == nil {
cp.Param = &openi
return nil
}
return fmt.Errorf("unsupported param type in ResultSyncParam")
}
return nil
}

View File

@ -1,6 +1,7 @@
package api
import (
"errors"
"fmt"
"github.com/gin-gonic/gin"
"github.com/go-playground/validator/v10"
@ -8,9 +9,9 @@ import (
"gitlink.org.cn/JointCloud/pcm-participant-ai/dataset"
aiModel "gitlink.org.cn/JointCloud/pcm-participant-ai/model"
"gitlink.org.cn/JointCloud/pcm-participant-ai/service"
"gitlink.org.cn/JointCloud/pcm-participant-ai/task"
"gitlink.org.cn/JointCloud/pcm-participant-openi/model"
"net/http"
"strconv"
"strings"
"sync"
)
@ -34,15 +35,10 @@ func (a *aiApi) RegisterSvc(svc *service.Service) {
// TrainAlgorithmsHandler 处理获取特定平台训练算法的请求
func (a *aiApi) TrainAlgorithmsHandler(c *gin.Context) {
pfIdStr := c.Query("pfId")
pfId, err := strconv.ParseInt(pfIdStr, 10, 64)
if err != nil {
model.Response(c, http.StatusBadRequest, "invalid pfId", nil)
return
}
pfId := c.Query("pfId")
algorithms, err := a.service.TrainAlgorithms(c, pfId)
if err != nil {
model.Response(c, http.StatusInternalServerError, "failed to get train algorithms", nil)
model.Response(c, http.StatusInternalServerError, err.Error(), nil)
return
}
model.Response(c, http.StatusOK, "success", algorithms)
@ -52,7 +48,7 @@ func (a *aiApi) TrainAlgorithmsHandler(c *gin.Context) {
func (a *aiApi) AllTrainAlgorithmsHandler(c *gin.Context) {
algorithms, err := a.service.AllTrainAlgorithms(c)
if err != nil {
model.Response(c, http.StatusInternalServerError, "failed to get all train algorithms", nil)
model.Response(c, http.StatusInternalServerError, err.Error(), nil)
return
}
model.Response(c, http.StatusOK, "success", algorithms)
@ -60,32 +56,26 @@ func (a *aiApi) AllTrainAlgorithmsHandler(c *gin.Context) {
// CreateAlgorithmHandler 处理创建算法的请求
func (a *aiApi) CreateAlgorithmHandler(c *gin.Context) {
pfIdStr := c.Query("pfId")
pfId, err := strconv.ParseInt(pfIdStr, 10, 64)
if err != nil {
model.Response(c, http.StatusBadRequest, "invalid pfId", nil)
return
}
pfId := c.Query("pfId")
var param algorithm.CreateParam
if err := c.ShouldBindJSON(&param); err != nil {
if ve, ok := err.(validator.ValidationErrors); ok {
var errorMsg []string
for _, e := range ve {
errorMsg = append(errorMsg, fmt.Sprintf("字段 %s 验证失败: %s", e.Field(), e.Tag()))
var ve validator.ValidationErrors
if errors.As(err, &ve) {
errorMsgs := make([]string, len(ve))
for i, e := range ve {
errorMsgs[i] = fmt.Sprintf("字段 %s 验证失败: %s", e.Field(), e.Tag())
}
model.Response(c, http.StatusBadRequest, "请求体格式错误: "+strings.Join(errorMsg, "; "), nil)
model.Response(c, http.StatusBadRequest, strings.Join(errorMsgs, "; "), nil)
} else {
model.Response(c, http.StatusBadRequest, "请求体解析失败: "+err.Error(), nil)
model.Response(c, http.StatusBadRequest, "Invalid request format", nil)
}
return
//model.Response(c, http.StatusBadRequest, "invalid request body", nil)
//return
}
resp, err := a.service.CreateAlgorithm(c.Request.Context(), pfId, &param)
if err != nil {
model.Response(c, http.StatusInternalServerError, "failed to create algorithm", nil)
model.Response(c, http.StatusInternalServerError, err.Error(), nil)
return
}
@ -94,30 +84,26 @@ func (a *aiApi) CreateAlgorithmHandler(c *gin.Context) {
// CreateDatasetHandler 处理创建数据集的请求
func (a *aiApi) CreateDatasetHandler(c *gin.Context) {
pfIdStr := c.Query("pfId")
pfId, err := strconv.ParseInt(pfIdStr, 10, 64)
if err != nil {
model.Response(c, http.StatusBadRequest, "invalid pfId", nil)
return
}
pfId := c.Query("pfId")
var param dataset.CreateParam
if err := c.ShouldBindJSON(&param); err != nil {
if ve, ok := err.(validator.ValidationErrors); ok {
var errorMsg []string
for _, e := range ve {
errorMsg = append(errorMsg, fmt.Sprintf("字段 %s 验证失败: %s", e.Field(), e.Tag()))
var ve validator.ValidationErrors
if errors.As(err, &ve) {
errorMsgs := make([]string, len(ve))
for i, e := range ve {
errorMsgs[i] = fmt.Sprintf("字段 %s 验证失败: %s", e.Field(), e.Tag())
}
model.Response(c, http.StatusBadRequest, "请求体格式错误: "+strings.Join(errorMsg, "; "), nil)
model.Response(c, http.StatusBadRequest, strings.Join(errorMsgs, "; "), nil)
} else {
model.Response(c, http.StatusBadRequest, "请求体解析失败: "+err.Error(), nil)
model.Response(c, http.StatusBadRequest, "Invalid request format", nil)
}
return
}
resp, err := a.service.CreateDataset(c.Request.Context(), pfId, &param)
if err != nil {
model.Response(c, http.StatusInternalServerError, "failed to create algorithm", nil)
model.Response(c, http.StatusInternalServerError, err.Error(), nil)
return
}
@ -126,30 +112,26 @@ func (a *aiApi) CreateDatasetHandler(c *gin.Context) {
// CreateModelHandler 处理创建模型的请求
func (a *aiApi) CreateModelHandler(c *gin.Context) {
pfIdStr := c.Query("pfId")
pfId, err := strconv.ParseInt(pfIdStr, 10, 64)
if err != nil {
model.Response(c, http.StatusBadRequest, "invalid pfId", nil)
return
}
pfId := c.Query("pfId")
var param aiModel.CreateParam
if err := c.ShouldBindJSON(&param); err != nil {
if ve, ok := err.(validator.ValidationErrors); ok {
var errorMsg []string
for _, e := range ve {
errorMsg = append(errorMsg, fmt.Sprintf("字段 %s 验证失败: %s", e.Field(), e.Tag()))
var ve validator.ValidationErrors
if errors.As(err, &ve) {
errorMsgs := make([]string, len(ve))
for i, e := range ve {
errorMsgs[i] = fmt.Sprintf("字段 %s 验证失败: %s", e.Field(), e.Tag())
}
model.Response(c, http.StatusBadRequest, "请求体格式错误: "+strings.Join(errorMsg, "; "), nil)
model.Response(c, http.StatusBadRequest, strings.Join(errorMsgs, "; "), nil)
} else {
model.Response(c, http.StatusBadRequest, "请求体解析失败: "+err.Error(), nil)
model.Response(c, http.StatusBadRequest, "Invalid request format", nil)
}
return
}
resp, err := a.service.CreateModel(c.Request.Context(), pfId, &param)
if err != nil {
model.Response(c, http.StatusInternalServerError, "failed to create algorithm", nil)
model.Response(c, http.StatusInternalServerError, err.Error(), nil)
return
}
@ -158,28 +140,23 @@ func (a *aiApi) CreateModelHandler(c *gin.Context) {
// GetResourceSpecsHandler 处理获取资源规格的请求
func (a *aiApi) GetResourceSpecsHandler(c *gin.Context) {
pfIdStr := c.Query("pfId")
pfId := c.Query("pfId")
rtype := c.Query("rType")
pfId, err := strconv.ParseInt(pfIdStr, 10, 64)
resp, err := a.service.GetResourceSpecs(c, pfId, rtype)
if err != nil {
model.Response(c, http.StatusBadRequest, "invalid pfId", nil)
model.Response(c, http.StatusBadRequest, err.Error(), nil)
return
}
a.service.GetResourceSpecs(c, pfId, rtype)
model.Response(c, http.StatusOK, "success", nil)
model.Response(c, http.StatusOK, "success", resp)
}
// TrainResourcesHandler 处理获取特定平台训练资源的请求
func (a *aiApi) TrainResourcesHandler(c *gin.Context) {
pfIdStr := c.Query("pfId")
pfId, err := strconv.ParseInt(pfIdStr, 10, 64)
if err != nil {
model.Response(c, http.StatusBadRequest, "invalid pfId", nil)
return
}
pfId := c.Query("pfId")
resources, err := a.service.TrainResources(c, pfId)
if err != nil {
model.Response(c, http.StatusInternalServerError, "failed to get train resources", nil)
model.Response(c, http.StatusInternalServerError, err.Error(), nil)
return
}
model.Response(c, http.StatusOK, "success", resources)
@ -189,7 +166,7 @@ func (a *aiApi) TrainResourcesHandler(c *gin.Context) {
func (a *aiApi) AllTrainResourcesHandler(c *gin.Context) {
resources, err := a.service.AllTrainResources(c)
if err != nil {
model.Response(c, http.StatusInternalServerError, "failed to get all train resources", nil)
model.Response(c, http.StatusInternalServerError, err.Error(), nil)
return
}
model.Response(c, http.StatusOK, "success", resources)
@ -199,25 +176,86 @@ func (a *aiApi) AllTrainResourcesHandler(c *gin.Context) {
func (a *aiApi) CreateTrainTaskHandler(c *gin.Context) {
var param service.CreateTrainTaskParam
if err := c.ShouldBindJSON(&param); err != nil {
model.Response(c, http.StatusBadRequest, "invalid request body", nil)
var ve validator.ValidationErrors
if errors.As(err, &ve) {
errorMsgs := make([]string, len(ve))
for i, e := range ve {
errorMsgs[i] = fmt.Sprintf("字段 %s 验证失败: %s", e.Field(), e.Tag())
}
model.Response(c, http.StatusBadRequest, strings.Join(errorMsgs, "; "), nil)
} else {
model.Response(c, http.StatusBadRequest, "Invalid request format", nil)
}
return
}
resp, err := a.service.CreateTrainTask(c, &param)
if err != nil {
model.Response(c, http.StatusBadRequest, "failed to create train task", err)
model.Response(c, http.StatusInternalServerError, err.Error(), nil)
return
}
model.Response(c, http.StatusOK, "success", resp)
}
// TestFuncResHandler 处理测试资源相关功能的请求
func (a *aiApi) TestFuncResHandler(c *gin.Context) {
pfIdStr := c.Query("pfId")
pfId, err := strconv.ParseInt(pfIdStr, 10, 64)
if err != nil {
model.Response(c, http.StatusBadRequest, "invalid pfId", nil)
// TaskResultSyncHandler 同步任务结果数据
func (a *aiApi) TaskResultSyncHandler(c *gin.Context) {
pfId := c.Query("pfId")
var param task.ResultSyncParam
if err := c.ShouldBindJSON(&param); err != nil {
var ve validator.ValidationErrors
if errors.As(err, &ve) {
errorMsgs := make([]string, len(ve))
for i, e := range ve {
errorMsgs[i] = fmt.Sprintf("字段 %s 验证失败: %s", e.Field(), e.Tag())
}
model.Response(c, http.StatusBadRequest, strings.Join(errorMsgs, "; "), nil)
} else {
model.Response(c, http.StatusBadRequest, "Invalid request format", nil)
}
return
}
err := a.service.TaskResultSync(c, pfId, &param)
if err != nil {
model.Response(c, http.StatusInternalServerError, err.Error(), nil)
return
}
a.service.TestFuncRes(c, pfId)
model.Response(c, http.StatusOK, "success", nil)
}
// TaskLogHandler 查询任务日志
func (a *aiApi) TaskLogHandler(c *gin.Context) {
pfId := c.Query("pfId")
taskId := c.Query("taskId")
resp, err := a.service.TaskLog(c, pfId, taskId)
if err != nil {
model.Response(c, http.StatusInternalServerError, err.Error(), nil)
return
}
model.Response(c, http.StatusOK, "success", resp)
}
// TrainTaskDetailHandler 查询训练任务详情
func (a *aiApi) TrainTaskDetailHandler(c *gin.Context) {
pfId := c.Query("pfId")
taskId := c.Query("taskId")
resp, err := a.service.GetTrainingTask(c, pfId, taskId)
if err != nil {
model.Response(c, http.StatusInternalServerError, err.Error(), nil)
return
}
model.Response(c, http.StatusOK, "success", resp)
}
// InferTaskDetailHandler 查询推理任务详情
func (a *aiApi) InferTaskDetailHandler(c *gin.Context) {
pfId := c.Query("pfId")
taskId := c.Query("taskId")
resp, err := a.service.GetInferenceTask(c, pfId, taskId)
if err != nil {
model.Response(c, http.StatusInternalServerError, err.Error(), nil)
return
}
model.Response(c, http.StatusOK, "success", resp)
}

View File

@ -8,4 +8,4 @@ pcm-core:
coordinator-host: http://127.0.0.1:8999
participant-host: http://localhost:8080
hpc-cluster-list: /pcm/v1/adapter/cluster/list?type=2&pageNum=1&pageSize=10&storageSchedule=1
ai-cluster-list: /pcm/v1/adapter/cluster/list?name=openI-new-p&type=1&pageNum=1&pageSize=10&storageSchedule=1
ai-cluster-list: /pcm/v1/adapter/cluster/list?adapterId=1777144940456666666&type=1&pageNum=1&pageSize=10&storageSchedule=1

View File

@ -9,10 +9,10 @@ import (
"gitlink.org.cn/JointCloud/pcm-participant-client/common/utils"
"gitlink.org.cn/JointCloud/pcm-participant-client/config"
octopus "gitlink.org.cn/JointCloud/pcm-participant-octopus"
"gitlink.org.cn/JointCloud/pcm-participant-openi/common"
"strconv"
openI "gitlink.org.cn/JointCloud/pcm-participant-openi"
"gitlink.org.cn/JointCloud/pcm-participant-openi/common"
"go.uber.org/zap"
)
@ -128,22 +128,33 @@ func initAISvcs(client *utils.RestyClient, core config.PcmCore) (*service.Servic
zap.L().Warn("跳过无效集群条目: 缺少集群ID")
continue
}
clusterId, _ := strconv.ParseInt(cluster.Id, 10, 64)
switch cluster.Label {
case "openI":
oi, _ := openI.New(cluster.Username, cluster.Password, cluster.Token, platform.Id(clusterId), "data")
oi, err := openI.New(cluster.Username, cluster.Password, cluster.Token, cluster.Name, platform.Id(cluster.Id), "data")
if err != nil {
Error("初始化失败", zap.Error(err))
continue
}
platforms = append(platforms, oi)
common.InitClient()
//更新C端集群状态
case "octopus":
oct, _ := octopus.New(cluster.Server, cluster.Username, cluster.Password, platform.Id(clusterId))
oct, err := octopus.New(cluster.Address, cluster.Username, cluster.Password, cluster.Name, platform.Id(cluster.Id))
if err != nil {
Error("初始化失败", zap.Error(err))
continue
}
platforms = append(platforms, oct)
}
common.InitClient()
}
svc, _ = service.NewService(platforms...)
if len(platforms) == 0 {
return nil, fmt.Errorf("注册集群列表为空")
}
svc, err = service.NewService(platforms...)
if err != nil {
return nil, err
}
return svc, nil
}

View File

@ -37,11 +37,15 @@ func main() {
}
defer initialize.Close()
// 初始化集群连接
if err := initialize.InitHPCCluster(cfg); err != nil {
zap.L().Fatal("集群初始化失败", zap.Error(err))
}
//if err := initialize.InitHPCCluster(cfg); err != nil {
// zap.L().Fatal("集群初始化失败", zap.Error(err))
//}
// 初始化集群连接
svc, _ := initialize.InitAICluster(cfg)
svc, err := initialize.InitAICluster(cfg)
if err != nil {
initialize.Panic("Server started failed: %s", err)
return
}
api.AiApi.RegisterSvc(svc)
defer initialize.CloseAllPools()

View File

@ -9,18 +9,28 @@ func AIRoutes(server *gin.Engine) {
ai := server.Group("/ai")
aiApi := api.AiApi
{
//算法路由
ai.GET("/algorithm/get", aiApi.TrainAlgorithmsHandler)
ai.GET("/algorithm/list", aiApi.AllTrainAlgorithmsHandler)
ai.POST("/algorithm/create", aiApi.CreateAlgorithmHandler)
//数据集路由
ai.POST("/dataset/create", aiApi.CreateDatasetHandler)
// 模型相关路由
ai.POST("/model/create", aiApi.CreateModelHandler)
// 资源相关路由
ai.GET("/resource/specs", aiApi.GetResourceSpecsHandler)
ai.GET("/resource/get", aiApi.TrainResourcesHandler)
ai.GET("/resource/list", aiApi.AllTrainResourcesHandler)
ai.GET("/resource/train/get", aiApi.TrainResourcesHandler)
ai.GET("/resource/train/list", aiApi.AllTrainResourcesHandler)
// 任务相关路由
ai.POST("/task/train", aiApi.CreateTrainTaskHandler)
ai.POST("/task/sync", aiApi.TaskResultSyncHandler)
ai.GET("/task/log", aiApi.TaskLogHandler)
ai.GET("/task/train/detail", aiApi.TrainTaskDetailHandler)
ai.GET("/task/infer/detail", aiApi.InferTaskDetailHandler)
// 算法相关路由
ai.GET("/task/train", aiApi.CreateTrainTaskHandler)
}
}

View File

@ -149,12 +149,12 @@ type ResourceSpec struct {
cardType string
}
func New(ip, user, pwd string, id platform.Id) (*Octopus, error) {
func New(ip, user, pwd, clusterName string, id platform.Id) (*Octopus, error) {
token, err := common.NewToken(ip, user, pwd)
if err != nil {
return nil, err
}
cfg, err := config.LoadConfig("../../jcs/config/config.yaml")
cfg, err := config.LoadConfig("../jcs/config/config.yaml")
if err != nil {
return nil, err
}
@ -164,6 +164,7 @@ func New(ip, user, pwd string, id platform.Id) (*Octopus, error) {
user: user,
pwd: pwd,
platform: &platform.Platform{
Name: clusterName,
Type: platform.Octopus,
Id: id,
},
@ -240,7 +241,7 @@ func (a *Algorithm) Infer(ctx context.Context) (algorithm.Algorithms, error) {
return private, nil
}
func (a *Algorithm) Create(ctx context.Context, param *algorithm.CreateParam) (interface{}, error) {
func (a *Algorithm) Create(ctx context.Context, param *algorithm.CreateParam) (*algorithm.CreateResp, error) {
// check name
if param.Name == "" {
return nil, errors.New("Name is required")
@ -347,7 +348,7 @@ func bind(name, upUrl string, packageId, userId int, jcsClient *jcs.Jcs) (interf
return nil, nil
}
func (a *Algorithm) create(ctx context.Context, name string, packageId, userId int) (interface{}, error) {
func (a *Algorithm) create(ctx context.Context, name string, packageId, userId int) (*algorithm.CreateResp, error) {
token, err := a.opt.token.Get()
if err != nil {
return nil, err
@ -435,7 +436,12 @@ func (a *Algorithm) create(ctx context.Context, name string, packageId, userId i
}
}
return nil, nil
resp := &algorithm.CreateResp{
Id: create.Payload.AlgorithmId,
Name: name,
}
return resp, nil
}
func (a *Algorithm) Features() *algorithm.Features {
@ -569,7 +575,7 @@ func (d *Dataset) Private(ctx context.Context) (dataset.Datasets, error) {
return specs, nil
}
func (d *Dataset) Create(ctx context.Context, param *dataset.CreateParam) (interface{}, error) {
func (d *Dataset) Create(ctx context.Context, param *dataset.CreateParam) (*dataset.CreateResp, error) {
// check name
if param.Name == "" {
return nil, errors.New("Name is required")
@ -602,7 +608,7 @@ func (d *Dataset) Create(ctx context.Context, param *dataset.CreateParam) (inter
return resp, nil
}
func (d *Dataset) create(ctx context.Context, name, desc string, packageId, userId int) (interface{}, error) {
func (d *Dataset) create(ctx context.Context, name, desc string, packageId, userId int) (*dataset.CreateResp, error) {
token, err := d.opt.token.Get()
if err != nil {
return nil, err
@ -685,7 +691,12 @@ func (d *Dataset) create(ctx context.Context, name, desc string, packageId, user
}
}
return nil, nil
resp := &dataset.CreateResp{
Id: create.Payload.Id,
Name: name,
}
return resp, nil
}
func (d *Dataset) Features() *dataset.Features {
@ -1104,7 +1115,7 @@ func (r *Resource) GetResourceSpecs(ctx context.Context, resrcType string) (*res
clusterId := r.opt.platform.Id
res := &resource.ResourceSpecs{
ClusterId: strconv.FormatInt(int64(clusterId), 10),
ClusterId: string(clusterId),
Tag: resrcType,
}
@ -1665,7 +1676,7 @@ func (m *Model) Infer(ctx context.Context) (aiModel.Models, error) {
return nil, errors.New("implement me")
}
func (m *Model) Create(ctx context.Context, param *aiModel.CreateParam) (interface{}, error) {
func (m *Model) Create(ctx context.Context, param *aiModel.CreateParam) (*aiModel.CreateResp, error) {
return nil, errors.New("implement me")
}

View File

@ -5,7 +5,7 @@ import (
"net/http"
)
func Response(c *gin.Context, code int, msg interface{}, data interface{}) {
func Response(c *gin.Context, code int, msg string, data interface{}) {
c.JSON(http.StatusOK, map[string]interface{}{
"code": code,
"msg": msg,

View File

@ -204,8 +204,8 @@ type BaseResp struct {
Data interface{} `json:"data"`
}
func New(username, pwd, token string, id platform.Id, datasetRepo string) (*OpenI, error) {
cfg, err := config.LoadConfig("..\\..\\jcs\\config\\config.yaml")
func New(username, pwd, token, clusterName string, id platform.Id, datasetRepo string) (*OpenI, error) {
cfg, err := config.LoadConfig("../jcs/config/config.yaml")
if err != nil {
return nil, err
}
@ -216,6 +216,7 @@ func New(username, pwd, token string, id platform.Id, datasetRepo string) (*Open
token: token,
pwd: pwd,
platform: &platform.Platform{
Name: clusterName,
Type: platform.OpenI,
Id: id,
},
@ -293,7 +294,7 @@ func (m *Model) Features() *aiModel.Features {
return m.ft
}
func (m *Model) Create(ctx context.Context, param *aiModel.CreateParam) (interface{}, error) {
func (m *Model) Create(ctx context.Context, param *aiModel.CreateParam) (*aiModel.CreateResp, error) {
if param.Param == nil {
return nil, errors.New("sync params is nil")
}
@ -343,7 +344,7 @@ func (m *Model) createModelRecord(name, repo, desc string) (resp *model.CreateLo
return bind, nil
}
func (m *Model) bindModel(name, repo, desc string, packageId, userId int) (*BaseResp, error) {
func (m *Model) bindModel(name, repo, desc string, packageId, userId int) (*aiModel.CreateResp, error) {
//创建模型记录
bind, err := m.createModelRecord(name, repo, desc)
if err != nil {
@ -390,15 +391,11 @@ func (m *Model) bindModel(name, repo, desc string, packageId, userId int) (*Base
return nil, err
}
jsonData := map[string]string{
"id": mResp.Id,
"name": name,
}
resp := &BaseResp{
Code: http.StatusOK,
Msg: Success,
Data: jsonData,
resp := &aiModel.CreateResp{
Id: mResp.Id,
Name: name,
}
return resp, nil
}
@ -1269,7 +1266,7 @@ func (r *Resource) GetResourceSpecs(ctx context.Context, resrcType string) (*res
var resources []interface{}
clusterId := r.opt.platform.Id
res := &resource.ResourceSpecs{
ClusterId: strconv.FormatInt(int64(clusterId), 10),
ClusterId: string(clusterId),
Tag: resrcType,
}
@ -1539,7 +1536,7 @@ func (d *Dataset) Infer(ctx context.Context) (dataset.Datasets, error) {
return datasets, nil
}
func (d *Dataset) Create(ctx context.Context, param *dataset.CreateParam) (interface{}, error) {
func (d *Dataset) Create(ctx context.Context, param *dataset.CreateParam) (*dataset.CreateResp, error) {
// check name
if param.Name == "" {
return nil, errors.New("Name is required")
@ -1580,7 +1577,7 @@ func (d *Dataset) Create(ctx context.Context, param *dataset.CreateParam) (inter
return resp, nil
}
func (d *Dataset) bindDataset(ctx context.Context, name, repo string, packageId, userId int) (resp *BaseResp, err error) {
func (d *Dataset) bindDataset(ctx context.Context, name, repo string, packageId, userId int) (resp *dataset.CreateResp, err error) {
var (
pkgId = packageId
@ -1663,25 +1660,19 @@ func (d *Dataset) bindDataset(ctx context.Context, name, repo string, packageId,
return nil, err
}
jsonData := map[string]string{
"id": upResp.Data.Id,
"name": upResp.Data.Name,
}
resp = &BaseResp{
Code: http.StatusOK,
Msg: Success,
Data: jsonData,
resp = &dataset.CreateResp{
Id: upResp["id"],
Name: upResp["name"],
}
return
}
func (d *Dataset) uploadFile2Dataset(param *model.DatasetUploadFileParam, fileHeaders []*types.FileInput) (*model.UploadFile2DatasetResp, error) {
func (d *Dataset) uploadFile2Dataset(param *model.DatasetUploadFileParam, fileHeaders []*types.FileInput) (map[string]string, error) {
// customName即绑定数据集时名称
if param.CustomName == "" {
return nil, errors.New("customName为必填字段")
}
resp := model.UploadFile2DatasetResp{}
files, err := toMultipartFileHeaders(fileHeaders)
if err != nil {
return nil, err
@ -1692,17 +1683,7 @@ func (d *Dataset) uploadFile2Dataset(param *model.DatasetUploadFileParam, fileHe
return nil, err
}
marshal, err := json.Marshal(upResp)
if err != nil {
return nil, err
}
err = json.Unmarshal(marshal, resp.Data)
if err != nil {
return nil, err
}
return &resp, nil
return upResp, nil
}
func createMultipartFileHeader(w *multipart.Writer, param, fileName, contentType string, reader io.Reader) error {
@ -1812,7 +1793,7 @@ func (a *Algorithm) Infer(ctx context.Context) (algorithm.Algorithms, error) {
return infer, nil
}
func (a *Algorithm) Create(ctx context.Context, param *algorithm.CreateParam) (interface{}, error) {
func (a *Algorithm) Create(ctx context.Context, param *algorithm.CreateParam) (*algorithm.CreateResp, error) {
// check name
if param.Name == "" {
return nil, errors.New("Name is required")
@ -1874,7 +1855,7 @@ func (a *Algorithm) createRepo(ctx context.Context, name, desc, branch string) (
return repo, nil
}
func (a *Algorithm) bindAlgorithm(ctx context.Context, name, desc, branch, bootFile string, packageId, userId int) (resp *BaseResp, err error) {
func (a *Algorithm) bindAlgorithm(ctx context.Context, name, desc, branch, bootFile string, packageId, userId int) (resp *algorithm.CreateResp, err error) {
var (
pkgId = packageId
@ -1988,14 +1969,9 @@ func (a *Algorithm) bindAlgorithm(ctx context.Context, name, desc, branch, bootF
return nil, err
}
jsonData := map[string]string{
"id": filepath.Join(paramNameRepo, paramBranch, paramBootFile), //(repoName/branchName/bootFile)
"name": paramNameRepo,
}
resp = &BaseResp{
Code: http.StatusOK,
Msg: Success,
Data: jsonData,
resp = &algorithm.CreateResp{
Id: filepath.Join(paramNameRepo, paramBranch, paramBootFile), //(repoName/branchName/bootFile)
Name: paramNameRepo,
}
return
}