新增曙光节点创建等功能
This commit is contained in:
parent
23fdd794e7
commit
b0804a3a0b
|
@ -5,14 +5,20 @@
|
|||
"outputDirectory": "log",
|
||||
"level": "debug"
|
||||
},
|
||||
"rabbitMQ": {
|
||||
"rabbitMQ2": {
|
||||
"address": "101.201.215.196:5672",
|
||||
"account": "cloudream",
|
||||
"password": "123456",
|
||||
"vhost": "/"
|
||||
},
|
||||
"rabbitMQ": {
|
||||
"address": "localhost:5672",
|
||||
"account": "cloudream",
|
||||
"password": "123456",
|
||||
"vhost": "/"
|
||||
},
|
||||
"cloudreamStorage": {
|
||||
"url": "http://120.46.183.86:7890"
|
||||
"url": "http://121.36.5.116:7890"
|
||||
},
|
||||
"pcm": {
|
||||
"url": "http://112.95.163.90:5676"
|
||||
|
|
|
@ -5,13 +5,19 @@
|
|||
"outputDirectory": "log",
|
||||
"level": "debug"
|
||||
},
|
||||
"rabbitMQ2": {
|
||||
"address": "101.201.215.196:5672",
|
||||
"account": "cloudream",
|
||||
"password": "123456",
|
||||
"vhost": "/"
|
||||
},
|
||||
"rabbitMQ": {
|
||||
"address": "127.0.0.1:5672",
|
||||
"address": "localhost:5672",
|
||||
"account": "cloudream",
|
||||
"password": "123456",
|
||||
"vhost": "/"
|
||||
},
|
||||
"cloudreamStorage": {
|
||||
"url": "http://localhost:7890"
|
||||
"url": "http://121.36.5.116:7890"
|
||||
}
|
||||
}
|
|
@ -5,14 +5,20 @@
|
|||
"outputDirectory": "log",
|
||||
"level": "debug"
|
||||
},
|
||||
"rabbitMQ": {
|
||||
"rabbitMQ2": {
|
||||
"address": "101.201.215.196:5672",
|
||||
"account": "cloudream",
|
||||
"password": "123456",
|
||||
"vhost": "/"
|
||||
},
|
||||
"rabbitMQ": {
|
||||
"address": "localhost:5672",
|
||||
"account": "cloudream",
|
||||
"password": "123456",
|
||||
"vhost": "/"
|
||||
},
|
||||
"cloudreamStorage": {
|
||||
"url": "http://120.46.183.86:7890"
|
||||
"url": "http://121.36.5.116:7890"
|
||||
},
|
||||
"unifyOps": {
|
||||
"url": "http://localhost:7891"
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"application": {
|
||||
"executorID": "1",
|
||||
"executorID": "2",
|
||||
"address": ":7895"
|
||||
},
|
||||
"logger": {
|
||||
|
@ -21,27 +21,84 @@
|
|||
"pcm": {
|
||||
"url": "http://localhost:7070"
|
||||
},
|
||||
"rclone": {
|
||||
"cds_rcloneID": "9471093",
|
||||
"cds_rcloneConfigID": "9471094"
|
||||
},
|
||||
"reportIntervalSec": 10,
|
||||
"createECS": {
|
||||
"cloud": "ali",
|
||||
"config": {
|
||||
"AccessKeyId": "LTAI5tMraAgfzhrF4PF79Js4",
|
||||
"AccessKeySecret": "aWTkvrBWZt58kvpop7MNwTDMinJFtj",
|
||||
"createECS-ali": {
|
||||
"cloud": "AliCloud",
|
||||
"auth_config": {
|
||||
"AccessKeyId": "LTAI5tJBqN3uRnzXeiiXTxkT",
|
||||
"AccessKeySecret": "dilS4SJ0I3SMWtY7h1ByHe3MOULuGA"
|
||||
},
|
||||
"ecs_config": {
|
||||
"DryRun": false,
|
||||
"RegionId": "cn-hangzhou",
|
||||
"ImageId": "aliyun_3_x64_20G_alibase_20221102.vhd",
|
||||
"InstanceType": "ecs.n1.small",
|
||||
"SecurityGroupId": "sg-bp1bf5fdq5eud6tkpdkw",
|
||||
"VSwitchId": "vsw-bp170tc427pqitcofaeyr",
|
||||
"ImageId": "aliyun_3_x64_20G_alibase_20240528.vhd",
|
||||
"InstanceType": "ecs.c8i.3xlarge",
|
||||
"SecurityGroupId": "sg-bp149yz2yypr2zp3acp8",
|
||||
"VSwitchId": "vsw-bp1i7k93le8oghskdggv3",
|
||||
"InstanceChargeType": "PostPaid",
|
||||
"InternetChargeType": "PayByTraffic",
|
||||
"SystemDisk": {
|
||||
"Category": "cloud_ssd",
|
||||
"Size": "40"
|
||||
"Category": "cloud_essd",
|
||||
"Size": "60"
|
||||
},
|
||||
"HostName": "ECS-test",
|
||||
"Password": "ECS@test1234",
|
||||
"InternetMaxBandwidthOut": 10
|
||||
"InternetMaxBandwidthOut": 100
|
||||
}
|
||||
},
|
||||
"createECS-hw": {
|
||||
"cloud": "HuaweiCloud",
|
||||
"auth_config": {
|
||||
"AccessKeyId": "LTAI5tMraAgfzhrF4PF79Js4",
|
||||
"AccessKeySecret": "aWTkvrBWZt58kvpop7MNwTDMinJFtj"
|
||||
},
|
||||
"ecs_config": {
|
||||
"Region": "cn-hangzhou"
|
||||
}
|
||||
},
|
||||
"createECS": {
|
||||
"cloud": "SugonCloud",
|
||||
"auth_config": {
|
||||
"user": "acgnnmfbwo",
|
||||
"password": "Pcl@2020",
|
||||
"orgid": "c8befbc1301665ba2dc5b2826f8dca1e",
|
||||
"clusterName": "华东一区【昆山】",
|
||||
"get_token_url": "https://ac.sugon.com/ac/openapi/v2/tokens"
|
||||
},
|
||||
"ecs_config": {
|
||||
"description": "",
|
||||
"rdma": false,
|
||||
"taskType": "ssh",
|
||||
"taskClassification": "interactive",
|
||||
"acceleratorType": "dcu",
|
||||
"version": "jupyterlab-xinference:vllm0.5-ubuntu20.04-dtk24.04-py3.10",
|
||||
"originalVersion": "",
|
||||
"imagePath": "11.11.100.6:5000/dcu/admin/jupyterlab/jupyterlab-xinference:vllm0.5-ubuntu20.04-dtk24.04-py3.10",
|
||||
"timeoutLimit": "01:00:00",
|
||||
"taskNumber": 1,
|
||||
"resourceGroup": "kshdtest",
|
||||
"useStartScript": true,
|
||||
"startScriptActionScope": "all",
|
||||
"startScriptContent": "start.sh",
|
||||
"useStartServiceCommand": false,
|
||||
"startServiceCommand": "aa",
|
||||
"cpuNumber": 8,
|
||||
"ramSize": 25600,
|
||||
"gpuNumber": 1,
|
||||
"env": "",
|
||||
"mountInfoList": [
|
||||
],
|
||||
"containerPortInfoList": [
|
||||
{
|
||||
"protocolType": "HTTP",
|
||||
"containerPort": "8081"
|
||||
}
|
||||
],
|
||||
"acceleratorDesc": "4*异构加速卡1"
|
||||
}
|
||||
}
|
||||
}
|
|
@ -97,11 +97,16 @@ type ModelResource struct {
|
|||
}
|
||||
|
||||
type ObjectStorage struct {
|
||||
ID int64 `json:"ID" db:"ID"`
|
||||
Name string `json:"name" db:"name"`
|
||||
Manufacturer string `json:"manufacturer" db:"manufacturer"`
|
||||
Region string `json:"region" db:"region"`
|
||||
AK string `json:"access_key_id" db:"access_key_id"`
|
||||
SK string `json:"secret_access_key" db:"secret_access_key"`
|
||||
Endpoint string `json:"endpoint" db:"endpoint"`
|
||||
Bucket string `json:"bucket" db:"bucket"`
|
||||
CDSStorageID cdssdk.StorageID `json:"CDSStorageID" db:"CDSStorageID"`
|
||||
MountType string `json:"mountType"`
|
||||
}
|
||||
|
||||
func (i *CCResourceInfo) Scan(src interface{}) error {
|
||||
|
@ -112,3 +117,9 @@ func (i *CCResourceInfo) Scan(src interface{}) error {
|
|||
|
||||
return serder.JSONToObject(data, i)
|
||||
}
|
||||
|
||||
const (
|
||||
HuaweiCloud = "HuaweiCloud"
|
||||
AliCloud = "AliCloud"
|
||||
SugonCloud = "SugonCloud"
|
||||
)
|
||||
|
|
|
@ -16,6 +16,6 @@ func (db *DB) ObjectStorage() *ObjectStorageDB {
|
|||
|
||||
func (*ObjectStorageDB) GetObjectStorageByStorageID(ctx SQLContext, CDSStorageID cdssdk.StorageID) (schmod.ObjectStorage, error) {
|
||||
var ret schmod.ObjectStorage
|
||||
err := sqlx.Get(ctx, &ret, "select access_key_id, secret_access_key, endpoint, bucket, CDSStorageID from ObjectStorage where CDSStorageID = ?", CDSStorageID)
|
||||
err := sqlx.Get(ctx, &ret, "select access_key_id, secret_access_key, endpoint, bucket, CDSStorageID, mountType from ObjectStorage where CDSStorageID = ?", CDSStorageID)
|
||||
return ret, err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,293 @@
|
|||
package executor
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
|
||||
myhttp "gitlink.org.cn/cloudream/common/utils/http"
|
||||
"gitlink.org.cn/cloudream/common/utils/serder"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type sugonHeader struct {
|
||||
Token string `json:"token"`
|
||||
}
|
||||
|
||||
func (c *HttpClient) CreateSugonInstance(token string, config map[string]interface{}) (string, error) {
|
||||
targetURL, err := url.JoinPath(c.baseURL + "/ai/openapi/v2/instance-service/task")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
header := sugonHeader{
|
||||
Token: token,
|
||||
}
|
||||
body, err := json.Marshal(config)
|
||||
resp, err := myhttp.PostJSONRow(targetURL, myhttp.RequestParam{
|
||||
Body: body,
|
||||
Header: header,
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
bodyBytes, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("reading response body: %w", err)
|
||||
}
|
||||
contType := resp.Header.Get("Content-Type")
|
||||
if strings.Contains(contType, myhttp.ContentTypeJSON) {
|
||||
var codeResp response[any]
|
||||
if err := serder.JSONToObject(bodyBytes, &codeResp); err != nil {
|
||||
return "", fmt.Errorf("parsing response: %w", err)
|
||||
}
|
||||
|
||||
if codeResp.Code == "0" {
|
||||
return codeResp.Data.(string), nil
|
||||
}
|
||||
|
||||
return "", codeResp.ToError()
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("unknow response content type: %s", contType)
|
||||
}
|
||||
|
||||
type GetInstanceIDReq struct {
|
||||
InstanceServiceName string `json:"instanceServiceName"`
|
||||
TaskType string `json:"taskType"`
|
||||
Start int `json:"start"`
|
||||
Limit int `json:"limit"`
|
||||
Status string `json:"status"`
|
||||
Sort string `json:"sort"`
|
||||
}
|
||||
|
||||
type instanceService struct {
|
||||
ID string `json:"id"`
|
||||
}
|
||||
|
||||
type GetInstanceIDResp struct {
|
||||
Code string `json:"code"`
|
||||
Msg string `json:"msg"`
|
||||
Data []instanceService `json:"data"`
|
||||
}
|
||||
|
||||
func (c *HttpClient) GetInstanceID(token string, instanceName string) (string, error) {
|
||||
targetURL, err := url.JoinPath(c.baseURL + "/ai/openapi/v2/instance-service/task")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
header := sugonHeader{
|
||||
Token: token,
|
||||
}
|
||||
|
||||
req := GetInstanceIDReq{
|
||||
InstanceServiceName: instanceName,
|
||||
//TaskType: "ssh",
|
||||
Start: 0,
|
||||
Limit: 20,
|
||||
Sort: "desc",
|
||||
}
|
||||
|
||||
resp, err := myhttp.GetJSON(targetURL, myhttp.RequestParam{
|
||||
Body: req,
|
||||
Header: header,
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
bodyBytes, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("reading response body: %w", err)
|
||||
}
|
||||
contType := resp.Header.Get("Content-Type")
|
||||
if strings.Contains(contType, myhttp.ContentTypeJSON) {
|
||||
var codeResp GetInstanceIDResp
|
||||
if err := serder.JSONToObject(bodyBytes, &codeResp); err != nil {
|
||||
return "", fmt.Errorf("parsing response: %w", err)
|
||||
}
|
||||
|
||||
if codeResp.Code != "0" {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if len(codeResp.Data) > 0 {
|
||||
return codeResp.Data[0].ID, nil
|
||||
}
|
||||
|
||||
return "", err
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("unknow response content type: %s", contType)
|
||||
}
|
||||
|
||||
type GetInstanceUrlResp struct {
|
||||
ContainerPortInfoList []containerPortInfoList `json:"containerPortInfoList"`
|
||||
}
|
||||
|
||||
type containerPortInfoList struct {
|
||||
AccessUrl string `json:"accessUrl"`
|
||||
}
|
||||
|
||||
func (c *HttpClient) GetInstanceUrl(token string, instanceID string) (string, error) {
|
||||
path := "/ai/openapi/v2/instance-service/" + instanceID + "/detail"
|
||||
targetURL, err := url.JoinPath(c.baseURL + path)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
header := sugonHeader{
|
||||
Token: token,
|
||||
}
|
||||
resp, err := myhttp.GetJSON(targetURL, myhttp.RequestParam{
|
||||
Header: header,
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
bodyBytes, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("reading response body: %w", err)
|
||||
}
|
||||
contType := resp.Header.Get("Content-Type")
|
||||
if strings.Contains(contType, myhttp.ContentTypeJSON) {
|
||||
var codeResp response[GetInstanceUrlResp]
|
||||
if err := serder.JSONToObject(bodyBytes, &codeResp); err != nil {
|
||||
return "", fmt.Errorf("parsing response: %w", err)
|
||||
}
|
||||
|
||||
if codeResp.Code != "0" {
|
||||
return "", codeResp.ToError()
|
||||
}
|
||||
|
||||
if len(codeResp.Data.ContainerPortInfoList) > 0 {
|
||||
return codeResp.Data.ContainerPortInfoList[0].AccessUrl, nil
|
||||
}
|
||||
|
||||
return "", codeResp.ToError()
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("unknow response content type: %s", contType)
|
||||
}
|
||||
|
||||
func (c *HttpClient) OperateSugonInstance(token string, instanceID string, operate string) (string, error) {
|
||||
var resp *http.Response
|
||||
header := sugonHeader{
|
||||
Token: token,
|
||||
}
|
||||
|
||||
switch operate {
|
||||
case schsdk.RunECS:
|
||||
path := "/ai/openapi/v2/instance-service/task/actions/restart" + "?instanceServiceId=" + instanceID
|
||||
targetURL, err := url.JoinPath(c.baseURL + path)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
resp, err = myhttp.PostJSON(targetURL, myhttp.RequestParam{
|
||||
Header: header,
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
case schsdk.PauseECS:
|
||||
path := "/ai/openapi/v2/instance-service/task/actions/stop" + "?ids=" + instanceID
|
||||
targetURL, err := url.JoinPath(c.baseURL + path)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
resp, err = myhttp.PostJSON(targetURL, myhttp.RequestParam{
|
||||
Header: header,
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
case schsdk.DestroyECS:
|
||||
path := "/ai/openapi/v2/instance-service/task" + "?ids=" + instanceID
|
||||
targetURL, err := url.JoinPath(c.baseURL + path)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
resp, err = myhttp.DeleteJSON(targetURL, myhttp.RequestParam{
|
||||
Header: header,
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
default:
|
||||
return "", fmt.Errorf("unknow operate")
|
||||
}
|
||||
|
||||
contType := resp.Header.Get("Content-Type")
|
||||
if strings.Contains(contType, myhttp.ContentTypeJSON) {
|
||||
var codeResp response[any]
|
||||
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
|
||||
return "", fmt.Errorf("parsing response: %w", err)
|
||||
}
|
||||
|
||||
if codeResp.Code == "0" {
|
||||
return codeResp.Data.(string), nil
|
||||
}
|
||||
|
||||
return "", codeResp.ToError()
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("unknow response content type: %s", contType)
|
||||
}
|
||||
|
||||
type runCommandReq struct {
|
||||
ID string `json:"id"`
|
||||
StartScriptActionScope string `json:"startScriptActionScope"`
|
||||
StartScriptContent string `json:"startScriptContent"`
|
||||
}
|
||||
|
||||
func (c *HttpClient) RunCommand(token string, instanceID string, content string) (string, error) {
|
||||
targetURL, err := url.JoinPath(c.baseURL + "/ai/openapi/v2/instance-service/task/actions/execute-script")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
header := sugonHeader{
|
||||
Token: token,
|
||||
}
|
||||
req := runCommandReq{
|
||||
ID: instanceID,
|
||||
StartScriptActionScope: "all",
|
||||
StartScriptContent: content,
|
||||
}
|
||||
resp, err := myhttp.PostJSON(targetURL, myhttp.RequestParam{
|
||||
Header: header,
|
||||
Body: req,
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
bodyBytes, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("reading response body: %w", err)
|
||||
}
|
||||
println(string(bodyBytes))
|
||||
|
||||
contType := resp.Header.Get("Content-Type")
|
||||
if strings.Contains(contType, myhttp.ContentTypeJSON) {
|
||||
var codeResp response[any]
|
||||
if err := serder.JSONToObject(bodyBytes, &codeResp); err != nil {
|
||||
return "", fmt.Errorf("parsing response: %w", err)
|
||||
}
|
||||
|
||||
if codeResp.Code == "0" {
|
||||
return codeResp.Code, nil
|
||||
}
|
||||
|
||||
return "", codeResp.ToError()
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("unknow response content type: %s", contType)
|
||||
}
|
|
@ -4,6 +4,7 @@ import (
|
|||
"fmt"
|
||||
"gitlink.org.cn/cloudream/common/consts/errorcode"
|
||||
"gitlink.org.cn/cloudream/common/pkgs/mq"
|
||||
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
|
||||
myhttp "gitlink.org.cn/cloudream/common/utils/http"
|
||||
"gitlink.org.cn/cloudream/common/utils/serder"
|
||||
"net/http"
|
||||
|
@ -98,14 +99,14 @@ func (c *HttpClient) GetReportInfo() (*http.Response, error) {
|
|||
type TaskOperateInfo struct {
|
||||
TaskID string
|
||||
Operate string
|
||||
Command string
|
||||
Runtime schsdk.JobRuntimeInfo
|
||||
}
|
||||
|
||||
func NewTaskOperateInfo(taskID string, operate string, command string) *TaskOperateInfo {
|
||||
func NewTaskOperateInfo(taskID string, operate string, runtime schsdk.JobRuntimeInfo) *TaskOperateInfo {
|
||||
return &TaskOperateInfo{
|
||||
TaskID: taskID,
|
||||
Operate: operate,
|
||||
Command: command,
|
||||
Runtime: runtime,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -3,15 +3,16 @@ package task
|
|||
import (
|
||||
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
|
||||
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
|
||||
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
|
||||
)
|
||||
|
||||
type ScheduleCreateECS struct {
|
||||
TaskInfoBase
|
||||
UserID cdssdk.UserID `json:"userID"`
|
||||
PackageID cdssdk.PackageID `json:"packageID"`
|
||||
Command string `json:"command"`
|
||||
StartShellPath string `json:"startShellPath"`
|
||||
Envs []schsdk.KVPair `json:"envs"`
|
||||
UserID cdssdk.UserID `json:"userID"`
|
||||
Command string `json:"command"`
|
||||
ObjectStorage schmod.ObjectStorage `json:"objectStorage"`
|
||||
ModelResource schmod.ModelResource `json:"modelResource"`
|
||||
Envs []schsdk.KVPair `json:"envs"`
|
||||
}
|
||||
|
||||
type ScheduleCreateECSStatus struct {
|
||||
|
@ -21,13 +22,13 @@ type ScheduleCreateECSStatus struct {
|
|||
Operate string `json:"operate"`
|
||||
}
|
||||
|
||||
func NewScheduleCreateECS(userID cdssdk.UserID, packageID cdssdk.PackageID, command string, startShellPath string, envs []schsdk.KVPair) *ScheduleCreateECS {
|
||||
func NewScheduleCreateECS(userID cdssdk.UserID, command string, objectStorage schmod.ObjectStorage, modelResource schmod.ModelResource, envs []schsdk.KVPair) *ScheduleCreateECS {
|
||||
return &ScheduleCreateECS{
|
||||
UserID: userID,
|
||||
PackageID: packageID,
|
||||
Command: command,
|
||||
StartShellPath: startShellPath,
|
||||
Envs: envs,
|
||||
UserID: userID,
|
||||
Command: command,
|
||||
ObjectStorage: objectStorage,
|
||||
ModelResource: modelResource,
|
||||
Envs: envs,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -9,34 +9,12 @@ type ExecutorService interface {
|
|||
//ReportExecutorTaskStatus(msg *ReportExecutorTaskStatus) (*ReportExecutorTaskStatusResp, *mq.CodeMessage)
|
||||
}
|
||||
|
||||
// 接收executor上报的存活状态及任务执行情况
|
||||
//var _ = Register(Service.ReportExecutorTaskStatus)
|
||||
|
||||
// type ReportExecutorTaskStatus struct {
|
||||
// mq.MessageBodyBase
|
||||
// ExecutorID schmod.ExecutorID `json:"executorID"`
|
||||
// TaskStatus []ExecutorTaskStatus `json:"taskStatus"`
|
||||
// }
|
||||
//
|
||||
// type ReportExecutorTaskStatusResp struct {
|
||||
// mq.MessageBodyBase
|
||||
// }
|
||||
type ExecutorTaskStatus struct {
|
||||
ExecutorID schmod.ExecutorID `json:"executorID"`
|
||||
TaskID string `json:"taskID"`
|
||||
Status exectsk.TaskStatus `json:"status"`
|
||||
}
|
||||
|
||||
// func NewReportExecutorTaskStatus(executorID schmod.ExecutorID, taskStatus []ExecutorTaskStatus) *ReportExecutorTaskStatus {
|
||||
// return &ReportExecutorTaskStatus{
|
||||
// ExecutorID: executorID,
|
||||
// TaskStatus: taskStatus,
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// func NewReportExecutorTaskStatusResp() *ReportExecutorTaskStatusResp {
|
||||
// return &ReportExecutorTaskStatusResp{}
|
||||
// }
|
||||
func NewExecutorTaskStatus(executorID schmod.ExecutorID, taskID string, status exectsk.TaskStatus) ExecutorTaskStatus {
|
||||
return ExecutorTaskStatus{
|
||||
ExecutorID: executorID,
|
||||
|
@ -44,7 +22,3 @@ func NewExecutorTaskStatus(executorID schmod.ExecutorID, taskID string, status e
|
|||
Status: status,
|
||||
}
|
||||
}
|
||||
|
||||
//func (c *Client) ReportExecutorTaskStatus(msg *ReportExecutorTaskStatus, opts ...mq.RequestOption) (*ReportExecutorTaskStatusResp, error) {
|
||||
// return mq.Request(Service.ReportExecutorTaskStatus, c.roundTripper, msg, opts...)
|
||||
//}
|
||||
|
|
|
@ -52,6 +52,22 @@ func SplitCommands(command string) []string {
|
|||
return commands
|
||||
}
|
||||
|
||||
func ConvertEnvsToCommand(envs []schsdk.KVPair) []string {
|
||||
var commands []string
|
||||
|
||||
for i := 0; i < len(envs); i++ {
|
||||
value := strings.Replace(envs[i].Value, "\\", "/", -1)
|
||||
commandContent := "sed -i '/@key@/d' ~/.bashrc && echo 'export @key@=@value@' >> ~/.bashrc"
|
||||
commandContent = strings.Replace(commandContent, "@key@", envs[i].Key, -1)
|
||||
commandContent = strings.Replace(commandContent, "@value@", value, -1)
|
||||
commands = append(commands, commandContent)
|
||||
}
|
||||
commandContent := "sudo source ~/.bashrc"
|
||||
commands = append(commands, commandContent)
|
||||
|
||||
return commands
|
||||
}
|
||||
|
||||
func GetSSHClient(username string, password string, address string) *ssh.Client {
|
||||
// SSH连接配置
|
||||
sshConfig := &ssh.ClientConfig{
|
||||
|
|
|
@ -2,20 +2,14 @@ package config
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
|
||||
"gitlink.org.cn/cloudream/scheduler/executor/internal/task/create_ecs"
|
||||
)
|
||||
|
||||
var CloudName string
|
||||
|
||||
func InitCloud(createECS map[string]interface{}) {
|
||||
// Extract the createECS section
|
||||
//createECS, ok := configMap["createECS"].(map[string]interface{})
|
||||
//if !ok {
|
||||
// fmt.Println("Invalid JSON structure: createECS section is missing or malformed")
|
||||
// return
|
||||
//}
|
||||
|
||||
// Extract the cloud type
|
||||
cloud, ok := createECS["cloud"].(string)
|
||||
CloudName = cloud
|
||||
if !ok {
|
||||
|
@ -23,19 +17,25 @@ func InitCloud(createECS map[string]interface{}) {
|
|||
return
|
||||
}
|
||||
|
||||
// Extract the config section
|
||||
config, ok := createECS["config"].(map[string]interface{})
|
||||
auth_config, ok := createECS["auth_config"].(map[string]interface{})
|
||||
if !ok {
|
||||
fmt.Println("Invalid JSON structure: config section is missing or malformed")
|
||||
return
|
||||
}
|
||||
|
||||
ecs_config, ok := createECS["ecs_config"].(map[string]interface{})
|
||||
if !ok {
|
||||
fmt.Println("Invalid JSON structure: config section is missing or malformed")
|
||||
return
|
||||
}
|
||||
|
||||
// Check the cloud type and generate query string accordingly
|
||||
switch cloud {
|
||||
case "AliCloud":
|
||||
create_ecs.AliConfig(config)
|
||||
case "HuaweiCloud":
|
||||
create_ecs.HWCloudConfig(config)
|
||||
case schmod.AliCloud:
|
||||
create_ecs.AliConfig(auth_config, ecs_config)
|
||||
case schmod.HuaweiCloud:
|
||||
create_ecs.HWCloudConfig(auth_config, ecs_config)
|
||||
case schmod.SugonCloud:
|
||||
create_ecs.SugonCloudConfig(auth_config, ecs_config)
|
||||
default:
|
||||
fmt.Println("Unsupported cloud type:", cloud)
|
||||
return
|
||||
|
|
|
@ -6,14 +6,6 @@ import (
|
|||
|
||||
var ExecutorID schmod.ExecutorID
|
||||
|
||||
func Init(id schmod.ExecutorID) {
|
||||
//ExecutorID = schmod.ExecutorID(uuid.NewString())
|
||||
func InitExecutorID(id schmod.ExecutorID) {
|
||||
ExecutorID = id
|
||||
}
|
||||
|
||||
const (
|
||||
UPDATE = "update"
|
||||
STOP = "stop"
|
||||
RESTART = "restart"
|
||||
DESTROY = "destroy"
|
||||
)
|
||||
|
|
|
@ -3,7 +3,6 @@ package http
|
|||
import (
|
||||
"github.com/gin-gonic/gin"
|
||||
"gitlink.org.cn/cloudream/common/pkgs/logger"
|
||||
"gitlink.org.cn/cloudream/scheduler/executor/internal/reporter"
|
||||
"gitlink.org.cn/cloudream/scheduler/executor/internal/services"
|
||||
)
|
||||
|
||||
|
@ -11,7 +10,6 @@ type Server struct {
|
|||
engine *gin.Engine
|
||||
listenAddr string
|
||||
svc *services.Service
|
||||
reporter *reporter.Reporter
|
||||
}
|
||||
|
||||
func NewServer(listenAddr string, svc *services.Service) (*Server, error) {
|
||||
|
|
|
@ -1,86 +0,0 @@
|
|||
package reporter
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
|
||||
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
|
||||
)
|
||||
|
||||
type Reporter struct {
|
||||
executorID schmod.ExecutorID
|
||||
reportInterval time.Duration
|
||||
taskStatus map[string]exectsk.TaskStatus
|
||||
taskStatusLock sync.Mutex
|
||||
reportNow chan bool
|
||||
}
|
||||
|
||||
func NewReporter(executorID schmod.ExecutorID, reportInterval time.Duration) Reporter {
|
||||
return Reporter{
|
||||
executorID: executorID,
|
||||
reportInterval: reportInterval,
|
||||
taskStatus: make(map[string]exectsk.TaskStatus),
|
||||
reportNow: make(chan bool),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Reporter) Report(taskID string, taskStatus exectsk.TaskStatus) {
|
||||
r.taskStatusLock.Lock()
|
||||
defer r.taskStatusLock.Unlock()
|
||||
|
||||
r.taskStatus[taskID] = taskStatus
|
||||
}
|
||||
|
||||
func (r *Reporter) ReportNow() {
|
||||
select {
|
||||
case r.reportNow <- true:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Reporter) Serve() error {
|
||||
//magCli, err := schglb.ManagerMQPool.Acquire()
|
||||
//if err != nil {
|
||||
// return fmt.Errorf("new manager client: %w", err)
|
||||
//}
|
||||
//defer schglb.ManagerMQPool.Release(magCli)
|
||||
|
||||
ticker := time.NewTicker(r.reportInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
case <-r.reportNow:
|
||||
ticker.Reset(r.reportInterval)
|
||||
}
|
||||
|
||||
//r.taskStatusLock.Lock()
|
||||
//var taskStatus []mgrmq.ExecutorTaskStatus
|
||||
//for taskID, status := range r.taskStatus {
|
||||
// taskStatus = append(taskStatus, mgrmq.NewExecutorTaskStatus(taskID, status))
|
||||
//}
|
||||
//r.taskStatus = make(map[string]exectsk.TaskStatus)
|
||||
//r.taskStatusLock.Unlock()
|
||||
|
||||
//status := mgrmq.NewReportExecutorTaskStatus(r.executorID, taskStatus)
|
||||
//// 将数据发送到管道中
|
||||
//globals.EventChannel <- *status
|
||||
|
||||
//_, err := magCli.ReportExecutorTaskStatus(mgrmq.NewReportExecutorTaskStatus(r.executorID, taskStatus))
|
||||
|
||||
//if err != nil {
|
||||
// logger.Warnf("reporting to manager: %s", err.Error())
|
||||
//
|
||||
// //若上报失败,数据应保留
|
||||
// r.taskStatusLock.Lock()
|
||||
// for _, ts := range taskStatus {
|
||||
// if _, exists := r.taskStatus[ts.TaskID]; !exists {
|
||||
// r.taskStatus[ts.TaskID] = ts.Status
|
||||
// }
|
||||
// }
|
||||
// r.taskStatusLock.Unlock()
|
||||
//}
|
||||
}
|
||||
}
|
|
@ -21,10 +21,10 @@ var aliclient = &ecs.Client{}
|
|||
var requestParam ecs.RunInstancesRequest
|
||||
var aliConfigMap map[string]interface{}
|
||||
|
||||
func AliConfig(configMap map[string]interface{}) {
|
||||
aliConfigMap = configMap
|
||||
func AliConfig(authConfigs map[string]interface{}, ecsConfigs map[string]interface{}) {
|
||||
aliConfigMap = ecsConfigs
|
||||
|
||||
jsonData, err := json.Marshal(configMap)
|
||||
jsonData, err := json.Marshal(ecsConfigs)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -37,9 +37,9 @@ func AliConfig(configMap map[string]interface{}) {
|
|||
|
||||
config := &openapi.Config{}
|
||||
// 您的AccessKey ID
|
||||
config.AccessKeyId = tea.String(configMap["AccessKeyId"].(string))
|
||||
config.AccessKeyId = tea.String(authConfigs["AccessKeyId"].(string))
|
||||
// 您的AccessKey Secret
|
||||
config.AccessKeySecret = tea.String(configMap["AccessKeySecret"].(string))
|
||||
config.AccessKeySecret = tea.String(authConfigs["AccessKeySecret"].(string))
|
||||
// 您的可用区ID
|
||||
config.RegionId = requestParam.RegionId
|
||||
aliclient, _ = ecs.NewClient(config)
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
package create_ecs
|
||||
|
||||
import schmod "gitlink.org.cn/cloudream/scheduler/common/models"
|
||||
|
||||
// CloudProvider 是一个接口,定义了创建服务器的方法
|
||||
type CloudProvider interface {
|
||||
CreateServer() (string, string, error)
|
||||
|
@ -28,12 +30,18 @@ func (f *AliCloudFactory) CreateProvider() CloudProvider {
|
|||
return &AliCloud{}
|
||||
}
|
||||
|
||||
type ShuGuangCloudFactory struct{}
|
||||
|
||||
func (f *ShuGuangCloudFactory) CreateProvider() CloudProvider {
|
||||
return &SugonCloud{}
|
||||
}
|
||||
|
||||
// GetFactory 根据云平台类型返回对应的工厂
|
||||
func GetFactory(providerType string) CloudFactory {
|
||||
switch providerType {
|
||||
case "HuaweiCloud":
|
||||
case schmod.HuaweiCloud:
|
||||
return &HuaweiCloudFactory{}
|
||||
case "AliCloud":
|
||||
case schmod.AliCloud:
|
||||
return &AliCloudFactory{}
|
||||
default:
|
||||
return nil
|
||||
|
|
|
@ -36,10 +36,10 @@ var serverbody model.PrePaidServer
|
|||
var hwConfigMap map[string]interface{}
|
||||
var hwclient ecs.EcsClient
|
||||
|
||||
func HWCloudConfig(configMap map[string]interface{}) {
|
||||
hwConfigMap = configMap
|
||||
func HWCloudConfig(authConfigs map[string]interface{}, ecsConfigs map[string]interface{}) {
|
||||
hwConfigMap = ecsConfigs
|
||||
|
||||
jsonData, err := json.Marshal(configMap)
|
||||
jsonData, err := json.Marshal(ecsConfigs)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -49,8 +49,8 @@ func HWCloudConfig(configMap map[string]interface{}) {
|
|||
return
|
||||
}
|
||||
|
||||
ak := configMap["AccessKeyId"].(string)
|
||||
sk := configMap["AccessKeySecret"].(string)
|
||||
ak := authConfigs["AccessKeyId"].(string)
|
||||
sk := authConfigs["AccessKeySecret"].(string)
|
||||
|
||||
auth := basic.NewCredentialsBuilder().
|
||||
WithAk(ak).
|
||||
|
@ -59,7 +59,7 @@ func HWCloudConfig(configMap map[string]interface{}) {
|
|||
|
||||
hwclient = *ecs.NewEcsClient(
|
||||
ecs.EcsClientBuilder().
|
||||
WithRegion(region.ValueOf(configMap["Region"].(string))).
|
||||
WithRegion(region.ValueOf(ecsConfigs["Region"].(string))).
|
||||
WithCredential(auth).
|
||||
Build())
|
||||
|
||||
|
|
|
@ -0,0 +1,209 @@
|
|||
package create_ecs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"gitlink.org.cn/cloudream/common/pkgs/logger"
|
||||
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
|
||||
myhttp "gitlink.org.cn/cloudream/common/utils/http"
|
||||
"gitlink.org.cn/cloudream/common/utils/serder"
|
||||
exemq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type response[T any] struct {
|
||||
Code string `json:"code"`
|
||||
Msg string `json:"msg"`
|
||||
Data T `json:"data"`
|
||||
}
|
||||
|
||||
type sugonToken struct {
|
||||
ClusterId string `json:"clusterId"`
|
||||
ClusterName string `json:"clusterName"`
|
||||
Token string `json:"token"`
|
||||
}
|
||||
|
||||
type sugonTokenResp struct {
|
||||
Code string `json:"code"`
|
||||
Msg string `json:"msg"`
|
||||
Data []sugonToken `json:"data"`
|
||||
}
|
||||
|
||||
func getToken(authConfigs map[string]interface{}) (string, error) {
|
||||
|
||||
header := make(map[string]string)
|
||||
header["User"] = authConfigs["user"].(string)
|
||||
header["Password"] = authConfigs["password"].(string)
|
||||
header["Orgid"] = authConfigs["orgid"].(string)
|
||||
|
||||
resp, err := myhttp.PostJSON(authConfigs["get_token_url"].(string), myhttp.RequestParam{
|
||||
Header: header,
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
contType := resp.Header.Get("Content-Type")
|
||||
if strings.Contains(contType, myhttp.ContentTypeJSON) {
|
||||
var codeResp sugonTokenResp
|
||||
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
|
||||
return "", fmt.Errorf("parsing response: %w", err)
|
||||
}
|
||||
|
||||
for i := 0; i < len(codeResp.Data); i++ {
|
||||
data := codeResp.Data[i]
|
||||
if data.ClusterName == authConfigs["clusterName"] {
|
||||
return data.Token, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("there is no token")
|
||||
}
|
||||
|
||||
type sugonUrlResp struct {
|
||||
Name string `json:"name"`
|
||||
AiUrls []sugonUrl `json:"aiUrls"`
|
||||
}
|
||||
|
||||
type sugonUrl struct {
|
||||
Url string `json:"url"`
|
||||
}
|
||||
|
||||
func getUrl(token string, url string) (string, error) {
|
||||
header := make(map[string]string)
|
||||
header["Token"] = token
|
||||
resp, err := myhttp.GetForm(url, myhttp.RequestParam{
|
||||
Header: header,
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// 读取并打印原始响应
|
||||
bodyBytes, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("reading response body: %w", err)
|
||||
}
|
||||
|
||||
contType := resp.Header.Get("Content-Type")
|
||||
if strings.Contains(contType, myhttp.ContentTypeJSON) {
|
||||
var codeResp response[sugonUrlResp]
|
||||
if err := serder.JSONToObject(bodyBytes, &codeResp); err != nil {
|
||||
return "", fmt.Errorf("parsing response: %w", err)
|
||||
}
|
||||
|
||||
if len(codeResp.Data.AiUrls) == 0 {
|
||||
return "", fmt.Errorf("there is no url")
|
||||
}
|
||||
|
||||
return codeResp.Data.AiUrls[0].Url, nil
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("there is no token")
|
||||
}
|
||||
|
||||
type SugonCloud struct{}
|
||||
|
||||
var ecsConfig map[string]interface{}
|
||||
var authConfig map[string]interface{}
|
||||
var client exemq.HttpClient
|
||||
|
||||
func SugonCloudConfig(authConfigs map[string]interface{}, ecsConfigs map[string]interface{}) {
|
||||
ecsConfig = ecsConfigs
|
||||
token, err := getToken(authConfigs)
|
||||
if err != nil {
|
||||
logger.Error(err.Error())
|
||||
return
|
||||
}
|
||||
url, err := getUrl(token, "https://ac.sugon.com/ac/openapi/v2/center")
|
||||
httpPool := exemq.NewHttpPool(&exemq.Config{})
|
||||
c, err := httpPool.AcquireByUrl(url)
|
||||
if err != nil {
|
||||
logger.Error(err.Error())
|
||||
return
|
||||
}
|
||||
client = *c
|
||||
}
|
||||
|
||||
func (s SugonCloud) CreateServer() (string, string, error) {
|
||||
instanceServiceName := "auto_instance_" + time.Now().Format("20060102150405")
|
||||
ecsConfig["instanceServiceName"] = instanceServiceName
|
||||
// 获取token
|
||||
token, err := getToken(authConfig)
|
||||
if err != nil {
|
||||
logger.Error(err.Error())
|
||||
return "", "", err
|
||||
}
|
||||
// 创建实例
|
||||
_, err = client.CreateSugonInstance(token, ecsConfig)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
// 获取实例ID
|
||||
instanceID, err := client.GetInstanceID(token, instanceServiceName)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
// 获取实例url
|
||||
url, err := client.GetInstanceUrl(token, instanceID)
|
||||
|
||||
logger.Info("create ecs success, instanceID: " + instanceID + " url: " + url)
|
||||
|
||||
return instanceID, url, nil
|
||||
}
|
||||
|
||||
func (s SugonCloud) RunCommand(commands []string, instanceID string, timeout int) (string, error) {
|
||||
// 获取token
|
||||
token, err := getToken(authConfig)
|
||||
if err != nil {
|
||||
logger.Error(err.Error())
|
||||
return "", err
|
||||
}
|
||||
for i := 0; i < len(commands); i++ {
|
||||
_, err := client.RunCommand(token, instanceID, commands[i])
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
|
||||
func (s SugonCloud) DeleteInstance(instanceID string) (string, error) {
|
||||
// 获取token
|
||||
token, err := getToken(authConfig)
|
||||
if err != nil {
|
||||
logger.Error(err.Error())
|
||||
return "", err
|
||||
}
|
||||
instance, err := client.OperateSugonInstance(token, instanceID, schsdk.DestroyECS)
|
||||
return instance, err
|
||||
}
|
||||
|
||||
func (s SugonCloud) StopInstance(instanceID string) (string, error) {
|
||||
// 获取token
|
||||
token, err := getToken(authConfig)
|
||||
if err != nil {
|
||||
logger.Error(err.Error())
|
||||
return "", err
|
||||
}
|
||||
instance, err := client.OperateSugonInstance(token, instanceID, schsdk.PauseECS)
|
||||
return instance, err
|
||||
}
|
||||
|
||||
func (s SugonCloud) RebootInstances(instanceID string) (string, error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s SugonCloud) StartInstances(instanceID string) (string, error) {
|
||||
// 获取token
|
||||
token, err := getToken(authConfig)
|
||||
if err != nil {
|
||||
logger.Error(err.Error())
|
||||
return "", err
|
||||
}
|
||||
instance, err := client.OperateSugonInstance(token, instanceID, schsdk.RunECS)
|
||||
return instance, err
|
||||
}
|
|
@ -3,10 +3,14 @@ package task
|
|||
import (
|
||||
"gitlink.org.cn/cloudream/common/pkgs/logger"
|
||||
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
|
||||
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
|
||||
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
|
||||
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
|
||||
"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
|
||||
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
|
||||
"gitlink.org.cn/cloudream/scheduler/common/utils"
|
||||
"gitlink.org.cn/cloudream/scheduler/executor/internal/config"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
//"gitlink.org.cn/cloudream/scheduler/executor/internal/config"
|
||||
|
@ -39,55 +43,51 @@ func (t *ScheduleCreateECS) Execute(task *Task, ctx TaskContext) {
|
|||
}
|
||||
|
||||
var count = 1
|
||||
var instanceID string
|
||||
var ecsIP string
|
||||
|
||||
//var instanceID string
|
||||
//var ecsIP string
|
||||
|
||||
func (t *ScheduleCreateECS) do(task *Task, ctx TaskContext) error {
|
||||
// 创建云主机
|
||||
factory := create_ecs.GetFactory(config.CloudName)
|
||||
provider := factory.CreateProvider()
|
||||
instanceID, ecsIP, err := provider.CreateServer()
|
||||
|
||||
var commands []string
|
||||
commandContent := "sed -i '/SCH_DATA_IN/d' ~/.bashrc && echo 'export SCH_DATA_IN=" + strings.Replace(t.Envs[0].Value, "\\", "/", -1) + "' >> ~/.bashrc"
|
||||
commands = append(commands, commandContent)
|
||||
commandContent = "sed -i '/SCH_DATA_OUT/d' ~/.bashrc && echo 'export SCH_DATA_OUT=" + strings.Replace(t.Envs[1].Value, "\\", "/", -1) + "' >> ~/.bashrc"
|
||||
commands = append(commands, commandContent)
|
||||
logger.Info("create ECS success, instance id: " + instanceID + ", address: " + ecsIP)
|
||||
|
||||
// 曙光服务器需要将访问路径放入到环境变量中
|
||||
if config.CloudName == schmod.SugonCloud {
|
||||
segments := strings.Split(strings.Trim(ecsIP, "/"), "/")
|
||||
if len(segments) > 0 {
|
||||
t.Envs = append(t.Envs, schsdk.KVPair{Key: schsdk.AccessPath, Value: segments[len(segments)-1]})
|
||||
}
|
||||
}
|
||||
|
||||
// 设置环境变量
|
||||
commands := utils.ConvertEnvsToCommand(t.Envs)
|
||||
|
||||
switch t.ObjectStorage.MountType {
|
||||
case schsdk.RcloneMount:
|
||||
rcloneCommands := getRcloneCommands(t.ModelResource, t.ObjectStorage, t.UserID)
|
||||
commands = append(commands, rcloneCommands...)
|
||||
case schsdk.Mounted:
|
||||
commandContent := "sudo sh @startScript@ > /opt/startup.log"
|
||||
commandContent = strings.Replace(commandContent, "@startScript@", t.ModelResource.StartShellPath, -1)
|
||||
commands = append(commands, commandContent)
|
||||
}
|
||||
|
||||
arr := utils.SplitCommands(t.Command)
|
||||
commands = append(commands, arr...)
|
||||
|
||||
// 创建云主机
|
||||
factory := create_ecs.GetFactory(config.CloudName)
|
||||
provider := factory.CreateProvider()
|
||||
|
||||
instanceID := "i-bp1cict40j45l6va7m8m"
|
||||
ecsIP := "121.196.200.69"
|
||||
//err := error(nil)
|
||||
//if count == 2 {
|
||||
// instanceID = "i-bp1dokchih168087wqck"
|
||||
// ecsIP = "118.31.50.157"
|
||||
//}
|
||||
//count = 2
|
||||
|
||||
println(len(commands))
|
||||
//println(err)
|
||||
|
||||
//if count == 1 {
|
||||
// instanceID, ecsIP, err = provider.CreateServer()
|
||||
// if err != nil {
|
||||
// task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", err.Error()))
|
||||
// return err
|
||||
// }
|
||||
// count = 2
|
||||
//}
|
||||
logger.Info("create ECS success, instance id: " + instanceID + ", ip: " + ecsIP)
|
||||
|
||||
//_, err = provider.RunCommand(commands, instanceID, 2000)
|
||||
//if err != nil {
|
||||
// task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", err.Error()))
|
||||
// return err
|
||||
//}
|
||||
_, err = provider.RunCommand(commands, instanceID, 2000)
|
||||
if err != nil {
|
||||
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", "", err.Error()))
|
||||
return err
|
||||
}
|
||||
|
||||
// 返回执行结果
|
||||
task.SendStatus(exectsk.NewScheduleCreateECSStatus("http://"+ecsIP+":5013/chat", schsdk.CreateECS, ""))
|
||||
println("create ECS success, waiting msg...")
|
||||
println("create ECS success!")
|
||||
|
||||
// 监听更新操作
|
||||
for {
|
||||
|
@ -126,7 +126,7 @@ func (t *ScheduleCreateECS) do(task *Task, ctx TaskContext) error {
|
|||
//}
|
||||
task.SendStatus(exectsk.NewScheduleCreateECSStatus("", schsdk.DestroyECS, ""))
|
||||
case schsdk.OperateServer:
|
||||
//executeCommands(provider, instanceID, task, info.Command)
|
||||
executeCommands(provider, instanceID, task, info.Runtime)
|
||||
default:
|
||||
//executeCommands(provider, instanceID, task, info.Command)
|
||||
}
|
||||
|
@ -134,8 +134,48 @@ func (t *ScheduleCreateECS) do(task *Task, ctx TaskContext) error {
|
|||
}
|
||||
}
|
||||
|
||||
func executeCommands(provider create_ecs.CloudProvider, instanceID string, task *Task, command string) {
|
||||
commands := utils.SplitCommands(command)
|
||||
func getRcloneCommands(resource schmod.ModelResource, storage schmod.ObjectStorage, userID cdssdk.UserID) []string {
|
||||
var commands []string
|
||||
|
||||
// 下载Rclone
|
||||
commandContent := "yum install -y fuse3"
|
||||
commands = append(commands, commandContent)
|
||||
commandContent = "cd /opt && downloadCode='import requests;response=requests.get(\"@url@\",stream=True);response.raise_for_status();boundary=response.headers.get(\"Content-Type\").split(\"boundary=\")[-1].encode();content=response.content;body=[part.split(b\"\\r\\n\\r\\n\",1)[1].rsplit(b\"\\r\\n--\",1)[0] for part in content.split(b\"--\"+boundary+b\"\\r\\n\") if b\"filename=\" in part][0];open(\"@filename@\",\"wb\").write(body);print(\"success\")' && rclone=\"$cds_url/object/download?userID=$userID&objectID=$rcloneID\" && python3 -c \"$(echo \"$downloadCode\" | sed -e \"s|@url@|$(printf '%s' \"$rclone\" | sed 's/[&/\\]/\\\\&/g')|\" -e \"s|@filename@|rclone|\")\" && chmod +x rclone"
|
||||
commandContent = strings.Replace(commandContent, "$cds_url", schglb.CloudreamStorageConfig.URL, -1)
|
||||
commandContent = strings.Replace(commandContent, "$rcloneID", schglb.CDSRclone.CDSRcloneID, -1)
|
||||
commandContent = strings.Replace(commandContent, "$userID", strconv.FormatInt(int64(userID), 10), -1)
|
||||
commands = append(commands, commandContent)
|
||||
|
||||
// 生成Rclone配置文件
|
||||
commandContent = "echo -e '[@tagName@] \n type = s3 \n provider = @provider@ \n access_key_id = @ak@ \n secret_access_key = @sk@ \n endpoint = @endpoint@ \n storage_class = STANDARD' > /opt/rclone.conf"
|
||||
tagName := storage.Bucket + "_" + storage.AK
|
||||
commandContent = strings.Replace(commandContent, "@tagName@", tagName, -1)
|
||||
commandContent = strings.Replace(commandContent, "@provider@", storage.Manufacturer, -1)
|
||||
commandContent = strings.Replace(commandContent, "@ak@", storage.AK, -1)
|
||||
commandContent = strings.Replace(commandContent, "@sk@", storage.SK, -1)
|
||||
commandContent = strings.Replace(commandContent, "@endpoint@", storage.Endpoint, -1)
|
||||
commands = append(commands, commandContent)
|
||||
|
||||
// 挂载Rclone
|
||||
mountDir := "/mnt/oss"
|
||||
commandContent = "mkdir -p @mountDir@ && cd /opt && nohup ./rclone mount @tagName@:@bucket@ @mountDir@ --vfs-cache-mode full --vfs-read-wait 0 --vfs-read-chunk-size 128M --cache-db-purge -vv > rcloneMount.log 2>&1 &"
|
||||
commandContent = strings.Replace(commandContent, "@tagName@", tagName, -1)
|
||||
commandContent = strings.Replace(commandContent, "@bucket@", storage.Bucket, -1)
|
||||
commandContent = strings.Replace(commandContent, "@mountDir@", mountDir, -1)
|
||||
commands = append(commands, commandContent)
|
||||
|
||||
// 执行启动脚本
|
||||
startScript := mountDir + "/" + resource.StartShellPath
|
||||
commandContent = "sudo sh @startScript@ > /opt/startup.log"
|
||||
commandContent = strings.Replace(commandContent, "@startScript@", startScript, -1)
|
||||
commands = append(commands, commandContent)
|
||||
|
||||
return commands
|
||||
}
|
||||
|
||||
func executeCommands(provider create_ecs.CloudProvider, instanceID string, task *Task, runtime schsdk.JobRuntimeInfo) {
|
||||
commands := utils.ConvertEnvsToCommand(runtime.Envs)
|
||||
commands = append(commands, utils.SplitCommands(runtime.Command)...)
|
||||
|
||||
_, err := provider.RunCommand(commands, instanceID, 2000)
|
||||
if err != nil {
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"fmt"
|
||||
"gitlink.org.cn/cloudream/common/pkgs/logger"
|
||||
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
|
||||
execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
|
||||
"gitlink.org.cn/cloudream/scheduler/executor/internal/config"
|
||||
myglbs "gitlink.org.cn/cloudream/scheduler/executor/internal/globals"
|
||||
"gitlink.org.cn/cloudream/scheduler/executor/internal/http"
|
||||
|
@ -31,7 +30,7 @@ func main() {
|
|||
schglb.InitPCMPool(&config.Cfg().PCM)
|
||||
config.InitCloud(config.Cfg().CloudECS)
|
||||
|
||||
myglbs.Init(config.Cfg().Application.ExecutorID)
|
||||
myglbs.InitExecutorID(config.Cfg().Application.ExecutorID)
|
||||
schglb.InitRcloneConfig(config.Cfg().Rclone.CDSRcloneID, config.Cfg().Rclone.CDSRcloneConfigID)
|
||||
|
||||
//rpter := reporter.NewReporter(myglbs.ExecutorID, time.Second*time.Duration(config.Cfg().ReportIntervalSec))
|
||||
|
@ -68,25 +67,3 @@ func main() {
|
|||
forever := make(chan bool)
|
||||
<-forever
|
||||
}
|
||||
|
||||
func serveMQServer(server *execmq.Server) {
|
||||
logger.Info("start serving mq server")
|
||||
|
||||
err := server.Serve()
|
||||
if err != nil {
|
||||
logger.Errorf("mq server stopped with error: %s", err.Error())
|
||||
}
|
||||
|
||||
logger.Info("mq server stopped")
|
||||
}
|
||||
|
||||
//func serveReporter(rpt *reporter.Reporter) {
|
||||
// logger.Info("start serving reporter")
|
||||
//
|
||||
// err := rpt.Serve()
|
||||
// if err != nil {
|
||||
// logger.Errorf("rpt stopped with error: %s", err.Error())
|
||||
// }
|
||||
//
|
||||
// logger.Info("rpt stopped")
|
||||
//}
|
||||
|
|
|
@ -1,11 +1,14 @@
|
|||
package event
|
||||
|
||||
import "gitlink.org.cn/cloudream/common/pkgs/future"
|
||||
import (
|
||||
"gitlink.org.cn/cloudream/common/pkgs/future"
|
||||
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
|
||||
)
|
||||
|
||||
type JobUpdateFuture = *future.SetValueFuture[UpdateResult]
|
||||
|
||||
type Update struct {
|
||||
Command string
|
||||
Runtime schsdk.JobRuntimeInfo
|
||||
Operate string
|
||||
Result JobUpdateFuture
|
||||
}
|
||||
|
@ -16,9 +19,9 @@ type UpdateResult struct {
|
|||
Err error
|
||||
}
|
||||
|
||||
func NewUpdate(command string, operate string, jobUpdateFuture JobUpdateFuture) *Update {
|
||||
func NewUpdate(runTime schsdk.JobRuntimeInfo, operate string, jobUpdateFuture JobUpdateFuture) *Update {
|
||||
return &Update{
|
||||
Command: command,
|
||||
Runtime: runTime,
|
||||
Operate: operate,
|
||||
Result: jobUpdateFuture,
|
||||
}
|
||||
|
|
|
@ -178,14 +178,13 @@ func (s *NormalJobExecuting) submitInstanceTask(rtx jobmgr.JobStateRunContext, j
|
|||
logger.Error(err.Error())
|
||||
return fmt.Errorf("getting object storage info: %w", err)
|
||||
}
|
||||
//commands := getRcloneMountCommand(objectStorage)
|
||||
println(objectStorage.CDSStorageID)
|
||||
// 发送扩容任务
|
||||
ecs := exetsk.NewScheduleCreateECS(
|
||||
userID,
|
||||
runningJob.Files.Dataset.PackageID,
|
||||
runningJob.Info.Runtime.Command+"\\n"+modelJobInfo.Command,
|
||||
modelInfo.StartShellPath,
|
||||
objectStorage,
|
||||
modelInfo,
|
||||
envs,
|
||||
)
|
||||
task, err := rtx.Mgr.ExecMgr.StartTask(ecs, ccInfo)
|
||||
|
@ -207,7 +206,7 @@ func (s *NormalJobExecuting) submitInstanceTask(rtx jobmgr.JobStateRunContext, j
|
|||
return fmt.Errorf("getting executor client: %w", err)
|
||||
}
|
||||
evt := v1.Value.(*event.Update)
|
||||
operateResp, err := client.OperateTask(executor.NewTaskOperateInfo(task.ID(), evt.Operate, evt.Command))
|
||||
operateResp, err := client.OperateTask(executor.NewTaskOperateInfo(task.ID(), evt.Operate, evt.Runtime))
|
||||
if err != nil {
|
||||
return fmt.Errorf("operate task: %w", err)
|
||||
}
|
||||
|
@ -265,10 +264,6 @@ func (s *NormalJobExecuting) submitInstanceTask(rtx jobmgr.JobStateRunContext, j
|
|||
}
|
||||
}
|
||||
|
||||
//func getRcloneMountCommand(storage schmod.ObjectStorage) interface{} {
|
||||
// command := "yum install -y fuse3 \\\\n cd /opt && downloadCode='import requests;response=requests.get(\\\"@url@\\\",stream=True);response.raise_for_status();boundary=response.headers.get(\\\"Content-Type\\\").split(\\\"boundary=\\\")[-1].encode();content=response.content;body=[part.split(b\\\"\\\\r\\\\n\\\\r\\\\n\\\",1)[1].rsplit(b\\\"\\\\r\\\\n--\\\",1)[0] for part in content.split(b\\\"--\\\"+boundary+b\\\"\\\\r\\\\n\\\") if b\\\"filename=\\\" in part][0];open(\\\"@filename@\\\",\\\"wb\\\").write(body);print(\\\"success\\\")' && rclone=\\\"http://121.36.5.116:7890/object/download?userID=1&objectID=9471093\\\" && rclone_conf=\\\"http://121.36.5.116:7890/object/download?userID=1&objectID=9472529\\\" && python3 -c \\\"$(echo \\\"$downloadCode\\\" | sed -e \\\"s|@url@|$(printf '%s' \\\"$rclone\\\" | sed 's/[&/\\\\]/\\\\\\\\&/g')|\\\" -e \\\"s|@filename@|rclone|\\\")\\\" && chmod +x rclone \\\\n echo -e '[oss] \\n type = s3 \\n provider = Alibaba \\n access_key_id = LTAI5tSgnMDd4fxZ1X19Yche \\n secret_access_key = tfxjJ2IJ0J4ihFFD3VUEe1SA9Ws36y \\n endpoint = oss-cn-hangzhou-internal.aliyuncs.com \\n storage_class = STANDARD' > /opt/rclone.conf \\\\n mkdir -p /mnt/oss && cd /opt && nohup ./rclone mount oss:pcm-hz /mnt/oss --vfs-cache-mode full --vfs-read-wait 0 --vfs-read-chunk-size 128M --cache-db-purge -vv > rcloneOSS.log 2>&1 & \\\\n sudo sh /mnt/oss/script/setup_model_env.sh > /opt/set_model.log \\\\n sudo sh /mnt/oss/script/copy_model.sh > /opt/copy_model.log"
|
||||
//}
|
||||
|
||||
// 判断算力中心是否支持环境变量配置,如果不支持,则读取脚本内容并拼接在Command参数后面
|
||||
func getRuntimeCommand(runtime schsdk.JobRuntimeInfo, packageID cdssdk.PackageID, outputPath string, remoteBase string, ccInfo schmod.ComputingCenter) (string, []schsdk.KVPair) {
|
||||
var envs []schsdk.KVPair
|
||||
|
|
|
@ -58,6 +58,7 @@ func (s *MultiInstanceRunning) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job)
|
|||
// 微调任务特殊处理
|
||||
if info.Info.UpdateType == schsdk.FineTuning {
|
||||
multInstJob.Info.ModelJobInfo.Command = info.Info.Runtime.Command
|
||||
multInstJob.Info.Runtime.Envs = append(multInstJob.Info.Runtime.Envs, info.Info.Runtime.Envs...)
|
||||
subJobs = multInstJob.SubJobs
|
||||
}
|
||||
updateInstance(rtx, info, subJobs, instanceFuture)
|
||||
|
@ -78,7 +79,7 @@ func updateInstance(rtx jobmgr.JobStateRunContext, updateInfo *event.InstanceUpd
|
|||
go func() {
|
||||
defer wg.Done()
|
||||
fut := future.NewSetValue[event.UpdateResult]()
|
||||
rtx.Mgr.PostEvent(instanceID, event.NewUpdate(updateInfo.Info.Runtime.Command, updateInfo.Info.Operate, fut))
|
||||
rtx.Mgr.PostEvent(instanceID, event.NewUpdate(updateInfo.Info.Runtime, updateInfo.Info.Operate, fut))
|
||||
_, err := fut.Wait(context.TODO())
|
||||
|
||||
if err != nil {
|
||||
|
|
|
@ -12,7 +12,6 @@ import (
|
|||
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr"
|
||||
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event"
|
||||
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type MultiInstanceUpdate struct {
|
||||
|
@ -93,7 +92,8 @@ func (s *MultiInstanceUpdate) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job)
|
|||
}
|
||||
|
||||
// 发送事件,更新各个instance
|
||||
updateJob.Info.Runtime.Command = strings.Replace(updateJob.Info.Runtime.Command, "$1", fullPath, -1)
|
||||
//updateJob.Info.Runtime.Command = strings.Replace(updateJob.Info.Runtime.Command, "$1", fullPath, -1)
|
||||
updateJob.Info.Runtime.Envs = append(updateJob.Info.Runtime.Envs, schsdk.KVPair{Key: schsdk.FinetuningOutEnv, Value: fullPath})
|
||||
updateInfo := event.InstanceUpdateInfo{
|
||||
Info: updateJob.Info,
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue