新增查询所有模型接口
This commit is contained in:
parent
4658a843a2
commit
6d14924693
|
@ -74,6 +74,19 @@ func (s *JobService) QueryRunningModels(ctx *gin.Context) {
|
|||
ctx.JSON(http.StatusOK, OK(resp))
|
||||
}
|
||||
|
||||
func (s *JobService) GetAllModels(ctx *gin.Context) {
|
||||
log := logger.WithField("HTTP", "JobService.QueryRunningModels")
|
||||
|
||||
resp, err := s.svc.JobSetSvc().QueryRunningModels()
|
||||
if err != nil {
|
||||
log.Warnf("get available nodes: %s", err.Error())
|
||||
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get available nodes failed"))
|
||||
return
|
||||
}
|
||||
|
||||
ctx.JSON(http.StatusOK, OK(resp))
|
||||
}
|
||||
|
||||
func (s *JobService) ECSNodeRunningInfo(ctx *gin.Context) {
|
||||
log := logger.WithField("HTTP", "JobService.ECSNodeRunningInfo")
|
||||
|
||||
|
|
|
@ -42,6 +42,7 @@ func (s *Server) initRouters() {
|
|||
s.engine.POST("/job/createInstance", s.JobSvc().CreateInstance)
|
||||
s.engine.GET("/job/queryRunningModels", s.JobSvc().QueryRunningModels)
|
||||
s.engine.GET("/job/getECSNodeRunningInfo", s.JobSvc().ECSNodeRunningInfo)
|
||||
s.engine.GET("/job/getAllModels", s.JobSvc().GetAllModels)
|
||||
s.engine.POST("/jobSet/localFileUploaded", s.JobSetSvc().LocalFileUploaded)
|
||||
s.engine.GET("/jobSet/getServiceList", s.JobSetSvc().GetServiceList)
|
||||
}
|
||||
|
|
|
@ -42,6 +42,22 @@ func (svc *JobSetService) QueryRunningModels() (*schsdk.RunningModelResp, error)
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (svc *JobSetService) GetAllModels() (*mgrmq.GetAllModelsResp, error) {
|
||||
|
||||
mgrCli, err := schglb.ManagerMQPool.Acquire()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get available nodes: %w", err)
|
||||
}
|
||||
defer schglb.ManagerMQPool.Release(mgrCli)
|
||||
|
||||
resp, err := mgrCli.GetAllModels(&mgrmq.GetAllModels{})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("submitting job set to manager: %w", err)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (svc *JobSetService) ECSNodeRunningInfo(req *schsdk.ECSNodeRunningInfoReq) (*schsdk.ECSNodeRunningInfoResp, error) {
|
||||
|
||||
mgrCli, err := schglb.ManagerMQPool.Acquire()
|
||||
|
|
|
@ -81,6 +81,11 @@ type CCResourceInfo struct {
|
|||
Memory int64 `json:"memory"`
|
||||
}
|
||||
|
||||
type Models struct {
|
||||
ModelID schsdk.ModelID `json:"modelID" db:"modelID"`
|
||||
ModelName schsdk.ModelName `json:"modelName" db:"modelName"`
|
||||
}
|
||||
|
||||
func (i *CCResourceInfo) Scan(src interface{}) error {
|
||||
data, ok := src.([]uint8)
|
||||
if !ok {
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
package db
|
||||
|
||||
import (
|
||||
"github.com/jmoiron/sqlx"
|
||||
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
|
||||
)
|
||||
|
||||
type ModelsDB struct {
|
||||
*DB
|
||||
}
|
||||
|
||||
func (db *DB) Models() *ModelsDB {
|
||||
return &ModelsDB{DB: db}
|
||||
}
|
||||
|
||||
func (*ModelsDB) GetAll(ctx SQLContext) ([]schmod.Models, error) {
|
||||
var ret []schmod.Models
|
||||
err := sqlx.Select(ctx, &ret, "select * from Models")
|
||||
|
||||
return ret, err
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
package manager
|
||||
|
||||
import (
|
||||
"gitlink.org.cn/cloudream/common/pkgs/mq"
|
||||
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
|
||||
)
|
||||
|
||||
type ModelsService interface {
|
||||
GetAllModels(msg *GetAllModels) (*GetAllModelsResp, *mq.CodeMessage)
|
||||
}
|
||||
|
||||
// 获取所有的算力中心信息
|
||||
var _ = Register(Service.GetAllModels)
|
||||
|
||||
type GetAllModels struct {
|
||||
mq.MessageBodyBase
|
||||
}
|
||||
type GetAllModelsResp struct {
|
||||
mq.MessageBodyBase
|
||||
Models []schmod.Models `json:"models"`
|
||||
}
|
||||
|
||||
func NewGetAllModels() *GetAllModels {
|
||||
return &GetAllModels{}
|
||||
}
|
||||
func NewGetAllModelsResp(ccs []schmod.Models) *GetAllModelsResp {
|
||||
return &GetAllModelsResp{
|
||||
Models: ccs,
|
||||
}
|
||||
}
|
||||
func (c *Client) GetAllModels(msg *GetAllModels, opts ...mq.RequestOption) (*GetAllModelsResp, error) {
|
||||
return mq.Request(Service.GetAllModels, c.roundTripper, msg, opts...)
|
||||
}
|
|
@ -19,6 +19,8 @@ type Service interface {
|
|||
ImageService
|
||||
|
||||
JobService
|
||||
|
||||
ModelsService
|
||||
}
|
||||
|
||||
type Server struct {
|
||||
|
|
|
@ -208,8 +208,6 @@ func (s *NormalJobExecuting) listen(rtx jobmgr.JobStateRunContext, jo *jobmgr.Jo
|
|||
return fmt.Errorf("getting executor client: %w", err)
|
||||
}
|
||||
evt := v1.Value.(*event.Update)
|
||||
//obsPath = strings.Replace(obsPath, "\\", "/", -1)
|
||||
//evt.Command = strings.Replace(evt.Command, "@lora_path@", "/mnt/obs"+obsPath, -1)
|
||||
operateResp, err := client.OperateTask(executor.NewTaskOperateInfo(task.ID(), evt.Operate, evt.Command))
|
||||
if err != nil {
|
||||
return fmt.Errorf("operate task: %w", err)
|
||||
|
|
|
@ -120,16 +120,6 @@ func (svc *Service) CreateInstance(instInfo *mgrmq.CreateInstance) (*mgrmq.Creat
|
|||
return mq.ReplyOK(mgrmq.NewCreateInstanceResp(result.JobID, result.FilesUploadScheme))
|
||||
}
|
||||
|
||||
func (svc *Service) QueryRunningModels(msg *mgrmq.AvailableNodes) (*schsdk.RunningModelResp, *mq.CodeMessage) {
|
||||
availableNodes := jobmgr.GetAvailableNodes()
|
||||
return mq.ReplyOK(mgrmq.NewAvailableNodesResp(availableNodes))
|
||||
}
|
||||
|
||||
func (svc *Service) ECSNodeRunningInfo(req *schsdk.ECSNodeRunningInfoReq) (*schsdk.ECSNodeRunningInfoResp, *mq.CodeMessage) {
|
||||
info := jobmgr.GetNodeUsageRateInfo(req.CustomModelName, req.ModelID)
|
||||
return mq.ReplyOK(schsdk.NewECSNodeRunningInfoResp(info))
|
||||
}
|
||||
|
||||
// 任务集中某个文件上传完成
|
||||
func (svc *Service) JobSetLocalFileUploaded(msg *mgrmq.JobSetLocalFileUploaded) (*mgrmq.JobSetLocalFileUploadedResp, *mq.CodeMessage) {
|
||||
logger.WithField("LocalPath", msg.LocalPath).
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
package mq
|
||||
|
||||
import (
|
||||
"gitlink.org.cn/cloudream/common/consts/errorcode"
|
||||
"gitlink.org.cn/cloudream/common/pkgs/logger"
|
||||
"gitlink.org.cn/cloudream/common/pkgs/mq"
|
||||
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
|
||||
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
|
||||
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr"
|
||||
)
|
||||
|
||||
func (svc *Service) QueryRunningModels(msg *mgrmq.AvailableNodes) (*schsdk.RunningModelResp, *mq.CodeMessage) {
|
||||
availableNodes := jobmgr.GetAvailableNodes()
|
||||
return mq.ReplyOK(mgrmq.NewAvailableNodesResp(availableNodes))
|
||||
}
|
||||
|
||||
func (svc *Service) ECSNodeRunningInfo(req *schsdk.ECSNodeRunningInfoReq) (*schsdk.ECSNodeRunningInfoResp, *mq.CodeMessage) {
|
||||
info := jobmgr.GetNodeUsageRateInfo(req.CustomModelName, req.ModelID)
|
||||
return mq.ReplyOK(schsdk.NewECSNodeRunningInfoResp(info))
|
||||
}
|
||||
|
||||
func (svc *Service) GetAllModels() (*mgrmq.GetAllModelsResp, *mq.CodeMessage) {
|
||||
models, err := svc.db.Models().GetAll(svc.db.SQLCtx())
|
||||
if err != nil {
|
||||
logger.Warnf("getting all models: %s", err.Error())
|
||||
return nil, mq.Failed(errorcode.OperationFailed, "get all models failed")
|
||||
}
|
||||
|
||||
return mq.ReplyOK(mgrmq.NewGetAllModelsResp(models))
|
||||
}
|
Loading…
Reference in New Issue