diff --git a/api/etc/pcm.yaml b/api/etc/pcm.yaml index 2a27ffba8..ee51794e7 100644 --- a/api/etc/pcm.yaml +++ b/api/etc/pcm.yaml @@ -5,8 +5,8 @@ Port: 8999 Timeout: 50000 DB: - DataSource: root:uJpLd6u-J?HC1@(47.92.88.143:3306)/pcm?parseTime=true&loc=Local - # DataSource: root:uJpLd6u-J?HC1@(47.92.88.143:3306)/pcm?parseTime=true&loc=Local + DataSource: root:uJpLd6u-J?HC1@(10.206.0.12:3306)/pcm?parseTime=true&loc=Local +# DataSource: root:uJpLd6u-J?HC1@(47.92.88.143:3306)/pcm?parseTime=true&loc=Local Redis: Host: 10.206.0.12:6379 Pass: redisPW123 diff --git a/api/internal/logic/cloud/commitgeneraltasklogic.go b/api/internal/logic/cloud/commitgeneraltasklogic.go index e2bf7c5b2..cf8842b91 100644 --- a/api/internal/logic/cloud/commitgeneraltasklogic.go +++ b/api/internal/logic/cloud/commitgeneraltasklogic.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "github.com/pkg/errors" + clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" @@ -15,7 +16,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml" kyaml "k8s.io/apimachinery/pkg/util/yaml" - "sigs.k8s.io/yaml" "strconv" "strings" "time" @@ -41,71 +41,98 @@ func NewCommitGeneralTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) } func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) error { - var yamlStr []string - for _, s := range req.ReqBody { - j2, err := yaml.YAMLToJSON([]byte(s)) - if err != nil { - logx.Errorf("Failed to convert yaml to JSON, err: %v", err) - return err + tx := l.svcCtx.DbEngin.Begin() + // 执行回滚或者提交操作 + defer func() { + if p := recover(); p != nil { + tx.Rollback() + logx.Error(p) + } else if tx.Error != nil { + logx.Info("rollback, error", tx.Error) + tx.Rollback() + } else { + tx = tx.Commit() + logx.Info("commit success") } - yamlStr = append(yamlStr, string(j2)) - } - result := strings.Join(yamlStr, ",") - //TODO The namespace is fixed to ns-admin for the time being. Later, the namespace is obtained based on the user - taskModel := models.Task{ - Status: constants.Saved, - Name: req.Name, - CommitTime: time.Now(), - YamlString: "[" + result + "]", - } - // Save the task data to the database - tx := l.svcCtx.DbEngin.Create(&taskModel) - if tx.Error != nil { - return tx.Error - } - + }() + //TODO adapter + adapterId, _ := strconv.ParseUint(req.AdapterIds[0], 10, 64) var clusters []*models.CloudModel - err := l.svcCtx.DbEngin.Raw("SELECT * FROM `t_cluster` where adapter_id in ? and id in ?", req.AdapterIds, req.ClusterIds).Scan(&clusters).Error + err := tx.Raw("SELECT * FROM `t_cluster` where adapter_id in ? and id in ?", req.AdapterIds, req.ClusterIds).Scan(&clusters).Error if err != nil { logx.Errorf("CommitGeneralTask() => sql execution error: %v", err) return errors.Errorf("the cluster does not match the drive resources. Check the data") } taskCloud := cloud.TaskCloudModel{} - //TODO 执行策略返回集群跟 Replica opt := &option.CloudOption{} utils.Convert(&req, &opt) - sc, err := schedulers.NewCloudScheduler(l.ctx, "", l.svcCtx.Scheduler, opt, l.svcCtx.DbEngin, l.svcCtx.PromClient) - if err != nil { - return err - } + sc, _ := schedulers.NewCloudScheduler(l.ctx, "", l.svcCtx.Scheduler, opt, tx, l.svcCtx.PromClient) + results, err := l.svcCtx.Scheduler.AssignAndSchedule(sc) if err != nil { + logx.Errorf("AssignAndSchedule() => execution error: %v", err) return err } rs := (results).([]*schedulers.CloudResult) + + var synergyStatus int64 + if len(rs) > 1 { + synergyStatus = 1 + } + var strategy int64 + sqlStr := `select t_dict_item.item_value + from t_dict + left join t_dict_item on t_dict.id = t_dict_item.dict_id + where item_text = ? + and t_dict.dict_code = 'schedule_Strategy'` + //查询调度策略 + err = tx.Raw(sqlStr, req.Strategy).Scan(&strategy).Error + taskModel := models.Task{ + Id: utils.GenSnowflakeID(), + Status: constants.Pending, + Name: req.Name, + CommitTime: time.Now(), + YamlString: strings.Join(req.ReqBody, "\n---\n"), + TaskTypeDict: 0, + SynergyStatus: synergyStatus, + Strategy: strategy, + } + var taskClouds []cloud.TaskCloudModel for _, r := range rs { for _, s := range req.ReqBody { sStruct := UnMarshalK8sStruct(s, int64(r.Replica)) unString, _ := sStruct.MarshalJSON() taskCloud.Id = utils.GenSnowflakeIDUint() taskCloud.TaskId = uint(taskModel.Id) - adapterId, _ := strconv.ParseUint(req.AdapterIds[0], 10, 64) clusterId, _ := strconv.ParseUint(r.ClusterId, 10, 64) taskCloud.AdapterId = uint(adapterId) taskCloud.ClusterId = uint(clusterId) taskCloud.ClusterName = r.ClusterName - taskCloud.Status = "Pending" + taskCloud.Status = constants.Pending taskCloud.YamlString = string(unString) taskCloud.Kind = sStruct.GetKind() taskCloud.Namespace = sStruct.GetNamespace() - tx = l.svcCtx.DbEngin.Create(&taskCloud) - if tx.Error != nil { - logx.Errorf("CommitGeneralTask() create taskCloud => sql execution error: %v", err) - return tx.Error - } + taskClouds = append(taskClouds, taskCloud) } } + adapterName := "" + tx.Table("t_adapter").Select("name").Where("id=?", adapterId).Find(&adapterName) + noticeInfo := clientCore.NoticeInfo{ + AdapterId: int64(adapterId), + AdapterName: adapterName, + NoticeType: "create", + TaskName: req.Name, + Incident: "任务创建中", + CreatedTime: time.Now(), + } + db := tx.Table("task").Create(&taskModel) + db = tx.Table("task_cloud").Create(&taskClouds) + db = tx.Table("t_notice").Create(¬iceInfo) + if db.Error != nil { + logx.Errorf("Task creation failure, err: %v", db.Error) + return errors.New("task creation failure") + } return nil } diff --git a/api/internal/logic/core/pulltaskinfologic.go b/api/internal/logic/core/pulltaskinfologic.go index ef9b86d9d..9581659ed 100644 --- a/api/internal/logic/core/pulltaskinfologic.go +++ b/api/internal/logic/core/pulltaskinfologic.go @@ -5,6 +5,7 @@ import ( "github.com/jinzhu/copier" clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gorm.io/gorm" @@ -54,7 +55,7 @@ func (l *PullTaskInfoLogic) PullTaskInfo(req *clientCore.PullTaskInfoReq) (*clie } } case 0: - var cloudModelList []models.Cloud + var cloudModelList []cloud.TaskCloudModel err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &cloudModelList) if err != nil { return nil, err diff --git a/pkg/constants/task.go b/pkg/constants/task.go index daf8879f4..0ec079f3a 100644 --- a/pkg/constants/task.go +++ b/pkg/constants/task.go @@ -26,4 +26,5 @@ const ( WaitRestart = "WaitRestart" WaitPause = "WaitPause" WaitStart = "WaitStart" + Pending = "Pending" ) diff --git a/pkg/models/cloud/task_cloud.go b/pkg/models/cloud/task_cloud.go index 13e8c0453..3dec32bc3 100644 --- a/pkg/models/cloud/task_cloud.go +++ b/pkg/models/cloud/task_cloud.go @@ -6,18 +6,17 @@ import ( ) type TaskCloudModel struct { - Id uint `json:"id" gorm:"primarykey;not null;comment:id"` - TaskId uint `json:"taskId" gorm:"not null;comment:task表id"` - AdapterId uint `json:"adapterId" gorm:"not null;comment:适配器id"` - ClusterId uint `json:"clusterId" gorm:"not null;comment:集群id"` - ClusterName string `json:"clusterName" gorm:"not null;comment:集群名称"` - Kind string `json:"kind" gorm:"comment:种类"` - Status string `json:"status" gorm:"comment:状态"` - StartTime time.Time `json:"startTime" gorm:"comment:开始时间"` - YamlString string `json:"yamlString" gorm:"not null;comment:入参"` - Result string `json:"result" gorm:"comment:运行结果"` - Namespace string `json:"namespace" gorm:"comment:命名空间"` - Replica int `json:"replica" gorm:"not null;comment:副本数"` + Id uint `json:"id" gorm:"primarykey;not null;comment:id"` + TaskId uint `json:"taskId" gorm:"not null;comment:task表id"` + AdapterId uint `json:"adapterId" gorm:"not null;comment:适配器id"` + ClusterId uint `json:"clusterId" gorm:"not null;comment:集群id"` + ClusterName string `json:"clusterName" gorm:"not null;comment:集群名称"` + Kind string `json:"kind" gorm:"comment:种类"` + Status string `json:"status" gorm:"comment:状态"` + StartTime *time.Time `json:"startTime" gorm:"comment:开始时间"` + YamlString string `json:"yamlString" gorm:"not null;comment:入参"` + Result string `json:"result" gorm:"comment:运行结果"` + Namespace string `json:"namespace" gorm:"comment:命名空间"` base.BaseModel }