Merge pull request 'update service' (#523) from tzwang/pcm-coordinator:master into master

This commit is contained in:
tzwang 2025-07-16 17:40:15 +08:00
commit 574c41e7f8
4 changed files with 279 additions and 0 deletions

163
internal/participant/ai.go Normal file
View File

@ -0,0 +1,163 @@
package participant
import (
"github.com/go-resty/resty/v2"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database"
"net/http"
)
const (
// 算法路由
AlgorithmById = "/ai/algorithm/get" //根据Id查询算法列表
AlgorithmsList = "/ai/algorithm/list" //所有算法列表
AlgorithmCreateById = "/ai/algorithm/create" //根据Id创建算法
// 数据集路由
DatasetCreateById = "/ai/dataset/create" //根据Id创建数据集
// 模型相关路由
ModelCreateById = "/ai/model/create" //根据Id创建模型
// 资源相关路由
ResourceSpecList = "/ai/resource/specs" //所有资源列表,根据参数 train or infer 查询资源
ResourceTrainingById = "/ai/resource/train/get" //根据Id查询资源列表
ResourceTrainingList = "/ai/resource/train/list" //所有训练资源列表
// 任务相关路由
TaskCreateTrain = "/ai/task/train"
TaskResultSync = "/ai/task/sync"
TaskLog = "/ai/task/log"
TaskTrainingDetail = "/ai/task/train/detail"
TaskInferenceDetail = "/ai/task/infer/detail"
Localhost = "http://localhost:8080"
)
type Ai struct {
store *database.AiStorage
}
func NewAi() (*Ai, error) {
InitClient()
return &Ai{}, nil
}
func (a *Ai) AlgorithmById(platformId string) (resp *Resp, err error) {
respErr := &RespErr{}
_, err = Request(Localhost+AlgorithmById, http.MethodGet, func(req *resty.Request) {
req.SetQueryParams(map[string]string{
"pfId": platformId,
}).SetError(&respErr).SetResult(&resp)
})
if err != nil {
return nil, err
}
return
}
func (a *Ai) AlgorithmCreateById(platformId string, param *CreateParam) (resp *Resp, err error) {
respErr := &RespErr{}
_, err = Request(Localhost+AlgorithmCreateById, http.MethodPost, func(req *resty.Request) {
req.SetQueryParams(map[string]string{
"pfId": platformId,
}).SetBody(param).SetError(&respErr).SetResult(&resp)
})
if err != nil {
return nil, err
}
return
}
func (a *Ai) DatasetCreateById(platformId string, param *CreateParam) (resp *Resp, err error) {
respErr := &RespErr{}
_, err = Request(Localhost+DatasetCreateById, http.MethodPost, func(req *resty.Request) {
req.SetQueryParams(map[string]string{
"pfId": platformId,
}).SetBody(param).SetError(&respErr).SetResult(&resp)
})
if err != nil {
return nil, err
}
return
}
func (a *Ai) ModelCreateById(platformId string, param *CreateParam) (resp *Resp, err error) {
respErr := &RespErr{}
_, err = Request(Localhost+ModelCreateById, http.MethodPost, func(req *resty.Request) {
req.SetQueryParams(map[string]string{
"pfId": platformId,
}).SetBody(param).SetError(&respErr).SetResult(&resp)
})
if err != nil {
return nil, err
}
return
}
func (a *Ai) TaskCreateTrain(platformId string, param *TaskCreateParam) (resp *Resp, err error) {
respErr := &RespErr{}
_, err = Request(Localhost+TaskCreateTrain, http.MethodPost, func(req *resty.Request) {
req.SetQueryParams(map[string]string{
"pfId": platformId,
}).SetBody(param).SetError(&respErr).SetResult(&resp)
})
if err != nil {
return nil, err
}
return
}
func (a *Ai) TaskResultSync(platformId string, param *TaskResultSyncParam) (resp *Resp, err error) {
respErr := &RespErr{}
_, err = Request(Localhost+TaskResultSync, http.MethodPost, func(req *resty.Request) {
req.SetQueryParams(map[string]string{
"pfId": platformId,
}).SetBody(param).SetError(&respErr).SetResult(&resp)
})
if err != nil {
return nil, err
}
return
}
func (a *Ai) TaskLog(platformId string, taskId string) (resp *Resp, err error) {
respErr := &RespErr{}
_, err = Request(Localhost+TaskLog, http.MethodGet, func(req *resty.Request) {
req.SetQueryParams(map[string]string{
"pfId": platformId,
"taskId": taskId,
}).SetError(&respErr).SetResult(&resp)
})
if err != nil {
return nil, err
}
return
}
func (a *Ai) TaskTrainingDetail(platformId string, taskId string) (resp *Resp, err error) {
respErr := &RespErr{}
_, err = Request(Localhost+TaskTrainingDetail, http.MethodGet, func(req *resty.Request) {
req.SetQueryParams(map[string]string{
"pfId": platformId,
"taskId": taskId,
}).SetError(&respErr).SetResult(&resp)
})
if err != nil {
return nil, err
}
return
}
func (a *Ai) TaskInferenceDetail(platformId string, taskId string) (resp *Resp, err error) {
respErr := &RespErr{}
_, err = Request(Localhost+TaskInferenceDetail, http.MethodGet, func(req *resty.Request) {
req.SetQueryParams(map[string]string{
"pfId": platformId,
"taskId": taskId,
}).SetError(&respErr).SetResult(&resp)
})
if err != nil {
return nil, err
}
return
}

View File

@ -0,0 +1,81 @@
package participant
import (
"crypto/tls"
"errors"
"fmt"
"net/http"
"time"
"github.com/go-resty/resty/v2"
)
type ReqCallback func(req *resty.Request)
var (
NoRedirectClient *resty.Client
RestyClient *resty.Client
HttpClient *http.Client
)
var UserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36"
var DefaultTimeout = time.Second * 300
func InitClient() {
NoRedirectClient = resty.New().SetRedirectPolicy(
resty.RedirectPolicyFunc(func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
}),
).SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
NoRedirectClient.SetHeader("user-agent", UserAgent)
RestyClient = NewRestyClient()
HttpClient = NewHttpClient()
}
func NewRestyClient() *resty.Client {
client := resty.New().
SetHeader("user-agent", UserAgent).
SetRetryCount(3).
SetRetryResetReaders(true).
SetTimeout(DefaultTimeout).
SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
return client
}
func NewHttpClient() *http.Client {
return &http.Client{
Timeout: time.Hour * 48,
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
}
}
func Request(url string, method string, callback ReqCallback) ([]byte, error) {
respErr := &RespErr{}
req := RestyClient.R().
SetHeaders(map[string]string{
"Content-Type": "application/json",
}).
SetError(&respErr)
if callback != nil {
callback(req)
}
res, err := req.Execute(method, url)
if err != nil {
return nil, err
}
if respErr.Message != "" {
return nil, errors.New(respErr.Message)
}
if res.StatusCode() != http.StatusOK && res.StatusCode() != http.StatusCreated {
return nil, errors.New(fmt.Sprintf("msg: %s, status: %d", res.String(), res.StatusCode()))
}
return res.Body(), nil
}

View File

@ -0,0 +1,27 @@
package participant
type RespErr struct {
Code int32 `json:"code"`
Message string `json:"message"`
}
type Resp struct {
Code int32 `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data"`
}
type CreateParam struct {
Name string `json:"name" binding:"required"`
Desc string `json:"desc"`
Src interface{} `json:"src,omitempty"`
Param interface{} `json:"param,omitempty"`
}
type TaskCreateParam struct {
}
type TaskResultSyncParam struct {
Src interface{} `json:"src,omitempty"`
Param interface{} `json:"param,omitempty"`
}

View File

@ -23,6 +23,7 @@ import (
"github.com/zeromicro/go-zero/zrpc"
"gitlink.org.cn/JointCloud/pcm-ac/hpcacclient"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/config"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/participant"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service"
@ -56,6 +57,7 @@ type ServiceContext struct {
AlertClient *alert.AlertmanagerAPI
HttpClient *resty.Client
Scheduler *scheduler.Scheduler
Ai *participant.Ai
}
func NewServiceContext(c config.Config) *ServiceContext {
@ -121,6 +123,11 @@ func NewServiceContext(c config.Config) *ServiceContext {
panic(err)
}
scheduler := scheduler.NewSchdlr(aiService, storage, hpcStorage, hpcService)
ai, err := participant.NewAi()
if err != nil {
logx.Error(err.Error())
panic(err)
}
return &ServiceContext{
DbEngin: dbEngin,
Cron: cron.New(cron.WithSeconds()),
@ -136,5 +143,6 @@ func NewServiceContext(c config.Config) *ServiceContext {
AlertClient: alertClient,
HttpClient: httpClient,
Scheduler: scheduler,
Ai: ai,
}
}