This commit is contained in:
tzwang 2025-03-07 15:58:23 +08:00
commit a8bd2234f1
17 changed files with 96 additions and 67 deletions

View File

@ -902,6 +902,8 @@ type (
RegionDict string `json:"regionDict,optional"`
RegionName string `json:"regionName,optional"`
Environment map[string]string `json:"environment,optional"`
CostType string `json:"costType,optional"`
Price int `json:"price,optional"`
}
ClusterInfo {
Id string `json:"id,omitempty" db:"id"`
@ -1366,7 +1368,7 @@ type (
}
)
type ResourcePrice struct {
type ResourcePrice {
ID int64 `json:"id" gorm:"column:id;primaryKey;autoIncrement"`
ResourceID int64 `json:"resourceId" gorm:"column:resource_id"`
Price int `json:"price" gorm:"column:price"`
@ -1374,7 +1376,7 @@ type ResourcePrice struct {
ResourceType string `json:"resourceType" gorm:"column:resource_type"`
}
type ResourceCostRecord struct {
type ResourceCostRecord {
ID int64 `json:"id" gorm:"column:id;primaryKey;autoIncrement"`
ResourcePriceID int64 `json:"resourcePriceId" gorm:"column:resource_price_id"`
UserId int64 `json:"userId" gorm:"column:user_id"`

View File

@ -30,8 +30,9 @@ type (
StdInput string `json:"stdInput,optional"`
ClusterType string `json:"clusterType,optional"`
Partition string `json:"partition"`
UserId int64 `json:"userId,optional"`
Token string `json:"token,optional"`
UserId int64 `json:"userId,optional"`
Token string `json:"token,optional"`
UserIp string `json:"userIp,optional"`
}
commitHpcTaskResp {
@ -155,24 +156,24 @@ type QueueAsset {
type (
/******************instance center*************************/
HpcInstanceCenterReq{
InstanceType int32 `form:"instanceType,optional"`
InstanceClass string `form:"instanceClass,optional"`
InstanceName string `form:"instanceName,optional"`
HpcInstanceCenterReq {
InstanceType int32 `form:"instanceType,optional"`
InstanceClass string `form:"instanceClass,optional"`
InstanceName string `form:"instanceName,optional"`
PageInfo
}
HpcInstanceCenterResp {
InstanceCenterList []HpcInstanceCenterList `json:"instanceCenterList" copier:"InstanceCenterList"`
TotalCount int `json:"totalCount"`
TotalCount int `json:"totalCount"`
}
HpcInstanceCenterList {
LogoPath string `json:"logo_path"`
InstanceName string `json:"instance_name"`
InstanceType int32 `json:"instance_type"`
InstanceClass string `json:"instance_class"`
InstanceClassChinese string `json:"instance_class_chinese"`
Description string `json:"description"`
Version string `json:"version"`
LogoPath string `json:"logo_path"`
InstanceName string `json:"instance_name"`
InstanceType int32 `json:"instance_type"`
InstanceClass string `json:"instance_class"`
InstanceClassChinese string `json:"instance_class_chinese"`
Description string `json:"description"`
Version string `json:"version"`
}
/******************instance center*************************/

View File

@ -7,6 +7,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"k8s.io/apimachinery/pkg/util/json"
"net/http"
)
@ -18,6 +19,9 @@ func CommitHpcTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
result.ParamErrorResult(r, w, err)
return
}
// 获取ip信息
ip := utils.GetClientIP(r)
req.UserIp = ip
// 获取token信息
token := r.Header.Get("Authorization")
req.Token = token

View File

@ -2,6 +2,7 @@ package schedule
import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
@ -17,7 +18,9 @@ func ScheduleCreateTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
result.ParamErrorResult(r, w, err)
return
}
// 获取ip信息
ip := utils.GetClientIP(r)
req.UserIp = ip
// 获取token信息
token := r.Header.Get("Authorization")
req.Token = token

View File

@ -6,6 +6,7 @@ import (
"fmt"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
tool "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gorm.io/gorm"
"io/ioutil"
@ -63,7 +64,13 @@ func (l *CreateClusterLogic) CreateCluster(req *types.ClusterCreateReq) (resp *t
logx.Errorf(tx.Error.Error())
return nil, errors.New("cluster create failed")
}
// 创建资源价格信息
resourcePrice := &types.ResourcePrice{
Price: req.Price,
ResourceType: constants.CLUSTER,
CostType: req.CostType,
}
tx = l.svcCtx.DbEngin.Table("resource_cost").Create(resourcePrice)
// push cluster info to adapter
go func() {
var adapterServer string

View File

@ -134,8 +134,13 @@ func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) er
db := tx.Table("task").Create(&taskModel)
db = tx.Table("task_cloud").Create(&taskClouds)
db = tx.Table("t_notice").Create(&noticeInfo)
if db.Error != nil {
logx.Errorf("Task creation failure, err: %v", db.Error)
return errors.New("task creation failure")
}
// 数据上链
bytes, err := json.Marshal(taskModel)
bytes, _ := json.Marshal(taskModel)
if err != nil {
return err
}
@ -148,10 +153,6 @@ func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) er
Token: req.Token,
Args: []string{strconv.FormatInt(taskModel.Id, 10), string(bytes)},
})
if db.Error != nil {
logx.Errorf("Task creation failure, err: %v", db.Error)
return errors.New("task creation failure")
}
return nil
}

View File

@ -97,10 +97,6 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
logx.Info("提交job到指定集群")
jobId, err := submitJob(&hpcInfo, &clusterInfo, server)
//if err != nil {
// return nil, err
//}
//logx.Info("提交job到指定集群完成")
// 保存操作记录
noticeInfo := clientCore.NoticeInfo{
AdapterId: clusterInfo.AdapterId,
@ -122,6 +118,7 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
// 数据上链
bytes, _ := json.Marshal(taskModel)
remoteUtil.Evidence(remoteUtil.EvidenceParam{
UserIp: req.UserIp,
Url: l.svcCtx.Config.BlockChain.Url,
ContractAddress: l.svcCtx.Config.BlockChain.ContractAddress,
FunctionName: l.svcCtx.Config.BlockChain.FunctionName,

View File

@ -166,7 +166,7 @@ func (l *ScheduleCreateTaskLogic) ScheduleCreateTask(req *types.CreateTaskReq) (
// filter data distribution
clustersWithDataDistributes := generateFilteredDataDistributes(assignedClusters, req.DataDistributes)
taskId, err := l.createTask(taskName, req.Description, req.JobResources.ScheduleStrategy, clustersWithDataDistributes, req.Token)
taskId, err := l.createTask(taskName, req.Description, req.JobResources.ScheduleStrategy, clustersWithDataDistributes, req.Token, req.UserIp)
if err != nil {
return nil, err
}
@ -196,7 +196,7 @@ func (l *ScheduleCreateTaskLogic) ScheduleCreateTask(req *types.CreateTaskReq) (
// filter data distribution
clustersWithDataDistributes := generateFilteredDataDistributes(assignedClusters, req.DataDistributes)
taskId, err := l.createTask(taskName, req.Description, req.JobResources.ScheduleStrategy, clustersWithDataDistributes, req.Token)
taskId, err := l.createTask(taskName, req.Description, req.JobResources.ScheduleStrategy, clustersWithDataDistributes, req.Token, req.UserIp)
if err != nil {
return nil, err
}
@ -332,7 +332,7 @@ func copyParams(clusters []*strategy.AssignedCluster, clusterInfos []*types.JobC
return result
}
func (l *ScheduleCreateTaskLogic) createTask(taskName string, desc string, strategyName string, clustersWithDataDistributes *ClustersWithDataDistributes, token string) (int64, error) {
func (l *ScheduleCreateTaskLogic) createTask(taskName string, desc string, strategyName string, clustersWithDataDistributes *ClustersWithDataDistributes, token string, userIp string) (int64, error) {
var synergyStatus int64
if len(clustersWithDataDistributes.Clusters) > 1 {
synergyStatus = 1
@ -343,7 +343,7 @@ func (l *ScheduleCreateTaskLogic) createTask(taskName string, desc string, strat
fmt.Printf("Error while Marshaling. %v", err)
}
taskId, err := l.svcCtx.Scheduler.CreateTask(taskName, desc, synergyStatus, strategyName, string(y), token, &l.svcCtx.Config)
taskId, err := l.svcCtx.Scheduler.CreateTask(taskName, desc, synergyStatus, strategyName, string(y), token, userIp, &l.svcCtx.Config)
if err != nil {
return 0, err
}

View File

@ -152,7 +152,7 @@ func updateClustersByScheduledDatas(taskId int64, clustersWithDataDistributes *C
}{}
err := json.Unmarshal([]byte(c.JsonData), &jsonData)
if err != nil {
return nil, fmt.Errorf("pass-in jsonData convert failed, task %d, cluster %s, datatype %s", taskId, cluster.ClusterId, "dataset")
return nil, fmt.Errorf("pass-in jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "dataset")
}
cluster.DatasetId = jsonData.Id
}
@ -174,7 +174,7 @@ func updateClustersByScheduledDatas(taskId int64, clustersWithDataDistributes *C
}{}
err := json.Unmarshal([]byte(c.JsonData), &jsonData)
if err != nil {
return nil, fmt.Errorf("pass-in jsonData convert failed, task %d, cluster %s, datatype %s", taskId, cluster.ClusterId, "image")
return nil, fmt.Errorf("pass-in jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "image")
}
cluster.ImageId = jsonData.Id
}
@ -196,7 +196,7 @@ func updateClustersByScheduledDatas(taskId int64, clustersWithDataDistributes *C
}{}
err := json.Unmarshal([]byte(c.JsonData), &jsonData)
if err != nil {
return nil, fmt.Errorf("pass-in jsonData convert failed, task %d, cluster %s, datatype %s", taskId, cluster.ClusterId, "code")
return nil, fmt.Errorf("pass-in jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "code")
}
cluster.CodeId = jsonData.Id
}
@ -218,7 +218,7 @@ func updateClustersByScheduledDatas(taskId int64, clustersWithDataDistributes *C
}{}
err := json.Unmarshal([]byte(c.JsonData), &jsonData)
if err != nil {
return nil, fmt.Errorf("pass-in jsonData convert failed, task %d, cluster %s, datatype %s", taskId, cluster.ClusterId, "model")
return nil, fmt.Errorf("pass-in jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "model")
}
cluster.ModelId = jsonData.Id
}
@ -245,7 +245,7 @@ func updateClustersByScheduledDatas(taskId int64, clustersWithDataDistributes *C
}{}
err := json.Unmarshal([]byte(c.JsonData), &jsonData)
if err != nil {
return nil, fmt.Errorf("db yaml jsonData convert failed, task %d, cluster %s, datatype %s", taskId, cluster.ClusterId, "dataset")
return nil, fmt.Errorf("db yaml jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "dataset")
}
cluster.DatasetId = jsonData.Id
}
@ -266,7 +266,7 @@ func updateClustersByScheduledDatas(taskId int64, clustersWithDataDistributes *C
}{}
err := json.Unmarshal([]byte(c.JsonData), &jsonData)
if err != nil {
return nil, fmt.Errorf("db yaml jsonData convert failed, task %d, cluster %s, datatype %s", taskId, cluster.ClusterId, "image")
return nil, fmt.Errorf("db yaml jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "image")
}
cluster.ImageId = jsonData.Id
}
@ -287,7 +287,7 @@ func updateClustersByScheduledDatas(taskId int64, clustersWithDataDistributes *C
}{}
err := json.Unmarshal([]byte(c.JsonData), &jsonData)
if err != nil {
return nil, fmt.Errorf("db yaml jsonData convert failed, task %d, cluster %s, datatype %s", taskId, cluster.ClusterId, "code")
return nil, fmt.Errorf("db yaml jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "code")
}
cluster.CodeId = jsonData.Id
}
@ -308,7 +308,7 @@ func updateClustersByScheduledDatas(taskId int64, clustersWithDataDistributes *C
}{}
err := json.Unmarshal([]byte(c.JsonData), &jsonData)
if err != nil {
return nil, fmt.Errorf("jsonData convert failed, task %d, cluster %s, datatype %s", taskId, cluster.ClusterId, "model")
return nil, fmt.Errorf("jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "model")
}
cluster.ModelId = jsonData.Id
}

View File

@ -65,7 +65,7 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type
synergystatus = 1
}
taskId, err := l.svcCtx.Scheduler.CreateTask(req.AiOption.TaskName, "", synergystatus, req.AiOption.Strategy, "", req.Token, &l.svcCtx.Config)
taskId, err := l.svcCtx.Scheduler.CreateTask(req.AiOption.TaskName, "", synergystatus, req.AiOption.Strategy, "", req.Token, "", &l.svcCtx.Config)
if err != nil {
return nil, err
}

View File

@ -188,13 +188,13 @@ func (s *Scheduler) SaveToDb() error {
return nil
}
func (s *Scheduler) CreateTask(taskName string, desc string, synergyCode int64, strategyName string, yaml string, token string, config *config.Config) (int64, error) {
func (s *Scheduler) CreateTask(taskName string, desc string, synergyCode int64, strategyName string, yaml string, token string, userIp string, config *config.Config) (int64, error) {
strategyCode, err := s.AiStorages.GetStrategyCode(strategyName)
if err != nil {
return 0, err
}
id, err := s.AiStorages.SaveTask(taskName, desc, strategyCode, synergyCode, "10", yaml, blockchain.SaveToChain(token, config))
id, err := s.AiStorages.SaveTask(taskName, desc, strategyCode, synergyCode, "10", yaml, blockchain.SaveToChain(token, userIp, config))
if err != nil {
return 0, err
}

View File

@ -241,7 +241,7 @@ func (as *AiScheduler) handleErrors(errs []interface{}, clusters []*strategy.Ass
var taskId int64
switch mode {
case executor.SUBMIT_MODE_JOINT_CLOUD:
tid, err := as.CreateTask(as.option.TaskName, "", synergystatus, as.option.StrategyName, "", "", nil)
tid, err := as.CreateTask(as.option.TaskName, "", synergystatus, as.option.StrategyName, "", "", "", nil)
if err != nil {
return err
}

View File

@ -8,7 +8,7 @@ import (
"strconv"
)
func SaveToChain(token string, config *config.Config) func(task models.Task, id int64) error {
func SaveToChain(token string, userIp string, config *config.Config) func(task models.Task, id int64) error {
return func(task models.Task, id int64) error {
if token == "" {
return nil
@ -19,6 +19,7 @@ func SaveToChain(token string, config *config.Config) func(task models.Task, id
return err
}
err = remoteUtil.Evidence(remoteUtil.EvidenceParam{
UserIp: userIp,
Url: config.BlockChain.Url,
ContractAddress: config.BlockChain.ContractAddress,
FunctionName: config.BlockChain.FunctionName,

View File

@ -204,7 +204,7 @@ func (m *ModelArtsLink) SubmitTask(ctx context.Context, imageId string, cmd stri
},
Spec: &modelarts.SpecsC{
Resource: &modelarts.ResourceCreateTraining{
FlavorId: "modelarts.kat1.xlarge",
FlavorId: resourceId,
NodeCount: 1,
},
},
@ -647,7 +647,13 @@ func (m *ModelArtsLink) Execute(ctx context.Context, option *option.AiOption, mo
if ascendNum == v.FlavorInfo.Npu.UnitNum {
option.ResourceId = v.FlavorId
break
} else if ascendNum == 3 {
} else if ascendNum <= 1 {
option.ResourceId = "modelarts.kat1.xlarge"
break
} else if ascendNum == 2 {
option.ResourceId = "modelarts.kat1.2xlarge"
break
} else if ascendNum > 2 && ascendNum <= 4 {
option.ResourceId = "modelarts.kat1.4xlarge"
break
} else if ascendNum >= 5 && ascendNum <= 8 {

View File

@ -28,7 +28,6 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"mime/multipart"
"net/http"
"strconv"
"strings"
"sync"
@ -1232,26 +1231,26 @@ func (s *ShuguangAi) GetResourceSpecs(ctx context.Context) (*collector.ResourceS
var cpureqed atomic.Int64
var dcureqed atomic.Int64
var jwg sync.WaitGroup
for _, j := range jobList.Jobs {
jwg.Add(1)
job := j
go func() {
defer jwg.Done()
h := http.Request{}
jreq := &hpcAC.JobDetailReq{
JobId: job.JobId,
}
detail, err := s.aCRpc.GetJobDetail(h.Context(), jreq)
if err != nil || detail.Data == nil {
return
}
cpureqed.Add(int64(detail.Data.ProcNumReq))
dcureqed.Add(int64(detail.Data.DcuNumReq))
}()
}
jwg.Wait()
//var jwg sync.WaitGroup
//for _, j := range jobList.Jobs {
// jwg.Add(1)
// job := j
// go func() {
// defer jwg.Done()
// h := http.Request{}
// jreq := &hpcAC.JobDetailReq{
// JobId: job.JobId,
// }
// detail, err := s.aCRpc.GetJobDetail(h.Context(), jreq)
// if err != nil || detail.Data == nil {
// return
// }
//
// cpureqed.Add(int64(detail.Data.ProcNumReq))
// dcureqed.Add(int64(detail.Data.DcuNumReq))
// }()
//}
//jwg.Wait()
for v := range ch {
switch v.Type {

View File

@ -835,6 +835,8 @@ type ClusterCreateReq struct {
RegionDict string `json:"regionDict,optional"`
RegionName string `json:"regionName,optional"`
Environment map[string]string `json:"environment,optional"`
CostType string `json:"costType,optional"`
Price int `json:"price,optional"`
}
type ClusterInfo struct {
@ -1323,6 +1325,7 @@ type CommitHpcTaskReq struct {
Partition string `json:"partition"`
UserId int64 `json:"userId,optional"`
Token string `json:"token,optional"`
UserIp string `json:"userIp,optional"`
}
type CommitHpcTaskResp struct {

5
pkg/constants/cost.go Normal file
View File

@ -0,0 +1,5 @@
package constants
const (
CLUSTER = "0"
)