调度结构修改4

This commit is contained in:
tzwang 2023-08-24 16:39:32 +08:00
parent 5f029e5098
commit 4e6978ce83
9 changed files with 64 additions and 92 deletions

View File

@ -46,7 +46,7 @@ func NewK8sStrategy(task *Task, providers ...*Provider) *k8sStrategy {
return &k8sStrategy{ProviderList: providerList, Task: task, StrategyList: strategyList}
}
func (ps k8sStrategy) computeMaxScore() (*Task, error) {
func (ps *k8sStrategy) computeMaxScore() (*Task, error) {
maxStrategy := NewStrategy()
var maxprofit float64

View File

@ -31,6 +31,12 @@ func (l *ScheduleAiMq) Consume(_, val string) error {
}
schdl.MatchLabels(l.svcCtx.DbEngin)
// 调度算法
err = schdl.AssignAndSchedule()
if err != nil {
return err
}
// 存储数据
err = schdl.SaveToDb(l.svcCtx.DbEngin)
if err != nil {

View File

@ -1,16 +1,9 @@
package kq
import (
"bytes"
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/pkg/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/model"
"io"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml"
kyaml "k8s.io/apimachinery/pkg/util/yaml"
)
/*
@ -29,51 +22,23 @@ func NewScheduleCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *Schedu
}
}
func UnMarshalK8sStruct(yamlString string, taskId int64) model.Cloud {
var cloud model.Cloud
d := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096)
var err error
for {
var rawObj runtime.RawExtension
err = d.Decode(&rawObj)
if err == io.EOF {
break
}
if err != nil {
}
obj := &unstructured.Unstructured{}
syaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode(rawObj.Raw, nil, obj)
if err != nil {
}
unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
}
unstructureObj := &unstructured.Unstructured{Object: unstructuredMap}
cloud = model.Cloud{
TaskId: taskId,
ApiVersion: unstructureObj.GetAPIVersion(),
Name: unstructureObj.GetName(),
Kind: unstructureObj.GetKind(),
Namespace: unstructureObj.GetNamespace(),
Status: "Saved",
}
}
return cloud
}
func (l *ScheduleCloudMq) Consume(_, val string) error {
// 接受消息, 根据标签筛选过滤
cloudScheduler := scheduler.NewCloudScheduler()
scheduler, err := scheduler.NewScheduler(cloudScheduler, val)
schdl, err := scheduler.NewScheduler(cloudScheduler, val)
if err != nil {
return err
}
schdl.MatchLabels(l.svcCtx.DbEngin)
// 调度算法
err = schdl.AssignAndSchedule()
if err != nil {
return err
}
scheduler.MatchLabels(l.svcCtx.DbEngin)
// 存储数据
err = scheduler.SaveToDb(l.svcCtx.DbEngin)
err = schdl.SaveToDb(l.svcCtx.DbEngin)
if err != nil {
return err
}

View File

@ -31,6 +31,12 @@ func (l *ScheduleHpcMq) Consume(_, val string) error {
}
schdl.MatchLabels(l.svcCtx.DbEngin)
// 调度算法
err = schdl.AssignAndSchedule()
if err != nil {
return err
}
// 存储数据
err = schdl.SaveToDb(l.svcCtx.DbEngin)
if err != nil {

View File

@ -15,17 +15,17 @@ func NewAiScheduler(val string) *aiScheduler {
return &aiScheduler{yamlString: val}
}
func (cs *aiScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []int64) (interface{}, error) {
func (as *aiScheduler) getNewStructForDb(task *types.TaskInfo, participantId int64) (interface{}, error) {
ai := model.Ai{
ParticipantId: participantIds[0],
ParticipantId: participantId,
TaskId: task.TaskId,
Status: "Saved",
YamlString: cs.yamlString,
YamlString: as.yamlString,
}
tool.Convert(task.Metadata, &ai)
return ai, nil
}
func (cs *aiScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) {
func (as *aiScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) {
return nil, nil
}

View File

@ -30,7 +30,7 @@ func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*alg
return taskResult, nil
}
func (cs *cloudScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []int64) (interface{}, error) {
func (cs *cloudScheduler) getNewStructForDb(task *types.TaskInfo, participantId int64) (interface{}, error) {
bytes, err := json.Marshal(task.Metadata)
if err != nil {
return nil, err
@ -38,9 +38,7 @@ func (cs *cloudScheduler) getNewStructForDb(task *types.TaskInfo, participantIds
cloud := cs.UnMarshalK8sStruct(string(bytes), task.TaskId)
cloud.Id = tool.GenSnowflakeID()
cloud.YamlString = string(bytes)
if len(participantIds) != 0 {
cloud.ParticipantId = participantIds[0]
}
cloud.ParticipantId = participantId
return cloud, nil
}

View File

@ -8,28 +8,10 @@ import (
)
type scheduleService interface {
getNewStructForDb(task *types.TaskInfo, participantIds []int64) (interface{}, error)
getNewStructForDb(task *types.TaskInfo, participantId int64) (interface{}, error)
pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error)
}
//func MatchLabels(dbEngin *gorm.DB, task *types.TaskInfo) ([]int64, error) {
// var ids []int64
// count := 0
// for key := range task.MatchLabels {
// var participantId []int64
// dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, task.MatchLabels[key]).Scan(&participantId)
// if count == 0 {
// ids = participantId
// }
// if len(participantId) == 0 || len(ids) == 0 {
// return nil, nil
// }
// ids = intersect(ids, participantId)
// count++
// }
// return micsSlice(ids, 1), nil
//}
// 求交集
func intersect(slice1, slice2 []int64) []int64 {
m := make(map[int64]int)

View File

@ -15,17 +15,17 @@ func NewHpcScheduler(val string) *hpcScheduler {
return &hpcScheduler{yamlString: val}
}
func (h *hpcScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []int64) (interface{}, error) {
func (h *hpcScheduler) getNewStructForDb(task *types.TaskInfo, participantId int64) (interface{}, error) {
hpc := model.Hpc{
TaskId: task.TaskId,
Status: "Saved",
//ParticipantId: participantId[0],
YamlString: h.yamlString,
TaskId: task.TaskId,
Status: "Saved",
ParticipantId: participantId,
YamlString: h.yamlString,
}
tool.Convert(task.Metadata, &hpc)
return hpc, nil
}
func (cs *hpcScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) {
func (h *hpcScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) {
return nil, nil
}

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/algo"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gorm.io/gorm"
)
@ -24,38 +25,48 @@ func NewScheduler(scheduleService scheduleService, val string) (*scheduler, erro
}
func (s *scheduler) MatchLabels(dbEngin *gorm.DB) {
//if len(task.MatchLabels) != 0 {
// participantId, err := scheduler.MatchLabels(l.svcCtx.DbEngin, task)
// if err != nil {
// return err
// }
//}
// 已指定 ParticipantId
if s.task.ParticipantId != 0 {
return
}
var ids []int64
count := 0
for key := range s.task.MatchLabels {
var participantId []int64
dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, s.task.MatchLabels[key]).Scan(&participantId)
var participantIds []int64
dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, s.task.MatchLabels[key]).Scan(&participantIds)
if count == 0 {
ids = participantId
ids = participantIds
}
//if len(participantId) == 0 || len(ids) == 0 {
// return nil, nil
//}
ids = intersect(ids, participantId)
ids = intersect(ids, participantIds)
count++
}
s.participantIds = micsSlice(ids, 1)
}
func (s *scheduler) AssignAndSchedule() {
func (s *scheduler) AssignAndSchedule() error {
task, providerList := s.genTaskAndProviders()
strategy, err := s.scheduleService.pickOptimalStrategy(task, providerList...)
if err != nil {
return err
}
if strategy == nil {
s.task.ParticipantId = s.participantIds[0]
}
return nil
}
func (s *scheduler) SaveToDb(dbEngin *gorm.DB) error {
if len(s.participantIds) == 0 {
return errors.New("participantIds 为空")
if s.task.ParticipantId == 0 {
return errors.New("participantId 为空")
}
structForDb, err := s.scheduleService.getNewStructForDb(s.task, s.participantIds)
structForDb, err := s.scheduleService.getNewStructForDb(s.task, s.task.ParticipantId)
if err != nil {
return err
}
@ -66,3 +77,7 @@ func (s *scheduler) SaveToDb(dbEngin *gorm.DB) error {
}
return nil
}
func (s *scheduler) genTaskAndProviders() (*algo.Task, []*algo.Provider) {
return nil, nil
}