Merge remote-tracking branch 'gitlink/master'

# Conflicts:
#	internal/types/types.go
This commit is contained in:
jagger 2025-06-04 19:45:55 +08:00
commit a5eda62487
15 changed files with 79 additions and 37 deletions

View File

@ -121,7 +121,6 @@ type Cloud {
}
type PodsListReq {
ClusterName string `form:"clusterName"`
}
type PodsListResp {
Data []interface{} `json:"data"`

View File

@ -102,7 +102,7 @@ type remoteResp {
type (
clustersLoadReq {
ClusterName string `form:"clusterName"`
}
clustersLoadResp {
Data interface{} `json:"data"`

View File

@ -154,6 +154,7 @@ type (
// 调度资源信息:/queryResources
QueryResourcesReq{
Type string `json:"type"`
ClusterIDs []string `json:"clusterIDs,optional"`
}

View File

@ -42,11 +42,16 @@ func AddCronGroup(svc *svc.ServiceContext) {
svc.Cron.AddFunc("0 5/5 * * * *", func() {
queryResource := schedule.NewQueryResourcesLogic(svc.HttpClient.R().Context(), svc)
rus, err := queryResource.QueryResourcesByClusterId(nil)
trainResrc, err := queryResource.QueryResourcesByClusterId(nil, "Train")
if err != nil {
logx.Error(err)
}
svc.Scheduler.AiService.LocalCache[schedule.QUERY_RESOURCES] = rus
svc.Scheduler.AiService.LocalCache[schedule.QUERY_TRAIN_RESOURCES] = trainResrc
inferResrc, err := queryResource.QueryResourcesByClusterId(nil, "Inference")
if err != nil {
logx.Error(err)
}
svc.Scheduler.AiService.LocalCache[schedule.QUERY_INFERENCE_RESOURCES] = inferResrc
})
//更新hpc任务状态

View File

@ -22,6 +22,11 @@ type ParticipantResp struct {
Data *v1.PodList `json:"data"` // 改成结构体
}
type ClusterInfo struct {
Name string `json:"name"`
Server string `json:"server"`
}
func NewPodsListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PodsListLogic {
return &PodsListLogic{
Logger: logx.WithContext(ctx),
@ -33,14 +38,14 @@ func NewPodsListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PodsList
func (l *PodsListLogic) PodsList(req *types.PodsListReq) (resp *types.PodsListResp, err error) {
resp = &types.PodsListResp{}
// query cluster http url.
var apiServerList []string
l.svcCtx.DbEngin.Raw("select server from t_adapter where resource_type = '01'").Scan(&apiServerList)
for _, server := range apiServerList {
var clusterInfoList []ClusterInfo
l.svcCtx.DbEngin.Raw("select ta.server,tc.name from t_adapter ta,t_cluster tc where ta.id = tc.adapter_id and ta.resource_type = '01'").Scan(&clusterInfoList)
for _, clusterInfo := range clusterInfoList {
participantResp := ParticipantResp{}
param := map[string]string{
"clusterName": req.ClusterName,
"clusterName": clusterInfo.Name,
}
httputils.HttpGetWithResult(param, server+"/api/v1/pod/list", &participantResp)
httputils.HttpGetWithResult(param, clusterInfo.Server+"/api/v1/pod/list", &participantResp)
resp.Data = append(resp.Data, participantResp.Data)
}

View File

@ -53,6 +53,8 @@ func (l *ScreenPageTaskLogic) ScreenPageTask(req *types.PageTaskReq) (resp *type
return nil, result.NewDefaultError(err.Error())
}
// 运行卡时数
// 查询任务列表
if err := db.Limit(limit).Offset(offset).Order("created_time desc").Find(&list).Error; err != nil {
return nil, result.NewDefaultError(err.Error())

View File

@ -2,6 +2,7 @@ package monitoring
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker"
"time"
@ -17,6 +18,11 @@ type ClustersLoadLogic struct {
svcCtx *svc.ServiceContext
}
type ClusterInfo struct {
ClusterName string `json:"clusterName"`
Metrics []tracker.Metric `json:"metrics"`
}
func NewClustersLoadLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ClustersLoadLogic {
return &ClustersLoadLogic{
Logger: logx.WithContext(ctx),
@ -27,8 +33,20 @@ func NewClustersLoadLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Clus
func (l *ClustersLoadLogic) ClustersLoad(req *types.ClustersLoadReq) (resp *types.ClustersLoadResp, err error) {
resp = &types.ClustersLoadResp{}
metrics := []string{"cluster_cpu_utilisation", "cluster_cpu_avail", "cluster_cpu_total", "cluster_memory_total", "cluster_memory_avail", "cluster_memory_utilisation", "cluster_disk_utilisation", "cluster_disk_avail", "cluster_disk_total", "cluster_pod_utilisation", "cluster_node_count"}
result := l.svcCtx.PromClient.GetNamedMetrics(metrics, time.Now(), tracker.ClusterOption{ClusterName: req.ClusterName})
// 查询集群列表
var clustersModel []models.ComputeCluster
l.svcCtx.DbEngin.Raw("select * from t_cluster where label = 'kubernetes'").Scan(&clustersModel)
var result []ClusterInfo
for _, cluster := range clustersModel {
metrics := []string{"cluster_cpu_utilisation", "cluster_cpu_avail", "cluster_cpu_total", "cluster_memory_total", "cluster_memory_avail", "cluster_memory_utilisation", "cluster_disk_utilisation", "cluster_disk_avail", "cluster_disk_total", "cluster_pod_utilisation", "cluster_node_count"}
data := l.svcCtx.PromClient.GetNamedMetrics(metrics, time.Now(), tracker.ClusterOption{ClusterName: cluster.Name.String})
clusterInfo := ClusterInfo{
ClusterName: cluster.Name.String,
Metrics: data,
}
result = append(result, clusterInfo)
}
resp.Data = result
return resp, nil
}

View File

@ -15,8 +15,9 @@ import (
)
const (
ADAPTERID = "1777144940459986944" // 异构适配器id
QUERY_RESOURCES = "query_resources"
ADAPTERID = "1777144940459986944" // 异构适配器id
QUERY_TRAIN_RESOURCES = "train_resources"
QUERY_INFERENCE_RESOURCES = "inference_resources"
)
type QueryResourcesLogic struct {
@ -41,25 +42,29 @@ func (l *QueryResourcesLogic) QueryResources(req *types.QueryResourcesReq) (resp
if err != nil {
return nil, err
}
resources, ok := l.svcCtx.Scheduler.AiService.LocalCache[QUERY_RESOURCES]
if ok {
specs, ok := resources.([]*collector.ResourceSpec)
if ok {
results := handleEmptyResourceUsage(cs.List, specs)
resp.Data = results
return resp, nil
}
var resources interface{}
switch req.Type {
case "Train":
resources, _ = l.svcCtx.Scheduler.AiService.LocalCache[QUERY_TRAIN_RESOURCES]
case "Inference":
resources, _ = l.svcCtx.Scheduler.AiService.LocalCache[QUERY_INFERENCE_RESOURCES]
default:
resources, _ = l.svcCtx.Scheduler.AiService.LocalCache[QUERY_TRAIN_RESOURCES]
}
rus, err := l.QueryResourcesByClusterId(cs.List)
specs, ok := resources.([]*collector.ResourceSpec)
if ok {
results := handleEmptyResourceUsage(cs.List, specs)
resp.Data = results
return resp, nil
}
rus, err := l.QueryResourcesByClusterId(cs.List, req.Type)
if err != nil {
return nil, err
}
if checkCachingCondition(cs.List, rus) {
l.svcCtx.Scheduler.AiService.LocalCache[QUERY_RESOURCES] = rus
}
results := handleEmptyResourceUsage(cs.List, rus)
resp.Data = results
@ -77,7 +82,7 @@ func (l *QueryResourcesLogic) QueryResources(req *types.QueryResourcesReq) (resp
return nil, errors.New("no clusters found ")
}
rus, err := l.QueryResourcesByClusterId(clusters)
rus, err := l.QueryResourcesByClusterId(clusters, req.Type)
if err != nil {
return nil, err
}
@ -89,7 +94,7 @@ func (l *QueryResourcesLogic) QueryResources(req *types.QueryResourcesReq) (resp
return resp, nil
}
func (l *QueryResourcesLogic) QueryResourcesByClusterId(clusterinfos []types.ClusterInfo) ([]*collector.ResourceSpec, error) {
func (l *QueryResourcesLogic) QueryResourcesByClusterId(clusterinfos []types.ClusterInfo, resrcType string) ([]*collector.ResourceSpec, error) {
var clusters []types.ClusterInfo
if len(clusterinfos) == 0 {
cs, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(ADAPTERID)
@ -121,7 +126,7 @@ func (l *QueryResourcesLogic) QueryResourcesByClusterId(clusterinfos []types.Clu
return
}
u, err = col.GetResourceSpecs(l.ctx)
u, err = col.GetResourceSpecs(l.ctx, resrcType)
if err != nil {
done <- true
return

View File

@ -218,7 +218,7 @@ func (l *ScheduleCreateTaskLogic) getAssignedClustersByStrategy(resources *types
var resCount int
for i := 0; i < QUERY_RESOURCE_RETRY; i++ {
defer time.Sleep(time.Second)
qResources, err := l.queryResource.QueryResourcesByClusterId(nil)
qResources, err := l.queryResource.QueryResourcesByClusterId(nil, "Train")
if err != nil {
continue
}

View File

@ -14,7 +14,7 @@ type AiCollector interface {
UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error
GetComputeCards(ctx context.Context) ([]string, error)
GetUserBalance(ctx context.Context) (float64, error)
GetResourceSpecs(ctx context.Context) (*ResourceSpec, error)
GetResourceSpecs(ctx context.Context, resrcType string) (*ResourceSpec, error)
}
type ResourceSpec struct {

View File

@ -996,7 +996,7 @@ func (m *ModelArtsLink) CheckImageExist(ctx context.Context, option *option.Infe
return errors.New("failed to find Image ")
}
func (m *ModelArtsLink) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, error) {
func (m *ModelArtsLink) GetResourceSpecs(ctx context.Context, resrcType string) (*collector.ResourceSpec, error) {
var wg sync.WaitGroup
//查询modelarts资源规格
req := &modelarts.GetResourceFlavorsReq{}

View File

@ -1279,7 +1279,7 @@ func (o *OctopusLink) CheckModelExistence(ctx context.Context, name string, mtyp
return true
}
func (o *OctopusLink) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, error) {
func (o *OctopusLink) GetResourceSpecs(ctx context.Context, resrcType string) (*collector.ResourceSpec, error) {
res := &collector.ResourceSpec{
ClusterId: strconv.FormatInt(o.participantId, 10),
Resources: make([]interface{}, 0),

View File

@ -770,7 +770,14 @@ func (o *OpenI) GetUserBalance(ctx context.Context) (float64, error) {
return 0, errors.New("failed to implement")
}
func (o *OpenI) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, error) {
func (o *OpenI) GetResourceSpecs(ctx context.Context, resrcType string) (*collector.ResourceSpec, error) {
var jobType string
if resrcType == "Inference" {
jobType = ONLINEINFERENCE
} else if resrcType == "Train" {
jobType = TRAIN
}
var resources []interface{}
res := &collector.ResourceSpec{
ClusterId: strconv.FormatInt(o.participantId, 10),
@ -795,7 +802,7 @@ func (o *OpenI) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec,
param := model.TaskCreationRequiredParam{
UserName: o.userName,
RepoName: TESTREPO,
JobType: TRAIN,
JobType: jobType,
ComputeSource: ComputeSource[i],
ClusterType: C2NET,
}

View File

@ -1103,7 +1103,7 @@ func (s *ShuguangAi) CheckModelExistence(ctx context.Context, name string, mtype
return resp.Data.Exist
}
func (s *ShuguangAi) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, error) {
func (s *ShuguangAi) GetResourceSpecs(ctx context.Context, resrcType string) (*collector.ResourceSpec, error) {
return nil, nil
//var timeout = 5
//var wg sync.WaitGroup

View File

@ -118,6 +118,6 @@ func (o Template) GetUserBalance(ctx context.Context) (float64, error) {
}
// GetResourceSpecs 查询资源规格
func (o Template) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, error) {
func (o Template) GetResourceSpecs(ctx context.Context, resrcType string) (*collector.ResourceSpec, error) {
return nil, errors.New("failed to implement")
}