updated startall apis

This commit is contained in:
tzwang 2024-07-31 16:22:47 +08:00
parent 12ca4d7bdd
commit 50c594f817
7 changed files with 210 additions and 5 deletions

View File

@ -50,7 +50,7 @@ func (l *DeployInstanceListLogic) DeployInstanceList(req *types.DeployInstanceLi
}
go updater.UpdateDeployInstanceStatus(l.svcCtx, ins, true)
go updater.UpdateDeployTaskStatus(l.svcCtx)
//count total
var total int64
err = tx.Count(&total).Error

View File

@ -4,7 +4,6 @@ import (
"context"
"errors"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"strconv"
@ -41,7 +40,10 @@ func (l *StartAllByDeployTaskIdLogic) StartAllByDeployTaskId(req *types.StartAll
}
}
go updater.UpdateDeployInstanceStatusBatch(l.svcCtx, list)
err = l.svcCtx.Scheduler.AiStorages.UpdateDeployTaskById(id)
if err != nil {
return nil, err
}
return resp, nil
}

View File

@ -3,7 +3,6 @@ package inference
import (
"context"
"errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"strconv"
@ -42,7 +41,10 @@ func (l *StopAllByDeployTaskIdLogic) StopAllByDeployTaskId(req *types.StopAllByD
}
}
go updater.UpdateDeployInstanceStatusBatch(l.svcCtx, list)
err = l.svcCtx.Scheduler.AiStorages.UpdateDeployTaskById(id)
if err != nil {
return nil, err
}
return resp, nil
}

View File

@ -421,6 +421,50 @@ func (s *AiStorage) GetInferDeployInstanceById(id int64) (*models.AiInferDeployI
return &deployIns, nil
}
func (s *AiStorage) GetDeployTaskById(id int64) (*models.AiDeployInstanceTask, error) {
var task models.AiDeployInstanceTask
tx := s.DbEngin.Raw("select * from ai_deploy_instance_task where `id` = ?", id).Scan(&task)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return nil, tx.Error
}
return &task, nil
}
func (s *AiStorage) GetAllDeployTasks() ([]*models.AiDeployInstanceTask, error) {
var tasks []*models.AiDeployInstanceTask
tx := s.DbEngin.Raw("select * from ai_deploy_instance_task").Scan(&tasks)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return nil, tx.Error
}
return tasks, nil
}
func (s *AiStorage) UpdateDeployTask(task *models.AiDeployInstanceTask, needUpdateTime bool) error {
if needUpdateTime {
task.UpdateTime = time.Now().Format(time.RFC3339)
}
tx := s.DbEngin.Table("ai_deploy_instance_task").Updates(task)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return tx.Error
}
return nil
}
func (s *AiStorage) UpdateDeployTaskById(id int64) error {
task, err := s.GetDeployTaskById(id)
if err != nil {
return err
}
err = s.UpdateDeployTask(task, true)
if err != nil {
return err
}
return nil
}
func (s *AiStorage) GetInstanceListByDeployTaskId(id int64) ([]*models.AiInferDeployInstance, error) {
var list []*models.AiInferDeployInstance
tx := s.DbEngin.Raw("select * from ai_infer_deploy_instance where `deploy_instance_task_id` = ?", id).Scan(&list)

View File

@ -7,6 +7,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"net/http"
"strconv"
"time"
)
func UpdateDeployInstanceStatusBatch(svc *svc.ServiceContext, insList []*models.AiInferDeployInstance) {
@ -27,6 +28,44 @@ func UpdateDeployInstanceStatusBatch(svc *svc.ServiceContext, insList []*models.
}
}
func UpdateDeployTaskStatus(svc *svc.ServiceContext) {
list, err := svc.Scheduler.AiStorages.GetAllDeployTasks()
if err != nil {
return
}
ins := list[0]
for i := range list {
uTime, _ := time.Parse(time.RFC3339, ins.UpdateTime)
latest, _ := time.Parse(time.RFC3339, list[i].UpdateTime)
if latest.After(uTime) {
ins = list[i]
}
}
updateDeployTaskStatus(svc, ins)
}
func updateDeployTaskStatus(svc *svc.ServiceContext, ins *models.AiDeployInstanceTask) {
list, err := svc.Scheduler.AiStorages.GetInstanceListByDeployTaskId(ins.Id)
if err != nil {
return
}
for i := len(list) - 1; i >= 0; i-- {
if list[i].Status == constants.Running || list[i].Status == constants.Stopped {
list = append(list[:i], list[i+1:]...)
}
}
if len(list) == 0 {
return
}
for _, instance := range list {
go UpdateDeployInstanceStatus(svc, instance, false)
}
}
func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInferDeployInstance, updatetime bool) {
amap, found := svc.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(instance.AdapterId, 10)]
if !found {

View File

@ -0,0 +1,29 @@
package models
import "github.com/zeromicro/go-zero/core/stores/sqlx"
var _ AiDeployInstanceTaskModel = (*customAiDeployInstanceTaskModel)(nil)
type (
// AiDeployInstanceTaskModel is an interface to be customized, add more methods here,
// and implement the added methods in customAiDeployInstanceTaskModel.
AiDeployInstanceTaskModel interface {
aiDeployInstanceTaskModel
withSession(session sqlx.Session) AiDeployInstanceTaskModel
}
customAiDeployInstanceTaskModel struct {
*defaultAiDeployInstanceTaskModel
}
)
// NewAiDeployInstanceTaskModel returns a model for the database table.
func NewAiDeployInstanceTaskModel(conn sqlx.SqlConn) AiDeployInstanceTaskModel {
return &customAiDeployInstanceTaskModel{
defaultAiDeployInstanceTaskModel: newAiDeployInstanceTaskModel(conn),
}
}
func (m *customAiDeployInstanceTaskModel) withSession(session sqlx.Session) AiDeployInstanceTaskModel {
return NewAiDeployInstanceTaskModel(sqlx.NewSqlConnFromSession(session))
}

View File

@ -0,0 +1,89 @@
// Code generated by goctl. DO NOT EDIT.
package models
import (
"context"
"database/sql"
"fmt"
"strings"
"github.com/zeromicro/go-zero/core/stores/builder"
"github.com/zeromicro/go-zero/core/stores/sqlc"
"github.com/zeromicro/go-zero/core/stores/sqlx"
"github.com/zeromicro/go-zero/core/stringx"
)
var (
aiDeployInstanceTaskFieldNames = builder.RawFieldNames(&AiDeployInstanceTask{})
aiDeployInstanceTaskRows = strings.Join(aiDeployInstanceTaskFieldNames, ",")
aiDeployInstanceTaskRowsExpectAutoSet = strings.Join(stringx.Remove(aiDeployInstanceTaskFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), ",")
aiDeployInstanceTaskRowsWithPlaceHolder = strings.Join(stringx.Remove(aiDeployInstanceTaskFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), "=?,") + "=?"
)
type (
aiDeployInstanceTaskModel interface {
Insert(ctx context.Context, data *AiDeployInstanceTask) (sql.Result, error)
FindOne(ctx context.Context, id int64) (*AiDeployInstanceTask, error)
Update(ctx context.Context, data *AiDeployInstanceTask) error
Delete(ctx context.Context, id int64) error
}
defaultAiDeployInstanceTaskModel struct {
conn sqlx.SqlConn
table string
}
AiDeployInstanceTask struct {
Id int64 `db:"id"`
Name string `db:"name"`
ModelName string `db:"model_name"`
ModelType string `db:"model_type"`
Desc string `db:"desc"`
CreateTime string `db:"create_time"`
UpdateTime string `db:"update_time"`
}
)
func newAiDeployInstanceTaskModel(conn sqlx.SqlConn) *defaultAiDeployInstanceTaskModel {
return &defaultAiDeployInstanceTaskModel{
conn: conn,
table: "`ai_deploy_instance_task`",
}
}
func (m *defaultAiDeployInstanceTaskModel) Delete(ctx context.Context, id int64) error {
query := fmt.Sprintf("delete from %s where `id` = ?", m.table)
_, err := m.conn.ExecCtx(ctx, query, id)
return err
}
func (m *defaultAiDeployInstanceTaskModel) FindOne(ctx context.Context, id int64) (*AiDeployInstanceTask, error) {
query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", aiDeployInstanceTaskRows, m.table)
var resp AiDeployInstanceTask
err := m.conn.QueryRowCtx(ctx, &resp, query, id)
switch err {
case nil:
return &resp, nil
case sqlc.ErrNotFound:
return nil, ErrNotFound
default:
return nil, err
}
}
func (m *defaultAiDeployInstanceTaskModel) Insert(ctx context.Context, data *AiDeployInstanceTask) (sql.Result, error) {
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?)", m.table, aiDeployInstanceTaskRowsExpectAutoSet)
ret, err := m.conn.ExecCtx(ctx, query, data.Name, data.ModelName, data.ModelType, data.Desc)
return ret, err
}
func (m *defaultAiDeployInstanceTaskModel) Update(ctx context.Context, data *AiDeployInstanceTask) error {
query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, aiDeployInstanceTaskRowsWithPlaceHolder)
_, err := m.conn.ExecCtx(ctx, query, data.Name, data.ModelName, data.ModelType, data.Desc, data.Id)
return err
}
func (m *defaultAiDeployInstanceTaskModel) tableName() string {
return m.table
}