pcm-coordinator/internal/scheduler/schedulers/aiScheduler.go

486 lines
12 KiB
Go

/*
Copyright (c) [2023] [pcm]
[pcm-coordinator] is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
package schedulers
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/jcs"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy/param"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/response"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
"gitlink.org.cn/JointCloud/pcm-octopus/octopus"
"gitlink.org.cn/JointCloud/pcm-openi/model"
"strconv"
"strings"
"sync"
)
// AiScheduler carries the state needed to pick a placement strategy for one AI
// task and to submit that task to the selected clusters.
type AiScheduler struct {
// yamlString is the value passed to NewAiScheduler; GetNewStructForDb copies
// it into the YamlString field of the models.Ai record it builds.
yamlString string
// task is not read or written by the code visible here.
task *response.TaskInfo
// Scheduler is embedded; the methods below reach AiService, AiStorages and
// CreateTask through the AiScheduler value, presumably promoted from this
// embedded type — confirm against the scheduler package.
*scheduler.Scheduler
// option holds the submission options (adapter id, cluster ids, strategy
// name, compute card, ...) consulted by every method of the scheduler.
option *option.AiOption
// ctx is forwarded to collector and executor calls.
ctx context.Context
}
// AiResult describes the outcome of submitting one replica of an AI task to a
// cluster. convertType fills JobId or Msg from the executor response, and
// AssignTask fills the remaining fields.
type AiResult struct {
// AdapterId is copied from the option used for the submission.
AdapterId string
// TaskName is copied from the option used for the submission.
TaskName string
// JobId is set by convertType when the response indicates success.
JobId string
// ClusterId identifies the cluster the replica was submitted to.
ClusterId string
// Strategy is the scheduler option's StrategyName.
Strategy string
// Replica is the Replicas value of the assigned cluster.
Replica int32
// Card is the ComputeCard of the option used for the submission.
Card string
// Msg is set by convertType when the response indicates failure; handleErrors
// treats a non-empty Msg as a failed submission.
Msg string
// Output is copied from the option used for the submission.
Output string
}
// NewAiScheduler builds an AiScheduler from the request context, the raw task
// definition string, the shared base scheduler and the AI submission options.
// The returned error is always nil.
func NewAiScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.AiOption) (*AiScheduler, error) {
	as := new(AiScheduler)
	as.ctx = ctx
	as.yamlString = val
	as.Scheduler = scheduler
	as.option = option
	return as, nil
}
// GetNewStructForDb builds the models.Ai record for the given task. The record
// starts in the "Saved" status, carries the scheduler's task definition string,
// and is then populated from task.Metadata via utils.Convert. The resource
// argument is not used. The returned error is always nil.
func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
	var record models.Ai
	record.AdapterId = participantId
	record.TaskId = task.TaskId
	record.Status = "Saved"
	record.YamlString = as.yamlString

	utils.Convert(task.Metadata, &record)
	return record, nil
}
// PickOptimalStrategy chooses the placement strategy for the current task.
//
// The decision is made in this order:
//  1. If a compute card is requested, the first candidate cluster whose
//     collector reports that card receives the task (single replica). A
//     collector error aborts the selection.
//  2. If exactly one candidate cluster is configured, it receives the task.
//  3. Otherwise cluster resources are collected; zero usable clusters is an
//     error and exactly one usable cluster receives the task.
//  4. Otherwise the strategy named in the options is constructed.
//
// An error is returned when no branch applies.
func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
	// assignSingle pins the task to one cluster with a single replica.
	assignSingle := func(clusterID string) (strategy.Strategy, error) {
		target := &strategy.AssignedCluster{ClusterId: clusterID, Replicas: 1}
		return &strategy.SingleAssignment{Cluster: target}, nil
	}

	if wanted := as.option.ComputeCard; wanted != "" {
		if collectors, found := as.AiService.AiCollectorAdapterMap[as.option.AdapterId]; found {
			for _, clusterID := range as.option.ClusterIds {
				clusterCollector, known := collectors[clusterID]
				if !known {
					continue
				}
				cards, err := clusterCollector.GetComputeCards(as.ctx)
				if err != nil {
					return nil, err
				}
				if common.Contains(cards, wanted) {
					return assignSingle(clusterID)
				}
			}
		}
	}

	if len(as.option.ClusterIds) == 1 {
		return assignSingle(as.option.ClusterIds[0])
	}

	available, err := as.findClustersWithResources()
	if err != nil {
		return nil, err
	}
	switch len(available) {
	case 0:
		return nil, errors.New("no cluster has resources")
	case 1:
		return assignSingle(available[0].ClusterId)
	}

	params := &param.Params{Resources: available}
	switch as.option.StrategyName {
	case strategy.REPLICATION:
		var clusterIds []string
		for _, stats := range available {
			if stats != nil {
				clusterIds = append(clusterIds, stats.ClusterId)
			}
		}
		return strategy.NewReplicationStrategy(clusterIds, 1), nil
	case strategy.RESOURCES_PRICING:
		pricing := &param.ResourcePricingParams{Params: params, Replicas: 1}
		return strategy.NewPricingStrategy(pricing), nil
	case strategy.DYNAMIC_RESOURCES:
		return strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1), nil
	case strategy.STATIC_WEIGHT:
		//todo resources should match cluster StaticWeightMap
		return strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, as.option.Replica), nil
	case strategy.RANDOM:
		return strategy.NewRandomStrategy(as.option.ClusterIds, as.option.Replica), nil
	}

	return nil, errors.New("no strategy has been chosen")
}
// AssignTask submits the task to every assigned cluster, once per requested
// replica, running all submissions concurrently.
//
// Clusters that are nil or request no replicas are skipped. Each submission
// works on its own copy of the scheduler options, adjusted for the given mode
// by updateAiOptionByMode. Any failure (option cloning, missing executor,
// executor error, unrecognised response) is collected per cluster and handed
// to handleErrors together with the successful results.
//
// On success the returned value is a []*AiResult with one entry per submitted
// replica. An error is returned when clusters is nil, when no cluster has
// replicas to submit, or when handleErrors reports a failure.
func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster, mode int) (interface{}, error) {
	if clusters == nil {
		return nil, errors.New("clusters is nil")
	}

	// Filter into a fresh slice so the caller's slice is left untouched, and
	// drop non-positive replica counts so the channel capacity below can never
	// become negative.
	active := make([]*strategy.AssignedCluster, 0, len(clusters))
	taskNum := 0
	for _, cluster := range clusters {
		if cluster == nil || cluster.Replicas <= 0 {
			continue
		}
		active = append(active, cluster)
		taskNum += int(cluster.Replicas)
	}
	if len(active) == 0 {
		return nil, errors.New("clusters is nil")
	}

	// Both channels are sized to hold one message per submission, so no
	// goroutine can block on send.
	resultCh := make(chan *AiResult, taskNum)
	errCh := make(chan interface{}, taskNum)
	executorMap := as.AiService.AiExecutorAdapterMap[as.option.AdapterId]

	var wg sync.WaitGroup
	for _, cluster := range active {
		c := cluster
		for i := 0; i < int(c.Replicas); i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()

				// fail records an error for this cluster in the shape that
				// handleErrors type-asserts on.
				fail := func(err error) {
					errCh <- struct {
						err       error
						clusterId string
					}{
						err:       err,
						clusterId: c.ClusterId,
					}
				}

				opt, err := cloneAiOption(as.option)
				if err != nil {
					fail(err)
					return
				}
				// decide opt params by mode
				updateAiOptionByMode(c, opt, mode)

				runner, ok := executorMap[c.ClusterId]
				if !ok {
					fail(fmt.Errorf("no executor found for cluster %v", c.ClusterId))
					return
				}
				resp, err := runner.Execute(as.ctx, opt, mode)
				if err != nil {
					fail(err)
					return
				}

				// An unrecognised response type yields a nil result; report
				// it as a failure instead of dereferencing it.
				result, err := convertType(resp)
				if err != nil {
					fail(err)
					return
				}
				result.AdapterId = opt.AdapterId
				result.TaskName = opt.TaskName
				result.Replica = c.Replicas
				result.ClusterId = c.ClusterId
				result.Strategy = as.option.StrategyName
				result.Card = opt.ComputeCard
				result.Output = opt.Output
				resultCh <- result
			}()
		}
	}
	wg.Wait()
	close(resultCh)
	close(errCh)

	var errs []interface{}
	for e := range errCh {
		errs = append(errs, e)
	}
	var results []*AiResult
	for r := range resultCh {
		results = append(results, r)
	}

	if err := as.handleErrors(errs, active, results, mode); err != nil {
		return nil, err
	}
	return results, nil
}
// handleErrors records and reports the outcome of a submission round in which
// at least one submission failed. It returns nil when errs is empty.
//
// When errs is non-empty it:
//   - resolves the task id (creating a task in joint-cloud mode, reusing the
//     option's TaskId in storage-schedule mode),
//   - saves a failed AI task row and sends a status report for every entry in
//     errs,
//   - saves an AI task row and sends a status report for every entry in
//     results (failed when the result carries a Msg, saved otherwise),
//   - logs and returns an error whose message lists every cluster outcome.
//
// Each element of errs is expected to be the anonymous
// struct{err error; clusterId string} produced by AssignTask. A database
// failure aborts processing immediately.
func (as *AiScheduler) handleErrors(errs []interface{}, clusters []*strategy.AssignedCluster, results []*AiResult, mode int) error {
	if len(errs) == 0 {
		return nil
	}

	var synergystatus int64
	if len(clusters) > 1 {
		synergystatus = 1
	}

	var taskId int64
	switch mode {
	case executor.SUBMIT_MODE_JOINT_CLOUD:
		tid, err := as.CreateTask(as.option.TaskName, "", 0, synergystatus, as.option.StrategyName, "", "", "", nil)
		if err != nil {
			return err
		}
		taskId = tid
	case executor.SUBMIT_MODE_STORAGE_SCHEDULE:
		taskId = as.option.TaskId
	}

	// aiTasks
	adapterName, err := as.AiStorages.GetAdapterNameById(as.option.AdapterId)
	if err != nil {
		return err
	}

	reportUrl := as.AiService.Conf.JcsMiddleware.JobStatusReportUrl
	reportTaskId := strconv.FormatInt(taskId, 10)

	var errmsg strings.Builder

	// Record every failed submission. The loop must not return after the
	// first entry, otherwise the remaining failures and all results below
	// would never be persisted or reported.
	for _, item := range errs {
		e, ok := item.(struct {
			err       error
			clusterId string
		})
		if !ok {
			// Not produced by AssignTask; keep it in the summary rather than
			// panicking on the assertion.
			errmsg.WriteString(fmt.Sprintf("unexpected error value: %v \n", item))
			continue
		}
		msg := fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
		errmsg.WriteString(msg)

		// The cluster name is informational only; a lookup failure leaves it
		// empty.
		clusterName, _ := as.AiStorages.GetClusterNameById(e.clusterId)
		if err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, e.clusterId, clusterName, "", constants.Failed, msg); err != nil {
			return errors.New("database add failed: " + err.Error())
		}

		//report msg
		report := &jcs.TrainReportMessage{
			Type:      "Train",
			TaskName:  "",
			TaskID:    reportTaskId,
			Status:    false,
			Message:   msg,
			ClusterID: e.clusterId,
			Output:    "",
		}
		// Status reporting is best effort; its error is deliberately ignored.
		_ = jcs.StatusReport(reportUrl, report)
	}

	// Record the submissions that did return a response.
	for _, s := range results {
		as.option.ComputeCard = s.Card //execute card
		clusterName, _ := as.AiStorages.GetClusterNameById(s.ClusterId)

		var (
			msg    string
			jobId  string
			status = constants.Failed
		)
		if s.Msg != "" {
			msg = fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg)
		} else {
			msg = fmt.Sprintf("clusterId: %v , submitted successfully, jobId: %v \n", s.ClusterId, s.JobId)
			jobId = s.JobId
			status = constants.Saved
		}
		errmsg.WriteString(msg)
		if err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, jobId, status, msg); err != nil {
			return errors.New("database add failed: " + err.Error())
		}

		//add report msg
		// NOTE(review): Status stays false even for successful submissions,
		// as in the original code — confirm this is what the receiver expects.
		report := &jcs.TrainReportMessage{
			Type:      "Train",
			TaskName:  "",
			TaskID:    reportTaskId,
			Status:    false,
			Message:   s.Msg,
			ClusterID: s.ClusterId,
			Output:    "",
		}
		// Status reporting is best effort; its error is deliberately ignored.
		_ = jcs.StatusReport(reportUrl, report)
	}

	summary := errmsg.String()
	// Log as a plain value: the summary may contain '%' and must not be
	// interpreted as a format string.
	logx.Error(summary)
	return errors.New(summary)
}
// updateAiOptionByMode overrides fields of opt with the per-cluster values
// carried by cluster when mode is executor.SUBMIT_MODE_STORAGE_SCHEDULE. For
// any other mode opt is left unchanged.
//
// In storage-schedule mode the command, environment, parameters, image,
// algorithm, dataset and required resources are copied from cluster. In
// addition, every parameter is split on storeLink.COMMA; when its first field
// is "output" and a second field exists, that second field becomes opt.Output.
func updateAiOptionByMode(cluster *strategy.AssignedCluster, opt *option.AiOption, mode int) {
	switch mode {
	case executor.SUBMIT_MODE_STORAGE_SCHEDULE:
		opt.Cmd = cluster.Cmd
		opt.Envs = cluster.Envs
		opt.Params = cluster.Params
		opt.ImageId = cluster.ImageId
		opt.AlgorithmId = cluster.CodeId
		opt.DatasetsId = cluster.DatasetId
		opt.ResourcesRequired = cluster.ResourcesRequired

		// assume params include output for modelarts
		for _, p := range cluster.Params {
			fields := strings.Split(p, storeLink.COMMA)
			// A parameter without a separator has a single field; guard the
			// index so a bare "output" entry cannot panic.
			if len(fields) > 1 && fields[0] == "output" {
				opt.Output = fields[1]
			}
		}
	default:
	}
}
// findClustersWithResources queries the resource collector of every cluster
// registered for the configured adapter, concurrently, and returns the
// resource statistics that were obtained.
//
// Collectors that fail are logged and skipped, and nil statistics are dropped.
// The error "get resources failed" is returned when the number of failed
// collectors equals the number of registered collectors, which includes the
// case where none are registered.
func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) {
	collectors := as.AiService.AiCollectorAdapterMap[as.option.AdapterId]
	clustersNum := len(collectors)

	// Both channels can hold one message per collector, so sends never block.
	statsCh := make(chan *collector.ResourceStats, clustersNum)
	failCh := make(chan string, clustersNum)

	var wg sync.WaitGroup
	for clusterId, resourceCollector := range collectors {
		wg.Add(1)
		id, rc := clusterId, resourceCollector
		go func() {
			defer wg.Done()
			spec, err := rc.GetResourceStats(as.ctx)
			if err != nil {
				failCh <- fmt.Sprintf("clusterId: %v , error: %v", id, err)
				return
			}
			statsCh <- spec
		}()
	}
	wg.Wait()
	close(statsCh)
	close(failCh)

	var resourceSpecs []*collector.ResourceStats
	for spec := range statsCh {
		// A collector may report success without data; callers dereference
		// the entries, so nil statistics are not forwarded.
		if spec == nil {
			continue
		}
		resourceSpecs = append(resourceSpecs, spec)
	}

	var failures []string
	for msg := range failCh {
		failures = append(failures, msg)
	}
	if len(failures) != 0 {
		// Partial failures do not abort scheduling, but they must not vanish
		// silently either.
		logx.Errorf("get resources failed for %d of %d clusters: %s", len(failures), clustersNum, strings.Join(failures, "; "))
	}
	if len(failures) == clustersNum {
		return nil, errors.New("get resources failed")
	}

	return resourceSpecs, nil
}
// convertType normalises the submission response of a supported backend into
// an AiResult. On a successful response JobId is set; on a failed response Msg
// carries the backend's message. Success is decided per backend exactly as the
// response reports it (code "0", Success flag, empty ErrorMsg, code 0).
//
// An error is returned, with a nil result, when the response is of an
// unsupported type or is a nil pointer of a supported type.
func convertType(in interface{}) (*AiResult, error) {
	var result AiResult

	switch resp := in.(type) {
	case *hpcAC.SubmitTaskAiResp:
		if resp == nil {
			return nil, errors.New("ai task response is nil")
		}
		if resp.Code == "0" {
			result.JobId = resp.Data
		} else {
			result.Msg = resp.Msg
		}
	case *octopus.CreateTrainJobResp:
		if resp == nil {
			return nil, errors.New("ai task response is nil")
		}
		if resp.Success {
			result.JobId = resp.Payload.JobId
		} else {
			result.Msg = resp.Error.Message
		}
	case *modelartsservice.CreateTrainingJobResp:
		if resp == nil {
			return nil, errors.New("ai task response is nil")
		}
		if resp.ErrorMsg != "" {
			result.Msg = resp.ErrorMsg
		} else {
			result.JobId = resp.Metadata.Id
		}
	case model.CreateTask:
		if resp.Code != 0 {
			result.Msg = resp.Msg
		} else {
			result.JobId = strconv.Itoa(resp.Data.Id)
		}
	default:
		return nil, errors.New("ai task response failed")
	}

	return &result, nil
}
// cloneAiOption returns an independent copy of opt produced by a JSON
// round-trip, so only the fields that survive JSON encoding are carried over.
// A marshalling or unmarshalling failure is returned with a nil option.
func cloneAiOption(opt *option.AiOption) (*option.AiOption, error) {
	encoded, err := json.Marshal(opt)
	if err != nil {
		return nil, err
	}

	duplicate := new(option.AiOption)
	if decodeErr := json.Unmarshal(encoded, duplicate); decodeErr != nil {
		return nil, decodeErr
	}
	return duplicate, nil
}