Merge pull request 'update service' (#524) from tzwang/pcm-coordinator:master into master

This commit is contained in:
tzwang 2025-07-17 11:25:19 +08:00
commit ec91218790
4 changed files with 96 additions and 16 deletions

View File

@ -90,4 +90,7 @@ BlockChain:
Type: "2"
JcsMiddleware:
JobStatusReportUrl: http://101.201.215.196:7891/jobSet/jobStatusReport
JobStatusReportUrl: http://101.201.215.196:7891/jobSet/jobStatusReport
Participant:
AdapterId: "1777144940456666666"

View File

@ -53,6 +53,8 @@ type Config struct {
Monitoring Monitoring
JcsMiddleware JcsMiddleware
Participant Participant
}
type Monitoring struct {
PromUrl string
@ -67,3 +69,7 @@ type SnowflakeConf struct {
// JcsMiddleware groups the settings found under the JcsMiddleware section of
// the configuration.
type JcsMiddleware struct {
// JobStatusReportUrl is the configured URL of the jobStatusReport endpoint.
// NOTE(review): presumably the address job status is reported to — confirm
// against the code that reads this field.
JobStatusReportUrl string
}
// Participant groups the settings found under the Participant section of the
// configuration.
type Participant struct {
// AdapterId is the adapter identifier passed to participant.New, which uses
// it to look up that adapter's clusters.
AdapterId string
}

View File

@ -1,9 +1,12 @@
package participant
import (
"errors"
"fmt"
"github.com/go-resty/resty/v2"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database"
"net/http"
"sync"
)
const (
@ -29,22 +32,58 @@ const (
TaskLog = "/ai/task/log"
TaskTrainingDetail = "/ai/task/train/detail"
TaskInferenceDetail = "/ai/task/infer/detail"
Localhost = "http://localhost:8080"
)
// Ai is the participant client. It keeps the storage handle it was built with
// and a table of server addresses that its request methods use to choose the
// target host.
type Ai struct {
// NOTE(review): the two `store` lines below look like the before/after pair
// of a whitespace-only realignment shown by the diff — confirm that only one
// field exists in the real file.
store *database.AiStorage
store *database.AiStorage
// idAddr maps a cluster id to its server address. New and UpdateAddr write
// entries; GetServerAddrById reads them and expects string values.
idAddr sync.Map
}
// NOTE(review): the next line appears to be the pre-change constructor
// signature shown by the diff; the call site in NewServiceContext moves from
// participant.NewAi() to participant.New(storage, adapterId).
func NewAi() (*Ai, error) {
// New builds an Ai backed by store and preloads its id -> address table with
// the clusters that store returns for adapterId.
//
// It returns an error when store is nil or when the cluster lookup fails.
func New(store *database.AiStorage, adapterId string) (*Ai, error) {
// Reject a nil store up front: GetClustersByAdapterId is called on it below.
if store == nil {
return nil, errors.New("store cannot be nil")
}
a := &Ai{
store: store,
}
css, err := store.GetClustersByAdapterId(adapterId)
if err != nil {
return nil, fmt.Errorf("failed to get clusters: %w", err)
}
// Seed the table with one entry per returned cluster: info.Id -> info.Server.
for _, info := range css.List {
a.idAddr.Store(info.Id, info.Server)
}
// InitClient is defined elsewhere.
// NOTE(review): presumably it prepares the HTTP client used by Request — confirm.
InitClient()
// NOTE(review): `return &Ai{}, nil` appears to be the pre-change return kept by
// the diff view (it would drop store and the loaded table); `return a, nil`
// is its replacement.
return &Ai{}, nil
return a, nil
}
// UpdateAddr records addr as the server address for id, replacing any address
// already stored under that id.
func (a *Ai) UpdateAddr(id, addr string) {
	a.idAddr.Store(id, addr)
}
// GetServerAddrById looks up the server address stored for id.
//
// The boolean result is true only when an entry exists and its value is a
// string; otherwise the address is empty and the boolean is false.
func (a *Ai) GetServerAddrById(id string) (string, bool) {
	if stored, found := a.idAddr.Load(id); found {
		if server, isString := stored.(string); isString {
			return server, true
		}
	}
	return "", false
}
func (a *Ai) AlgorithmById(platformId string) (resp *Resp, err error) {
addr, ok := a.GetServerAddrById(platformId)
if !ok {
return nil, fmt.Errorf("clusterId not found: %s", platformId)
}
respErr := &RespErr{}
_, err = Request(Localhost+AlgorithmById, http.MethodGet, func(req *resty.Request) {
_, err = Request(addr+AlgorithmById, http.MethodGet, func(req *resty.Request) {
req.SetQueryParams(map[string]string{
"pfId": platformId,
}).SetError(&respErr).SetResult(&resp)
@ -56,8 +95,12 @@ func (a *Ai) AlgorithmById(platformId string) (resp *Resp, err error) {
}
func (a *Ai) AlgorithmCreateById(platformId string, param *CreateParam) (resp *Resp, err error) {
addr, ok := a.GetServerAddrById(platformId)
if !ok {
return nil, fmt.Errorf("clusterId not found: %s", platformId)
}
respErr := &RespErr{}
_, err = Request(Localhost+AlgorithmCreateById, http.MethodPost, func(req *resty.Request) {
_, err = Request(addr+AlgorithmCreateById, http.MethodPost, func(req *resty.Request) {
req.SetQueryParams(map[string]string{
"pfId": platformId,
}).SetBody(param).SetError(&respErr).SetResult(&resp)
@ -69,8 +112,12 @@ func (a *Ai) AlgorithmCreateById(platformId string, param *CreateParam) (resp *R
}
func (a *Ai) DatasetCreateById(platformId string, param *CreateParam) (resp *Resp, err error) {
addr, ok := a.GetServerAddrById(platformId)
if !ok {
return nil, fmt.Errorf("clusterId not found: %s", platformId)
}
respErr := &RespErr{}
_, err = Request(Localhost+DatasetCreateById, http.MethodPost, func(req *resty.Request) {
_, err = Request(addr+DatasetCreateById, http.MethodPost, func(req *resty.Request) {
req.SetQueryParams(map[string]string{
"pfId": platformId,
}).SetBody(param).SetError(&respErr).SetResult(&resp)
@ -82,8 +129,12 @@ func (a *Ai) DatasetCreateById(platformId string, param *CreateParam) (resp *Res
}
func (a *Ai) ModelCreateById(platformId string, param *CreateParam) (resp *Resp, err error) {
addr, ok := a.GetServerAddrById(platformId)
if !ok {
return nil, fmt.Errorf("clusterId not found: %s", platformId)
}
respErr := &RespErr{}
_, err = Request(Localhost+ModelCreateById, http.MethodPost, func(req *resty.Request) {
_, err = Request(addr+ModelCreateById, http.MethodPost, func(req *resty.Request) {
req.SetQueryParams(map[string]string{
"pfId": platformId,
}).SetBody(param).SetError(&respErr).SetResult(&resp)
@ -95,8 +146,12 @@ func (a *Ai) ModelCreateById(platformId string, param *CreateParam) (resp *Resp,
}
func (a *Ai) TaskCreateTrain(platformId string, param *TaskCreateParam) (resp *Resp, err error) {
addr, ok := a.GetServerAddrById(platformId)
if !ok {
return nil, fmt.Errorf("clusterId not found: %s", platformId)
}
respErr := &RespErr{}
_, err = Request(Localhost+TaskCreateTrain, http.MethodPost, func(req *resty.Request) {
_, err = Request(addr+TaskCreateTrain, http.MethodPost, func(req *resty.Request) {
req.SetQueryParams(map[string]string{
"pfId": platformId,
}).SetBody(param).SetError(&respErr).SetResult(&resp)
@ -108,8 +163,12 @@ func (a *Ai) TaskCreateTrain(platformId string, param *TaskCreateParam) (resp *R
}
func (a *Ai) TaskResultSync(platformId string, param *TaskResultSyncParam) (resp *Resp, err error) {
addr, ok := a.GetServerAddrById(platformId)
if !ok {
return nil, fmt.Errorf("clusterId not found: %s", platformId)
}
respErr := &RespErr{}
_, err = Request(Localhost+TaskResultSync, http.MethodPost, func(req *resty.Request) {
_, err = Request(addr+TaskResultSync, http.MethodPost, func(req *resty.Request) {
req.SetQueryParams(map[string]string{
"pfId": platformId,
}).SetBody(param).SetError(&respErr).SetResult(&resp)
@ -121,8 +180,12 @@ func (a *Ai) TaskResultSync(platformId string, param *TaskResultSyncParam) (resp
}
func (a *Ai) TaskLog(platformId string, taskId string) (resp *Resp, err error) {
addr, ok := a.GetServerAddrById(platformId)
if !ok {
return nil, fmt.Errorf("clusterId not found: %s", platformId)
}
respErr := &RespErr{}
_, err = Request(Localhost+TaskLog, http.MethodGet, func(req *resty.Request) {
_, err = Request(addr+TaskLog, http.MethodGet, func(req *resty.Request) {
req.SetQueryParams(map[string]string{
"pfId": platformId,
"taskId": taskId,
@ -135,8 +198,12 @@ func (a *Ai) TaskLog(platformId string, taskId string) (resp *Resp, err error) {
}
func (a *Ai) TaskTrainingDetail(platformId string, taskId string) (resp *Resp, err error) {
addr, ok := a.GetServerAddrById(platformId)
if !ok {
return nil, fmt.Errorf("clusterId not found: %s", platformId)
}
respErr := &RespErr{}
_, err = Request(Localhost+TaskTrainingDetail, http.MethodGet, func(req *resty.Request) {
_, err = Request(addr+TaskTrainingDetail, http.MethodGet, func(req *resty.Request) {
req.SetQueryParams(map[string]string{
"pfId": platformId,
"taskId": taskId,
@ -149,8 +216,12 @@ func (a *Ai) TaskTrainingDetail(platformId string, taskId string) (resp *Resp, e
}
func (a *Ai) TaskInferenceDetail(platformId string, taskId string) (resp *Resp, err error) {
addr, ok := a.GetServerAddrById(platformId)
if !ok {
return nil, fmt.Errorf("clusterId not found: %s", platformId)
}
respErr := &RespErr{}
_, err = Request(Localhost+TaskInferenceDetail, http.MethodGet, func(req *resty.Request) {
_, err = Request(addr+TaskInferenceDetail, http.MethodGet, func(req *resty.Request) {
req.SetQueryParams(map[string]string{
"pfId": platformId,
"taskId": taskId,

View File

@ -123,7 +123,7 @@ func NewServiceContext(c config.Config) *ServiceContext {
panic(err)
}
scheduler := scheduler.NewSchdlr(aiService, storage, hpcStorage, hpcService)
ai, err := participant.NewAi()
ai, err := participant.New(storage, c.Participant.AdapterId)
if err != nil {
logx.Error(err.Error())
panic(err)