新增获取模型列表等接口

This commit is contained in:
JeshuaRen 2024-08-30 17:07:09 +08:00
parent ae31802f19
commit 776748b0ff
21 changed files with 522 additions and 256 deletions

View File

@ -20,8 +20,8 @@ type CreateInstanceResp struct {
}
type CreateInstanceReq struct {
JobID schsdk.JobID `json:"jobID" binding:"required"`
DataSet schsdk.JobFileInfo `json:"dataset" binding:"required"`
JobSetID schsdk.JobSetID `json:"jobSetID" binding:"required"`
DataSet schsdk.JobFileInfo `json:"dataset" binding:"required"`
}
func (s *Server) JobSvc() *JobService {
@ -47,7 +47,7 @@ func (s *JobService) CreateInstance(ctx *gin.Context) {
return
}
jobID, filesUploadScheme, err := s.svc.JobSetSvc().CreateInstance(req.JobID, req.DataSet)
jobID, filesUploadScheme, err := s.svc.JobSetSvc().CreateInstance(req.JobSetID, req.DataSet)
if err != nil {
log.Warnf("create job instance: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "create job instance failed"))
@ -61,24 +61,37 @@ func (s *JobService) CreateInstance(ctx *gin.Context) {
}
func (s *JobService) GetAvailableNodes(ctx *gin.Context) {
log := logger.WithField("HTTP", "JobService.GetAvailableNodes")
func (s *JobService) QueryRunningModels(ctx *gin.Context) {
log := logger.WithField("HTTP", "JobService.QueryRunningModels")
//bodyData, err := io.ReadAll(ctx.Request.Body)
//if err != nil {
// log.Warnf("reading request body: %s", err.Error())
// ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "read request body failed"))
// return
//}
//
//req, err := serder.JSONToObjectEx[CreateInstanceReq](bodyData)
//if err != nil {
// log.Warnf("parsing request body: %s", err.Error())
// ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed"))
// return
//}
resp, err := s.svc.JobSetSvc().GetAvailableNodes()
resp, err := s.svc.JobSetSvc().QueryRunningModels()
if err != nil {
log.Warnf("get available nodes: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get available nodes failed"))
return
}
ctx.JSON(http.StatusOK, OK(resp))
}
func (s *JobService) ECSNodeRunningInfo(ctx *gin.Context) {
log := logger.WithField("HTTP", "JobService.ECSNodeRunningInfo")
bodyData, err := io.ReadAll(ctx.Request.Body)
if err != nil {
log.Warnf("reading request body: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "read request body failed"))
return
}
req, err := serder.JSONToObjectEx[*schsdk.ECSNodeRunningInfoReq](bodyData)
if err != nil {
log.Warnf("parsing request body: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed"))
return
}
resp, err := s.svc.JobSetSvc().ECSNodeRunningInfo(req)
if err != nil {
log.Warnf("get available nodes: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get available nodes failed"))

View File

@ -40,7 +40,8 @@ func (s *Server) Serve() error {
func (s *Server) initRouters() {
s.engine.POST("/jobSet/submit", s.JobSetSvc().Submit)
s.engine.POST("/job/createInstance", s.JobSvc().CreateInstance)
s.engine.GET("/job/getAvailableNodes", s.JobSvc().GetAvailableNodes)
s.engine.GET("/job/queryRunningModels", s.JobSvc().QueryRunningModels)
s.engine.GET("/job/getECSNodeRunningInfo", s.JobSvc().ECSNodeRunningInfo)
s.engine.POST("/jobSet/localFileUploaded", s.JobSetSvc().LocalFileUploaded)
s.engine.GET("/jobSet/getServiceList", s.JobSetSvc().GetServiceList)
}

View File

@ -8,7 +8,7 @@ import (
)
// Create 创建多实例任务中的实例任务
func (svc *JobSetService) CreateInstance(jobID schsdk.JobID, dataSet schsdk.JobFileInfo) (schsdk.JobID, schsdk.JobFilesUploadScheme, error) {
func (svc *JobSetService) CreateInstance(jobID schsdk.JobSetID, dataSet schsdk.JobFileInfo) (schsdk.JobID, schsdk.JobFilesUploadScheme, error) {
scheme := new(schsdk.JobFilesUploadScheme)
@ -26,7 +26,7 @@ func (svc *JobSetService) CreateInstance(jobID schsdk.JobID, dataSet schsdk.JobF
return resp.InstanceID, resp.UploadScheme, nil
}
func (svc *JobSetService) GetAvailableNodes() (*schsdk.AvailableNodesResp, error) {
func (svc *JobSetService) QueryRunningModels() (*schsdk.RunningModelResp, error) {
mgrCli, err := schglb.ManagerMQPool.Acquire()
if err != nil {
@ -34,7 +34,23 @@ func (svc *JobSetService) GetAvailableNodes() (*schsdk.AvailableNodesResp, error
}
defer schglb.ManagerMQPool.Release(mgrCli)
resp, err := mgrCli.GetAvailableNodes(&mgrmq.AvailableNodes{})
resp, err := mgrCli.QueryRunningModels(&mgrmq.AvailableNodes{})
if err != nil {
return nil, fmt.Errorf("submitting job set to manager: %w", err)
}
return resp, nil
}
func (svc *JobSetService) ECSNodeRunningInfo(req *schsdk.ECSNodeRunningInfoReq) (*schsdk.ECSNodeRunningInfoResp, error) {
mgrCli, err := schglb.ManagerMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("get available nodes: %w", err)
}
defer schglb.ManagerMQPool.Release(mgrCli)
resp, err := mgrCli.ECSNodeRunningInfo(req)
if err != nil {
return nil, fmt.Errorf("submitting job set to manager: %w", err)
}

View File

@ -97,12 +97,14 @@ func (c *HttpClient) GetReportInfo() (*http.Response, error) {
type TaskOperateInfo struct {
TaskID string
Operate string
Command string
}
func NewTaskOperateInfo(taskID string, command string) *TaskOperateInfo {
func NewTaskOperateInfo(taskID string, operate string, command string) *TaskOperateInfo {
return &TaskOperateInfo{
TaskID: taskID,
Operate: operate,
Command: command,
}
}

View File

@ -1,7 +1,6 @@
package task
import (
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)
@ -9,29 +8,29 @@ type ScheduleCreateECS struct {
TaskInfoBase
UserID cdssdk.UserID `json:"userID"`
PackageID cdssdk.PackageID `json:"packageID"`
ModelID schsdk.ModelID `json:"modelID"`
Command string `json:"command"`
}
type ScheduleCreateECSStatus struct {
TaskStatusBase
Error string `json:"error"`
Address string `json:"address"`
ModelID schsdk.ModelID `json:"modelID"`
Error string `json:"error"`
Address string `json:"address"`
Operate string `json:"operate"`
}
func NewScheduleCreateECS(userID cdssdk.UserID, packageID cdssdk.PackageID, modelID schsdk.ModelID) *ScheduleCreateECS {
func NewScheduleCreateECS(userID cdssdk.UserID, packageID cdssdk.PackageID, command string) *ScheduleCreateECS {
return &ScheduleCreateECS{
UserID: userID,
PackageID: packageID,
ModelID: modelID,
Command: command,
}
}
func NewScheduleCreateECSStatus(address string, modelID schsdk.ModelID, err string) *ScheduleCreateECSStatus {
func NewScheduleCreateECSStatus(address string, operate string, err string) *ScheduleCreateECSStatus {
return &ScheduleCreateECSStatus{
Address: address,
ModelID: modelID,
Error: err,
Operate: operate,
}
}

View File

@ -18,7 +18,9 @@ type JobService interface {
CreateInstance(msg *CreateInstance) (*CreateInstanceResp, *mq.CodeMessage)
GetAvailableNodes(msg *AvailableNodes) (*schsdk.AvailableNodesResp, *mq.CodeMessage)
QueryRunningModels(msg *AvailableNodes) (*schsdk.RunningModelResp, *mq.CodeMessage)
ECSNodeRunningInfo(msg *schsdk.ECSNodeRunningInfoReq) (*schsdk.ECSNodeRunningInfoResp, *mq.CodeMessage)
}
// 提交任务集
@ -26,7 +28,9 @@ var _ = Register(Service.SubmitJobSet)
var _ = Register(Service.CreateInstance)
var _ = Register(Service.GetAvailableNodes)
var _ = Register(Service.QueryRunningModels)
var _ = Register(Service.ECSNodeRunningInfo)
type SubmitJobSet struct {
mq.MessageBodyBase
@ -55,8 +59,8 @@ func (c *Client) SubmitJobSet(msg *SubmitJobSet, opts ...mq.RequestOption) (*Sub
type CreateInstance struct {
mq.MessageBodyBase
JobID schsdk.JobID
DataSet schsdk.JobFileInfo
JobSetID schsdk.JobSetID
DataSet schsdk.JobFileInfo
}
type CreateInstanceResp struct {
@ -65,10 +69,10 @@ type CreateInstanceResp struct {
UploadScheme schsdk.JobFilesUploadScheme `json:"uploadScheme"`
}
func NewCreateInstance(jobID schsdk.JobID, dataSet schsdk.JobFileInfo) *CreateInstance {
func NewCreateInstance(jobSetID schsdk.JobSetID, dataSet schsdk.JobFileInfo) *CreateInstance {
return &CreateInstance{
JobID: jobID,
DataSet: dataSet,
JobSetID: jobSetID,
DataSet: dataSet,
}
}
@ -79,9 +83,9 @@ func NewCreateInstanceResp(InstanceID schsdk.JobID, UploadScheme schsdk.JobFiles
}
}
func NewAvailableNodesResp(nodes map[schsdk.ModelID]schsdk.AvailableNodes) *schsdk.AvailableNodesResp {
return &schsdk.AvailableNodesResp{
AvailableNodes: nodes,
func NewAvailableNodesResp(nodes map[string]schsdk.RunningModelInfo) *schsdk.RunningModelResp {
return &schsdk.RunningModelResp{
RunningModels: nodes,
}
}
@ -93,8 +97,12 @@ type AvailableNodes struct {
mq.MessageBodyBase
}
func (c *Client) GetAvailableNodes(msg *AvailableNodes, opts ...mq.RequestOption) (*schsdk.AvailableNodesResp, error) {
return mq.Request(Service.GetAvailableNodes, c.roundTripper, msg, opts...)
func (c *Client) QueryRunningModels(msg *AvailableNodes, opts ...mq.RequestOption) (*schsdk.RunningModelResp, error) {
return mq.Request(Service.QueryRunningModels, c.roundTripper, msg, opts...)
}
func (c *Client) ECSNodeRunningInfo(msg *schsdk.ECSNodeRunningInfoReq, opts ...mq.RequestOption) (*schsdk.ECSNodeRunningInfoResp, error) {
return mq.Request(Service.ECSNodeRunningInfo, c.roundTripper, msg, opts...)
}
// JobSet中需要使用的一个文件上传完成

View File

@ -19,7 +19,6 @@ func (s *DefaultPreScheduler) calcResourceScore(jobResource schsdk.JobResourcesI
if err != nil {
return err
}
cc.Resource = *res
}

View File

@ -378,6 +378,13 @@ func (s *DefaultPreScheduler) scheduleForNormalOrMultiJob(jobSet *schsdk.JobSetI
return nil, ErrNoAvailableScheme
}
// 这里写死,用于测试,生成环境必须删除
for i := 0; i < len(allCCsArr); i++ {
if allCCsArr[i].CC.CCID == 4 {
targetNode = allCCsArr[i]
}
}
scheme := s.makeSchemeForNode(jobFiles, targetNode)
return &scheme, nil
}

View File

@ -8,8 +8,8 @@ import (
"io/ioutil"
"log"
"math/rand"
"path/filepath"
"strconv"
"strings"
"time"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
@ -17,7 +17,9 @@ import (
)
func MakeJobOutputPath(userID cdssdk.UserID, jobID schsdk.JobID) string {
return filepath.Join("jobs", strconv.FormatInt(int64(userID), 10), string(jobID), "output")
path := "jobs/" + strconv.FormatInt(int64(userID), 10) + "/" + string(jobID) + "/output"
//return filepath.Join("jobs", strconv.FormatInt(int64(userID), 10), string(jobID), "output")
return path
}
func MakeResourcePackageName(jobID schsdk.JobID) string {
@ -36,6 +38,20 @@ func GenerateRandomID() string {
return hashedID
}
func SplitCommands(command string) []string {
var commands []string
command = strings.Replace(command, "\\r\\n", "\\r//n", -1)
cmdArr := strings.Split(command, "\\n")
for i := 0; i < len(cmdArr); i++ {
if strings.Contains(cmdArr[i], "\\r//n") {
cmdArr[i] = strings.Replace(cmdArr[i], "\\r//n", "\\r\\n", -1)
}
commands = append(commands, strings.Trim(cmdArr[i], " "))
}
return commands
}
func GetSSHClient(username string, password string, address string) *ssh.Client {
// SSH连接配置
sshConfig := &ssh.ClientConfig{

View File

@ -29,14 +29,11 @@ func AliConfig(configMap map[string]interface{}) {
return
}
//var req ecs.RunInstancesRequest
err = json.Unmarshal(jsonData, &requestParam)
if err != nil {
log.Error(err)
return
}
//requestParam = &req
config := &openapi.Config{}
// 您的AccessKey ID
@ -49,8 +46,9 @@ func AliConfig(configMap map[string]interface{}) {
}
// CreateServer 创建实例
func (a *AliCloud) CreateServer() (string, error) {
func (a *AliCloud) CreateServer() (string, string, error) {
var instanceID string
var instanceIDArr string
tryErr := func() (_e error) {
defer func() {
@ -66,7 +64,7 @@ func (a *AliCloud) CreateServer() (string, error) {
return _err
}
instanceID = tea.StringValue(util.ToJSONString(responces.Body.InstanceIdSets.InstanceIdSet[0]))
//instanceIDArr := tea.StringValue(util.ToJSONString(responces.Body.InstanceIdSets.InstanceIdSet))
instanceIDArr = tea.StringValue(util.ToJSONString(responces.Body.InstanceIdSets.InstanceIdSet))
log.Info(tea.String("--------------------创建实例成功实例ID:" + tea.StringValue(util.ToJSONString(responces.Body.InstanceIdSets.InstanceIdSet)) + "--------------------"))
return nil
@ -81,38 +79,42 @@ func (a *AliCloud) CreateServer() (string, error) {
error.Message = tea.String(tryErr.Error())
}
log.Info(tea.String("--------------------创建实例失败:" + tea.StringValue(util.ToJSONString(error.Code)) + "--------------------"))
return "", tryErr
return "", "", tryErr
}
// 获取实例IP
//ip, _ := getInstanceIP(instanceIDArr, *aliclient.RegionId)
//println("ip: " + ip)
ip, _ := getInstanceIP(instanceIDArr, *aliclient.RegionId)
println("ip: " + ip)
return instanceID, nil
return instanceID, ip, nil
}
// RunCommand 执行指令
func (a *AliCloud) RunCommand(commands []string, instanceID string) (string, error) {
func (a *AliCloud) RunCommand(commands []string, instanceID string, timeout int) (string, error) {
var result string
for i := 0; i < len(commands); i++ {
log.Info("start execute command")
if i == len(commands)-3 {
println()
}
commandId, err := runShellCommand(commands[i], instanceID, *aliclient.RegionId)
if err != nil {
return "", err
}
// 判断是否执行成功
log.Info("describe result")
_, result, err = describeInvocationResults(aliclient, instanceID, commandId, tea.String("utf-8"), 500)
_, result, err = describeInvocationResults(aliclient, instanceID, commandId, tea.String("utf-8"), timeout)
if err != nil {
log.Error("describeInvocationResults: " + err.Error())
return "", err
}
}
return result, nil
}
// DestroyServer 强制销毁实例
func (a *AliCloud) DestroyServer(instanceID string) (string, error) {
func (a *AliCloud) DeleteInstance(instanceID string) (string, error) {
result, err := aliclient.DeleteInstance(&ecs.DeleteInstanceRequest{
InstanceId: &instanceID,
Force: tea.Bool(true),
@ -123,6 +125,28 @@ func (a *AliCloud) DestroyServer(instanceID string) (string, error) {
return tea.StringValue(result.Body.RequestId), nil
}
func (a *AliCloud) StopInstance(instanceID string) (string, error) {
result, err := aliclient.StopInstance(&ecs.StopInstanceRequest{
InstanceId: &instanceID,
ForceStop: tea.Bool(true),
})
if err != nil {
return "", err
}
return tea.StringValue(result.Body.RequestId), nil
}
func (a *AliCloud) RebootInstances(instanceID string) (string, error) {
result, err := aliclient.RebootInstances(&ecs.RebootInstancesRequest{
InstanceId: []*string{&instanceID},
RegionId: aliclient.RegionId,
})
if err != nil {
return "", err
}
return tea.StringValue(result.Body.RequestId), nil
}
func runShellCommand(commandContent string, instanceID string, regionId string) (*string, error) {
// 从CDS下载文件
commandRequest := ecs.RunCommandRequest{
@ -130,6 +154,7 @@ func runShellCommand(commandContent string, instanceID string, regionId string)
CommandContent: tea.String(commandContent),
Type: tea.String("RunShellScript"),
RegionId: &regionId,
Timeout: tea.Int64(2000),
}
// 发起请求

View File

@ -2,9 +2,11 @@ package create_ecs
// CloudProvider 是一个接口,定义了创建服务器的方法
type CloudProvider interface {
CreateServer() (string, error)
RunCommand(commands []string, instanceID string) (string, error)
DestroyServer(instanceID string) (string, error)
CreateServer() (string, string, error)
RunCommand(commands []string, instanceID string, timeout int) (string, error)
DeleteInstance(instanceID string) (string, error)
StopInstance(instanceID string) (string, error)
RebootInstances(instanceID string) (string, error)
}
type CloudFactory interface {

View File

@ -12,6 +12,21 @@ import (
// HuaweiCloud实现了CloudProvider接口
type HuaweiCloud struct{}
func (a *HuaweiCloud) DeleteInstance(instanceID string) (string, error) {
//TODO implement me
panic("implement me")
}
func (a *HuaweiCloud) StopInstance(instanceID string) (string, error) {
//TODO implement me
panic("implement me")
}
func (a *HuaweiCloud) RebootInstances(instanceID string) (string, error) {
//TODO implement me
panic("implement me")
}
var serverbody model.PrePaidServer
var hwConfigMap map[string]interface{}
var hwclient ecs.EcsClient
@ -45,7 +60,7 @@ func HWCloudConfig(configMap map[string]interface{}) {
}
func (a *HuaweiCloud) CreateServer() (string, error) {
func (a *HuaweiCloud) CreateServer() (string, string, error) {
request := &model.CreateServersRequest{}
request.Body = &model.CreateServersRequestBody{
@ -58,10 +73,10 @@ func (a *HuaweiCloud) CreateServer() (string, error) {
fmt.Println(err)
}
//ids := response.ServerIds
return "", nil
return "", "", nil
}
func (a *HuaweiCloud) RunCommand(commands []string, instanceID string) (string, error) {
func (a *HuaweiCloud) RunCommand(commands []string, instanceID string, timeout int) (string, error) {
//TODO implement me
panic("implement me")
}

View File

@ -6,6 +6,8 @@ import (
pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
"gitlink.org.cn/cloudream/scheduler/common/utils"
"strings"
)
type PCMSubmitTask struct {
@ -24,7 +26,8 @@ func (t *PCMSubmitTask) Execute(task *Task, ctx TaskContext) {
defer log.Debugf("end")
//err := t.do(task, ctx)
err := error(nil)
err := t.finetuning_test(task, ctx)
//err := error(nil)
if err != nil {
//TODO 若任务失败上报的状态failed字段根据情况修改
task.SendStatus(exectsk.NewSubmitTaskStatus("failed", err.Error()))
@ -93,3 +96,24 @@ func (t *PCMSubmitTask) do(task *Task, ctx TaskContext) error {
func init() {
Register(NewPCMSubmitTask)
}
func (t *PCMSubmitTask) finetuning_test(task *Task, ctx TaskContext) error {
var commands []string
commandContent := "sed -i '/SCH_DATA_IN/d' ~/.bashrc && echo 'export SCH_DATA_IN=" + strings.Replace(t.Envs[0].Value, "\\", "/", -1) + "' >> ~/.bashrc"
commands = append(commands, commandContent)
commandContent = "sed -i '/SCH_DATA_OUT/d' ~/.bashrc && echo 'export SCH_DATA_OUT=" + strings.Replace(t.Envs[1].Value, "\\", "/", -1) + "' >> ~/.bashrc"
commands = append(commands, commandContent)
arr := utils.SplitCommands(t.CMD)
commands = append(commands, arr...)
//factory := create_ecs.GetFactory(config.CloudName)
//provider := factory.CreateProvider()
//_, err := provider.RunCommand(commands, "i-bp1ikwdsr5r9p5i9mggm", 2000)
//if err != nil {
// return err
//}
return nil
}

View File

@ -1,14 +1,14 @@
package task
import (
"fmt"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
"gitlink.org.cn/cloudream/scheduler/common/utils"
"gitlink.org.cn/cloudream/scheduler/executor/internal/config"
"gitlink.org.cn/cloudream/scheduler/executor/internal/globals"
//"gitlink.org.cn/cloudream/scheduler/executor/internal/config"
"gitlink.org.cn/cloudream/scheduler/executor/internal/task/create_ecs"
)
@ -27,6 +27,7 @@ func (t *ScheduleCreateECS) Execute(task *Task, ctx TaskContext) {
log.Debugf("begin")
defer log.Debugf("end")
//task.SendStatus(exectsk.NewScheduleCreateECSStatus("address", ""))
err := t.do(task, ctx)
if err != nil {
log.Error(err)
@ -36,102 +37,101 @@ func (t *ScheduleCreateECS) Execute(task *Task, ctx TaskContext) {
log.Info("ScheduleCreateECS...")
}
var count = 1
func (t *ScheduleCreateECS) do(task *Task, ctx TaskContext) error {
stgCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
return fmt.Errorf("new cloudream storage client: %w", err)
}
defer schglb.CloudreamStoragePool.Release(stgCli)
resp, err := stgCli.Package().Get(cdssdk.PackageGetReq{
PackageID: t.PackageID,
UserID: t.UserID,
})
if err != nil {
return err
}
println(resp.Name)
CDSRcloneID := schglb.CloudreamStorageConfig.URL + "/object/download?userID=1&objectID=" + schglb.CDSRclone.CDSRcloneID
CDSRcloneConfigID := schglb.CloudreamStorageConfig.URL + "/object/download?userID=1&objectID=" + schglb.CDSRclone.CDSRcloneConfigID
println("CDSRcloneID: " + CDSRcloneID)
println("CDSRcloneConfigID: " + CDSRcloneConfigID)
var commands []string
commandContent := "yum install -y fuse3"
commands = append(commands, commandContent)
commandContent = "mkdir -p /opt/rclone/ \n mkdir -p /mnt/cds/"
commands = append(commands, commandContent)
commandContent = "cd /opt/rclone \n python3 -c 'import requests;response=requests.get(\"" + CDSRcloneID + "\",stream=True);response.raise_for_status();boundary=response.headers.get(\"Content-Type\").split(\"boundary=\")[-1].encode();content=response.content;body=[part.split(b\"\\r\\n\\r\\n\",1)[1].rsplit(b\"\\r\\n--\",1)[0] for part in content.split(b\"--\"+boundary+b\"\\r\\n\") if b\"filename=\" in part][0];open(\"rclone\",\"wb\").write(body);print(\"success\")'\n"
commands = append(commands, commandContent)
commandContent = "cd /opt/rclone \n python3 -c 'import requests;response=requests.get(\"" + CDSRcloneConfigID + "\",stream=True);response.raise_for_status();boundary=response.headers.get(\"Content-Type\").split(\"boundary=\")[-1].encode();content=response.content;body=[part.split(b\"\\r\\n\\r\\n\",1)[1].rsplit(b\"\\r\\n--\",1)[0] for part in content.split(b\"--\"+boundary+b\"\\r\\n\") if b\"filename=\" in part][0];open(\"rclone.conf\",\"wb\").write(body);print(\"success\")'\n"
commands = append(commands, commandContent)
commandContent = "cd /opt/rclone \n chmod +x rclone"
commands = append(commands, commandContent)
commandContent = "cd /opt/rclone \n nohup ./rclone mount cds: /mnt/cds --vfs-cache-mode full --vfs-read-wait 0 --vfs-read-chunk-size 128M --cache-db-purge -vv > rclone.log 2>&1 &"
commands = append(commands, commandContent)
commandContent = "cd /mnt/cds/bkt1/tiny_model/ \n sh execute.sh"
commands = append(commands, commandContent)
commands := utils.SplitCommands(t.Command)
// 创建云主机
factory := create_ecs.GetFactory(config.CloudName)
provider := factory.CreateProvider()
instanceID, err := provider.CreateServer()
if err != nil {
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", t.ModelID, err.Error()))
return err
}
instanceID := "i-bp1e0vaetd39iyfhpdpc"
ecsIP := "47.96.5.29"
//err := error(nil)
//if count == 2 {
// instanceID = "i-bp1dokchih168087wqck"
// ecsIP = "118.31.50.157"
//}
//count = 2
address, err := provider.RunCommand(commands, instanceID)
if err != nil {
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", t.ModelID, err.Error()))
return err
}
println(len(commands))
//println(err)
//instanceID, ecsIP, err := provider.CreateServer()
//if err != nil {
// task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", err.Error()))
// return err
//}
logger.Info("create ECS success, instance id: " + instanceID + ", ip: " + ecsIP)
//_, err = provider.RunCommand(commands, instanceID, 2000)
//if err != nil {
// task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", err.Error()))
// return err
//}
// 返回执行结果
task.SendStatus(exectsk.NewScheduleCreateECSStatus(address, t.ModelID, ""))
task.SendStatus(exectsk.NewScheduleCreateECSStatus("http://"+ecsIP+":5013/chat", schsdk.CreateECS, ""))
println("create ECS success, waiting msg...")
// 监听更新操作
for {
taskOperate, err := task.taskChan.Chan.Receive()
if err != nil {
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", t.ModelID, err.Error()))
return err
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", err.Error()))
continue
}
info, ok := taskOperate.(executor.TaskOperateInfo)
if !ok {
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", t.ModelID, "invalid task operate info"))
return fmt.Errorf("invalid task operate info")
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", "invalid task operate info"))
continue
}
switch info.Command {
case globals.RESTART:
var commands []string
commandContent := "yum install -y fuse3"
commands = append(commands, commandContent)
result, err := provider.RunCommand(commands, instanceID)
switch info.Operate {
case schsdk.RunECS:
_, err := provider.RebootInstances(instanceID)
if err != nil {
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", t.ModelID, err.Error()))
return err
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", err.Error()))
continue
}
task.SendStatus(exectsk.NewScheduleCreateECSStatus(result, t.ModelID, ""))
case globals.STOP:
println("STOP")
case globals.DESTROY:
result, err := provider.DestroyServer(instanceID)
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", schsdk.RunECS, ""))
case schsdk.PauseECS:
_, err := provider.StopInstance(instanceID)
if err != nil {
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", t.ModelID, err.Error()))
return err
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", err.Error()))
continue
}
task.SendStatus(exectsk.NewScheduleCreateECSStatus(result, t.ModelID, ""))
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", schsdk.PauseECS, ""))
case schsdk.DestroyECS:
_, err := provider.DeleteInstance(instanceID)
if err != nil {
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", err.Error()))
continue
}
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", schsdk.DestroyECS, ""))
case schsdk.OperateServer:
//executeCommands(provider, instanceID, task, info.Command)
default:
//executeCommands(provider, instanceID, task, info.Command)
}
}
}
func executeCommands(provider create_ecs.CloudProvider, instanceID string, task *Task, command string) {
commands := utils.SplitCommands(command)
_, err := provider.RunCommand(commands, instanceID, 2000)
if err != nil {
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", err.Error()))
return
}
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", ""))
}
func init() {
Register(NewScheduleCreateECS)
}

View File

@ -4,7 +4,6 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/types"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/serder"
)
@ -46,9 +45,10 @@ type InstanceCreateInfo struct {
type InstanceUpdateInfo struct {
serder.Metadata `union:"Update"`
InstanceInfoBase
Type string `json:"type"`
Info schsdk.UpdateMultiInstanceJobInfo `json:"info"`
PackageID cdssdk.PackageID `json:"packageID"`
Type string `json:"type"`
Info schsdk.UpdateMultiInstanceJobInfo `json:"info"`
//PackageID cdssdk.PackageID `json:"packageID"`
//LoRAPackage string `json:"loraPackage"`
}
func NewInstanceOperate(info InstanceOperateInfo, future OperateInstanceFuture) *InstanceOperate {

View File

@ -6,6 +6,7 @@ type JobUpdateFuture = *future.SetValueFuture[UpdateResult]
type Update struct {
Command string
Operate string
Result JobUpdateFuture
}
@ -15,9 +16,10 @@ type UpdateResult struct {
Err error
}
func NewUpdate(command string, jobUpdateFuture JobUpdateFuture) *Update {
func NewUpdate(command string, operate string, jobUpdateFuture JobUpdateFuture) *Update {
return &Update{
Command: command,
Operate: operate,
Result: jobUpdateFuture,
}
}

View File

@ -12,6 +12,7 @@ import (
"gitlink.org.cn/cloudream/scheduler/manager/internal/executormgr"
jobTask "gitlink.org.cn/cloudream/scheduler/manager/internal/task"
"path/filepath"
"time"
"gitlink.org.cn/cloudream/common/pkgs/logger"
pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
@ -96,24 +97,6 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e
return fmt.Errorf("no resource found at computing center %v", targetCCID)
}
// TODO 判断是否是模型推理任务,如果是,则进行扩缩容管理
if modelJobInfo != nil {
// 发送扩容任务
ecs := exetsk.NewScheduleCreateECS(
userID,
packageID,
schsdk.ModelID(modelJobInfo.ModelID),
)
task, err := rtx.Mgr.ExecMgr.StartTask(ecs, ccInfo)
if err != nil {
log.Error(err.Error())
return err
}
return s.listen(rtx, jo, task, ccInfo)
}
stgCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
return fmt.Errorf("new cds client: %w", err)
@ -127,6 +110,24 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e
return fmt.Errorf("request to cds: %w", err)
}
// TODO 判断是否是模型推理任务,如果是,则进行扩缩容管理
if modelJobInfo != nil {
// 发送扩容任务
ecs := exetsk.NewScheduleCreateECS(
userID,
packageID,
// modelJobInfo.Command是模型更新的脚本
runtime.Command+"\\n"+modelJobInfo.Command,
)
task, err := rtx.Mgr.ExecMgr.StartTask(ecs, ccInfo)
if err != nil {
log.Error(err.Error())
return err
}
return s.listen(rtx, jo, task, ccInfo, *modelJobInfo)
}
// 判断算力中心是否支持环境变量配置如果不支持则读取脚本内容并拼接在Command参数后面
var envs []schsdk.KVPair
var params []string
@ -192,7 +193,7 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e
}
}
func (s *NormalJobExecuting) listen(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job, task *jobTask.JobTask[mgrmq.ExecutorTaskStatus], ccInfo schmod.ComputingCenter) error {
func (s *NormalJobExecuting) listen(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job, task *jobTask.JobTask[mgrmq.ExecutorTaskStatus], ccInfo schmod.ComputingCenter, modelJobInfo schsdk.ModelJobInfo) error {
log := logger.WithType[NormalJobExecuting]("State").WithField("TaskID", task.ID())
waitFut := event.BeginWaitType[*event.Update](rtx.EventSet)
@ -207,7 +208,9 @@ func (s *NormalJobExecuting) listen(rtx jobmgr.JobStateRunContext, jo *jobmgr.Jo
return fmt.Errorf("getting executor client: %w", err)
}
evt := v1.Value.(*event.Update)
operateResp, err := client.OperateTask(executor.NewTaskOperateInfo(task.ID(), evt.Command))
//obsPath = strings.Replace(obsPath, "\\", "/", -1)
//evt.Command = strings.Replace(evt.Command, "@lora_path@", "/mnt/obs"+obsPath, -1)
operateResp, err := client.OperateTask(executor.NewTaskOperateInfo(task.ID(), evt.Operate, evt.Command))
if err != nil {
return fmt.Errorf("operate task: %w", err)
}
@ -225,14 +228,33 @@ func (s *NormalJobExecuting) listen(rtx jobmgr.JobStateRunContext, jo *jobmgr.Jo
case msg := <-taskFut.Chan():
switch v2 := msg.Value.Status.(type) {
case *exetsk.ScheduleCreateECSStatus:
// 扩容任务,将结果放到池子中
node := schsdk.NodeInfo{
InstanceID: jo.JobID,
Address: schsdk.Address(v2.Address),
if v2.Error != "" {
logger.Error("update task fail, error: " + v2.Error)
continue
}
switch v2.Operate {
case schsdk.CreateECS:
// 扩容任务,将结果放到池子中
node := schsdk.NodeInfo{
InstanceID: jo.JobID,
Address: schsdk.Address(v2.Address),
Status: schsdk.RunStatus,
}
jobmgr.SetNodeData(jo.JobSetID, modelJobInfo, node)
log.Infof("node expansion: %v", v2.Address)
case schsdk.DestroyECS:
// 缩容任务,将节点从节点中移除
jobmgr.RemoveNodeFromRunningModels(modelJobInfo, jo.JobID)
case schsdk.PauseECS:
// 更新节点状态
jobmgr.UpdateNodeFromRunningModels(modelJobInfo, jo.JobID, schsdk.StopStatus)
case schsdk.RunECS:
// 更新节点状态
jobmgr.UpdateNodeFromRunningModels(modelJobInfo, jo.JobID, schsdk.RunStatus)
}
jobmgr.SetNodeData(schsdk.JobID(jo.JobSetID), v2.ModelID, node)
log.Infof("node expansion: %v", v2.Address)
case error:
fmt.Println("Received error:", v2.Error())
default:
@ -287,6 +309,8 @@ func (s *DataReturnJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Jo
}
packageName := utils.MakeResourcePackageName(reJob.TargetJobID)
logger.Info("TargetJobOutputPath: " + reJob.TargetJobOutputPath + ", and packageName: " + packageName)
time.Sleep(30 * time.Second)
task, err := rtx.Mgr.ExecMgr.StartTask(exetsk.NewStorageCreatePackage(
userID, // TOOD 用户ID
ccInfo.CDSStorageID,

View File

@ -54,29 +54,31 @@ func (s *MultiInstanceRunning) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job)
case *event.InstanceCreateInfo:
createInstance(rtx, info, s.preScheduler, jo, multInstJob, instanceFuture)
case *event.InstanceUpdateInfo:
updateInstance(rtx, info, multInstJob, instanceFuture)
subJobs := info.Info.SubJobs
// 微调任务特殊处理
if info.Info.UpdateType == schsdk.FineTuning {
multInstJob.Info.ModelJobInfo.Command = info.Info.Runtime.Command
subJobs = multInstJob.SubJobs
}
updateInstance(rtx, info, subJobs, instanceFuture)
}
}
}
func updateInstance(rtx jobmgr.JobStateRunContext, updateInfo *event.InstanceUpdateInfo, parentJob *job.MultiInstanceJob, updateInstanceFuture event.OperateInstanceFuture) {
// 更新策略
strategy := updateInfo.Info.UpdateStrategy
println("update strategy: " + strategy)
func updateInstance(rtx jobmgr.JobStateRunContext, updateInfo *event.InstanceUpdateInfo, subJobs []schsdk.JobID, updateInstanceFuture event.OperateInstanceFuture) {
var failJobs []string
var wg sync.WaitGroup
for i := 0; i < len(parentJob.SubJobs); i++ {
for i := 0; i < len(subJobs); i++ {
// 发送请求进行任务更新
instanceID := parentJob.SubJobs[i]
instanceID := subJobs[i]
wg.Add(1)
go func() {
defer wg.Done()
fut := future.NewSetValue[event.UpdateResult]()
rtx.Mgr.PostEvent(instanceID, event.NewUpdate("update", fut))
rtx.Mgr.PostEvent(instanceID, event.NewUpdate(updateInfo.Info.Runtime.Command, updateInfo.Info.Operate, fut))
_, err := fut.Wait(context.TODO())
if err != nil {

View File

@ -9,10 +9,10 @@ import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job"
"strings"
)
type MultiInstanceUpdate struct {
@ -26,7 +26,11 @@ func NewMultiInstanceUpdate(originalJob jobmod.JobDump) *MultiInstanceUpdate {
}
func (s *MultiInstanceUpdate) Run(rtx jobmgr.JobStateRunContext, job *jobmgr.Job) {
s.do(rtx, job)
err := s.do(rtx, job)
if err != nil {
logger.Error("update multi instance failed: %s", err)
return
}
}
func (s *MultiInstanceUpdate) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
@ -41,73 +45,55 @@ func (s *MultiInstanceUpdate) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job)
cancel()
}()
var pkgID cdssdk.PackageID
// 等待回源任务完成
if rt, ok := updateJob.Info.Files.Code.(*schsdk.DataReturnJobFileInfo); ok {
evt, ok := event.WaitTypeAnd[*event.JobCompleted](ctx, rtx.EventSet, func(val *event.JobCompleted) bool {
return val.Job.GetInfo().GetLocalJobID() == rt.DataReturnLocalJobID
var fullPath string
instanceJob := jo.Body.(*job.UpdateMultiInstanceJob)
if instanceJob.Info.UpdateType == schsdk.FineTuning {
var dtrJob *job.DataReturnJob
// 等待回源任务完成
if rt, ok := updateJob.Info.Files.Code.(*schsdk.DataReturnJobFileInfo); ok {
evt, ok := event.WaitTypeAnd[*event.JobCompleted](ctx, rtx.EventSet, func(val *event.JobCompleted) bool {
return val.Job.GetInfo().GetLocalJobID() == rt.DataReturnLocalJobID
})
if !ok {
return jobmgr.ErrJobCancelled
}
if evt.Err != nil {
return fmt.Errorf("depended job %s was failed", evt.Job.JobID)
}
dtrJob, ok = evt.Job.Body.(*job.DataReturnJob)
if !ok {
return fmt.Errorf("job %s is not a DataReturn job(which is %T)", evt.Job.JobID, evt.Job)
}
}
stgCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
return fmt.Errorf("new cloudream storage client: %w", err)
}
defer schglb.CloudreamStoragePool.Release(stgCli)
ccInfo, err := rtx.Mgr.DB.ComputingCenter().GetByID(rtx.Mgr.DB.SQLCtx(), dtrJob.TargetJobCCID)
if err != nil {
return fmt.Errorf("getting computing center info: %w", err)
}
userID := cdssdk.UserID(1)
getStg, err := stgCli.StorageGet(cdssdk.StorageGet{
UserID: userID,
StorageID: ccInfo.CDSStorageID,
})
if !ok {
return jobmgr.ErrJobCancelled
}
if evt.Err != nil {
return fmt.Errorf("depended job %s was failed", evt.Job.JobID)
}
rtJob, ok := evt.Job.Body.(*job.DataReturnJob)
if !ok {
return fmt.Errorf("job %s is not a DataReturn job(which is %T)", evt.Job.JobID, evt.Job)
}
pkgID = rtJob.DataReturnPackageID
}
// 获取包对象列表
stgCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
return fmt.Errorf("new cloudream storage client: %w", err)
}
defer schglb.CloudreamStoragePool.Release(stgCli)
// TODO UserID
pkgObjs, err := stgCli.Object().GetPackageObjects(cdssdk.ObjectGetPackageObjects{UserID: 1, PackageID: pkgID})
if err != nil {
return fmt.Errorf("getting package objects: %w", err)
}
// 获取原始任务信息
originalMultiInstanceJobBody := s.originalJob.Body.(*jobmod.MultiInstanceJobDump)
originalPackageID := originalMultiInstanceJobBody.Files.Code.PackageID
var objArr []cdssdk.MovingObject
for _, obj := range pkgObjs.Objects {
objArr = append(objArr, cdssdk.MovingObject{
ObjectID: obj.ObjectID,
PackageID: originalPackageID,
Path: obj.Path,
loadPackageResp, err := stgCli.StorageLoadPackage(cdssdk.StorageLoadPackageReq{
UserID: userID,
PackageID: dtrJob.DataReturnPackageID,
StorageID: getStg.StorageID,
})
}
// TODO UserID
objMoveParam := cdssdk.ObjectMove{
UserID: 1,
Movings: objArr,
}
ccInfo, err := rtx.Mgr.DB.ComputingCenter().GetByID(rtx.Mgr.DB.SQLCtx(), originalMultiInstanceJobBody.TargetCCID)
if err != nil {
return fmt.Errorf("getting computing center info: %w", err)
}
// 将增量包合并到原有包中
taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewStorageMoveObject(objMoveParam), ccInfo)
if err != nil {
return fmt.Errorf("moving package: %w", err)
}
statusFut := taskStatus.Receive()
status := <-statusFut.Chan()
moveStatus := status.Value.Status.(*exectsk.StorageMoveObjectStatus)
if moveStatus.Error != "" {
return fmt.Errorf("moving package: %s", moveStatus.Error)
logger.Info("load pacakge path: " + loadPackageResp.FullPath)
fullPath = loadPackageResp.FullPath
}
// 发送事件更新各个instance
updateJob.Info.Runtime.Command = strings.Replace(updateJob.Info.Runtime.Command, "$1", fullPath, -1)
updateInfo := event.InstanceUpdateInfo{
Info: updateJob.Info,
}

View File

@ -1,31 +1,137 @@
package jobmgr
import (
"github.com/patrickmn/go-cache"
"gitlink.org.cn/cloudream/common/pkgs/logger"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
"strconv"
"time"
)
var nodeMap = make(map[schsdk.ModelID]schsdk.AvailableNodes)
func SetNodeData(jobID schsdk.JobID, modelID schsdk.ModelID, node schsdk.NodeInfo) {
value, ok := nodeMap[modelID]
var runningModels = make(map[string]schsdk.RunningModelInfo)
// SetNodeData 新增节点
func SetNodeData(jobSetID schsdk.JobSetID, modelJobInfo schsdk.ModelJobInfo, node schsdk.NodeInfo) {
key := string(modelJobInfo.CustomModelName) + "_" + string(modelJobInfo.ModelID)
value, ok := runningModels[key]
// 如果不存在
if !ok {
nodes := []schsdk.NodeInfo{node}
value = schsdk.AvailableNodes{
JobID: jobID,
Nodes: nodes,
value = schsdk.RunningModelInfo{
JobSetID: jobSetID,
Nodes: nodes,
ModelID: modelJobInfo.ModelID,
// 这里的model name应该从数据库中查询
ModelName: "",
CustomModelName: modelJobInfo.CustomModelName,
}
nodeMap[modelID] = value
runningModels[key] = value
return
}
// 如果存在
value.Nodes = append(value.Nodes, node)
runningModels[key] = value
}
// RemoveNodeFromRunningModels 移除节点
func RemoveNodeFromRunningModels(modelJobInfo schsdk.ModelJobInfo, instanceID schsdk.JobID) {
key := string(modelJobInfo.CustomModelName) + "_" + string(modelJobInfo.ModelID)
value, ok := runningModels[key]
if !ok {
return
}
// 如果存在
value.Nodes = append(value.Nodes, node)
nodeMap[modelID] = value
for i := 0; i < len(value.Nodes); i++ {
node := value.Nodes[i]
if node.InstanceID == instanceID {
value.Nodes = append(value.Nodes[:i], value.Nodes[i+1:]...)
runningModels[key] = value
logger.Info("remove node success from running models, job id: " + instanceID)
break
}
}
}
func GetAvailableNodes() map[schsdk.ModelID]schsdk.AvailableNodes {
func UpdateNodeFromRunningModels(modelJobInfo schsdk.ModelJobInfo, instanceID schsdk.JobID, status string) {
key := string(modelJobInfo.CustomModelName) + "_" + string(modelJobInfo.ModelID)
value, ok := runningModels[key]
if !ok {
return
}
return nodeMap
for i := 0; i < len(value.Nodes); i++ {
node := value.Nodes[i]
if node.InstanceID == instanceID {
node.Status = status
logger.Info("update node success from running models, job id: " + instanceID)
break
}
}
}
func GetAvailableNodes() map[string]schsdk.RunningModelInfo {
return runningModels
}
var rateInfos []schsdk.NodeUsageRateInfo
// 模拟获取节点使用率
func GetNodeUsageRateInfo(customModelName schsdk.ModelName, modelID schsdk.ModelID) []schsdk.NodeUsageRateInfo {
key := string(customModelName) + "_" + string(modelID)
value, ok := runningModels[key]
if !ok {
return nil
}
for i := 0; i < len(value.Nodes); i++ {
node := value.Nodes[i]
//c := cachePool[node.NodeID]
//rates := getCacheData(c)
rateInfo := schsdk.NodeUsageRateInfo{
InstanceID: node.InstanceID,
Address: node.Address,
GPURate: []schsdk.UsageRate{
{
Timestamp: strconv.FormatInt(time.Now().Unix(), 10),
Number: "10.1",
},
{
Timestamp: strconv.FormatInt(time.Now().Unix(), 10),
Number: "20",
},
},
AccCardRate: []schsdk.UsageRate{
{
Timestamp: strconv.FormatInt(time.Now().Unix(), 10),
Number: "3",
},
{
Timestamp: strconv.FormatInt(time.Now().Unix(), 10),
Number: "4.55",
},
},
}
rateInfos = append(rateInfos, rateInfo)
}
//c := cache.New(5*time.Minute, 10*time.Minute)
return rateInfos
}
func getCacheData(c *cache.Cache) []schsdk.UsageRate {
var usageRates []schsdk.UsageRate
// 获取缓存中的所有项
items := c.Items()
// 遍历缓存项,将其放入 map 中
for key, item := range items {
usageRates = append(usageRates, schsdk.UsageRate{
Timestamp: key,
Number: string(item.Object.([]byte)),
})
}
return usageRates
}

View File

@ -95,7 +95,21 @@ func (svc *Service) CreateInstance(instInfo *mgrmq.CreateInstance) (*mgrmq.Creat
info := event.InstanceCreateInfo{
DataSet: instInfo.DataSet,
}
svc.jobMgr.PostEvent(instInfo.JobID, event.NewInstanceOperate(&info, fut))
instanceJobSets := svc.jobMgr.DumpJobSet(instInfo.JobSetID)
if len(instanceJobSets) == 0 {
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("job set %s is not found", instInfo.JobSetID))
}
// 找到多实例任务本身
var jobID schsdk.JobID
for i := 0; i < len(instanceJobSets); i++ {
jobDump := instanceJobSets[i]
if _, ok := jobDump.Body.(*jobmod.MultiInstanceJobDump); ok {
jobID = jobDump.JobID
break
}
}
svc.jobMgr.PostEvent(jobID, event.NewInstanceOperate(&info, fut))
result, err := fut.Wait(context.TODO())
@ -106,11 +120,16 @@ func (svc *Service) CreateInstance(instInfo *mgrmq.CreateInstance) (*mgrmq.Creat
return mq.ReplyOK(mgrmq.NewCreateInstanceResp(result.JobID, result.FilesUploadScheme))
}
func (svc *Service) GetAvailableNodes(msg *mgrmq.AvailableNodes) (*schsdk.AvailableNodesResp, *mq.CodeMessage) {
func (svc *Service) QueryRunningModels(msg *mgrmq.AvailableNodes) (*schsdk.RunningModelResp, *mq.CodeMessage) {
availableNodes := jobmgr.GetAvailableNodes()
return mq.ReplyOK(mgrmq.NewAvailableNodesResp(availableNodes))
}
func (svc *Service) ECSNodeRunningInfo(req *schsdk.ECSNodeRunningInfoReq) (*schsdk.ECSNodeRunningInfoResp, *mq.CodeMessage) {
info := jobmgr.GetNodeUsageRateInfo(req.CustomModelName, req.ModelID)
return mq.ReplyOK(schsdk.NewECSNodeRunningInfoResp(info))
}
// 任务集中某个文件上传完成
func (svc *Service) JobSetLocalFileUploaded(msg *mgrmq.JobSetLocalFileUploaded) (*mgrmq.JobSetLocalFileUploadedResp, *mq.CodeMessage) {
logger.WithField("LocalPath", msg.LocalPath).