pcm-coordinator/api/internal/pkg/scheduler/cloudScheduler.go

105 lines
3.2 KiB
Go

package scheduler
import (
"bytes"
"encoding/json"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/algo"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/model"
"gitlink.org.cn/jcce-pcm/utils/tool"
"gorm.io/gorm"
"io"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml"
kyaml "k8s.io/apimachinery/pkg/util/yaml"
)
// cloudScheduler is the scheduler implementation for cloud (Kubernetes)
// workloads. It holds no state of its own; all inputs are passed to its
// methods.
type cloudScheduler struct{}
// NewCloudScheduler returns a pointer to a freshly allocated, zero-valued
// cloudScheduler.
func NewCloudScheduler() *cloudScheduler {
	return new(cloudScheduler)
}
// pickOptimalStrategy builds a K8s strategy for task over the given providers
// and runs it through algo.ScheduleWithFullCollaboration, returning the task
// that call produces.
//
// When task is nil or no providers are supplied it returns (nil, nil) without
// scheduling anything. If scheduling fails, the error is returned together
// with a nil task.
func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) {
	// Nothing to schedule without both a task and at least one provider.
	if task == nil || len(providers) == 0 {
		return nil, nil
	}

	// Hand the placement decision over to the scheduling algorithm.
	k8sStrategy := algo.NewK8sStrategy(task, providers...)
	scheduled, err := algo.ScheduleWithFullCollaboration(k8sStrategy, k8sStrategy.ProviderList)
	if err != nil {
		return nil, err
	}
	return scheduled, nil
}
// getNewStructForDb builds the record for task that is associated with the
// participant identified by participantId.
//
// The task metadata is JSON-encoded and parsed by UnMarshalK8sStruct; the
// resulting model.Cloud then receives an ID from tool.GenSnowflakeID, the
// encoded metadata as YamlString (note: this is the JSON encoding produced
// here), and participantId.
//
// It returns the model.Cloud value as an interface{}, or a nil value and the
// error if the metadata cannot be marshalled.
func (cs *cloudScheduler) getNewStructForDb(task *types.TaskInfo, participantId int64) (interface{}, error) {
	encoded, err := json.Marshal(task.Metadata)
	if err != nil {
		return nil, err
	}
	manifest := string(encoded)

	record := cs.UnMarshalK8sStruct(manifest, task.TaskId)
	record.Id = tool.GenSnowflakeID()
	record.YamlString = manifest
	record.ParticipantId = participantId
	return record, nil
}
// UnMarshalK8sStruct parses yamlString as a stream of YAML or JSON documents
// and returns a model.Cloud record describing the last document read.
//
// For every document the record is rebuilt from scratch with taskId, the
// object's apiVersion, name, kind and namespace, and Status "Saved"; an empty
// namespace is replaced by "default". Input that contains no documents yields
// the zero model.Cloud.
//
// The method has no error return, so failures are handled best-effort:
//   - If the stream itself cannot be decoded, the record is built from an
//     empty object (task ID, "default" namespace and "Saved" status only) and
//     decoding stops. Stopping matters: a decoder may keep returning the same
//     non-EOF error on every call, in which case retrying never terminates.
//   - If a single document cannot be decoded into an object, the record is
//     built from whatever fields the object holds after the attempt.
func (cs *cloudScheduler) UnMarshalK8sStruct(yamlString string, taskId int64) model.Cloud {
	var cloud model.Cloud

	decoder := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096)
	// The serializer does not depend on the document, so build it once.
	serializer := syaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme)

	for {
		var rawObj runtime.RawExtension
		streamErr := decoder.Decode(&rawObj)
		if streamErr == io.EOF {
			break
		}

		obj := &unstructured.Unstructured{}
		// The decode error is deliberately discarded: there is no way to
		// report it to the caller, and the record below is still built from
		// whatever obj contains after the attempt.
		_, _, _ = serializer.Decode(rawObj.Raw, nil, obj)

		// An empty namespace falls back to the default one.
		namespace := obj.GetNamespace()
		if len(namespace) == 0 {
			namespace = "default"
		}

		// obj is already unstructured, so its accessors are read directly.
		cloud = model.Cloud{
			TaskId:     taskId,
			ApiVersion: obj.GetAPIVersion(),
			Name:       obj.GetName(),
			Kind:       obj.GetKind(),
			Namespace:  namespace,
			Status:     "Saved",
		}

		// A stream-level failure ends the loop instead of being retried.
		if streamErr != nil {
			break
		}
	}
	return cloud
}
// genTaskAndProviders produces the inputs for the scheduling algorithm: an
// algo.Task and the list of candidate providers.
//
// Providers are read from the sc_node_avail_info table. The query sums
// disk_avail, mem_avail and cpu_total*cpu_usable per participant_id over the
// rows whose created_time is one of the latest created_time values found per
// (participant_id, node_name) among rows with deleted_flag = 0. Each result
// row becomes one algo.Provider; the last three NewProvider arguments are
// always 0.0.
//
// The task parameter is currently not used: the returned algo.Task is built
// from fixed arguments whose meaning is defined by algo.NewTask.
//
// The signature has no error return, so when the query fails the task is
// returned with a nil provider list rather than with partially scanned rows.
// A query that matches no rows also yields a nil provider list.
func (cs *cloudScheduler) genTaskAndProviders(task *types.TaskInfo, dbEngin *gorm.DB) (*algo.Task, []*algo.Provider) {
	const availabilityQuery = "SELECT SUM(a.disk_avail) as disk_avail,SUM(a.mem_avail) as mem_avail,SUM(a.cpu_total * a.cpu_usable) as cpu_avail,participant_id from (SELECT * from sc_node_avail_info where created_time in (SELECT MAX(created_time) as time from sc_node_avail_info where deleted_flag = 0 GROUP BY participant_id,node_name)) a GROUP BY a.participant_id"

	// Fixed placeholder task; see the note on the task parameter above.
	t := algo.NewTask(0, 1, 2, 75120000, 301214500, 1200, 2, 6, 2000)

	var proParams []providerParams
	if err := dbEngin.Raw(availabilityQuery).Scan(&proParams).Error; err != nil {
		// Do not schedule against data from a failed query.
		return t, nil
	}
	if len(proParams) == 0 {
		return t, nil
	}

	providerList := make([]*algo.Provider, 0, len(proParams))
	for _, p := range proParams {
		providerList = append(providerList, algo.NewProvider(p.Participant_id, p.Cpu_avail, p.Mem_avail, p.Disk_avail, 0.0, 0.0, 0.0))
	}
	return t, providerList
}