将executor改造成restful

This commit is contained in:
JeshuaRen 2024-07-17 14:19:50 +08:00
parent 5209146cf1
commit 5f1e789640
27 changed files with 655 additions and 59 deletions

View File

@ -1,4 +1,7 @@
{
"listenAddr": {
"address": ":7893"
},
"logger": {
"output": "stdout",
"outputFileName": "executor",
@ -17,5 +20,16 @@
"pcm": {
"url": "http://localhost:7070"
},
"reportIntervalSec": 10
"reportIntervalSec": 10,
"createECS": {
"cloud": "ali",
"config": {
"region": "cn-hangzhou",
"accessKeyId": "LTAI4FhxwzJ2zjq2YQJq7X5V",
"accessKeySecret": "2YQJq7X5V",
"instanceType": "ecs.g5.large",
"imageId": "ubuntu_18_04_64_20G_alibase_20190624.vhd",
"securityGroupId": "sg-bp1f2g2k3jyj5jq2hq5a"
}
}
}

View File

@ -31,6 +31,10 @@ type ComputingCenter struct {
Name string `json:"name" db:"Name"`
// 任务启动方式
Bootstrap schsdk.Bootstrap `json:"bootstrap" db:"Bootstrap"`
// 执行器ID
ExecutorID string `json:"executorID" db:"executorID"`
// 执行器URL
ExecutorURL string `json:"executorURL" db:"executorURL"`
}
type Image struct {

View File

@ -16,7 +16,7 @@ func (db *DB) ComputingCenter() *ComputingCenterDB {
// GetByID reads the ComputingCenter row whose CCID equals id into a
// TempComputingCenter and returns it converted via ToComputingCenter,
// together with the error reported by sqlx.Get.
func (*ComputingCenterDB) GetByID(ctx SQLContext, id schsdk.CCID) (schmod.ComputingCenter, error) {
var ret TempComputingCenter
err := sqlx.Get(ctx, &ret, "select * from ComputingCenter where CCID = ?", id)
// This variant left-joins ExecutorInfo on executorID so that executorURL is
// selected together with the computing-center columns.
err := sqlx.Get(ctx, &ret, "select cc.*, ei.executorURL from (select * from ComputingCenter where CCID = ?) as cc left join (select * from ExecutorInfo) as ei on cc.executorID = ei.executorID", id)
return ret.ToComputingCenter(), err
}

View File

@ -0,0 +1,5 @@
package executor
// Config holds the settings used to build an executor HttpClient.
type Config struct {
// URL is copied by NewHttpClient into the client's base URL.
URL string `json:"url"`
}

View File

@ -0,0 +1,53 @@
package executor
import (
"gitlink.org.cn/cloudream/common/sdks"
)
// response is the JSON envelope decoded from executor HTTP replies: a result
// code, a message, and a payload of type T.
type response[T any] struct {
Code string `json:"code"`
Message string `json:"message"`
Data T `json:"data"`
}
// ToError builds a *sdks.CodeMessageError carrying this response's Code and
// Message.
func (r *response[T]) ToError() *sdks.CodeMessageError {
	cmErr := new(sdks.CodeMessageError)
	cmErr.Code, cmErr.Message = r.Code, r.Message
	return cmErr
}
// HttpClient talks to a single executor over HTTP.
type HttpClient struct {
// baseURL is the prefix to which endpoint paths are appended.
baseURL string
}
// NewHttpClient returns an HttpClient whose base URL is cfg.URL.
// cfg must not be nil.
func NewHttpClient(cfg *Config) *HttpClient {
	cli := new(HttpClient)
	cli.baseURL = cfg.URL
	return cli
}
// HttpPool hands out HttpClient instances addressed by executor URL.
type HttpPool interface {
// AcquireByUrl returns a client whose requests target url.
AcquireByUrl(url string) (*HttpClient, error)
//Release(cli *Client)
}
// httppool is the HttpPool implementation returned by NewHttpPool.
type httppool struct {
// cfg is the configuration supplied to NewHttpPool.
cfg *Config
}
// NewHttpPool wraps cfg in an HttpPool. The pool keeps the pointer it is
// given rather than a copy.
func NewHttpPool(cfg *Config) HttpPool {
	pool := new(httppool)
	pool.cfg = cfg
	return pool
}
// AcquireByUrl returns a new HttpClient whose base URL is url.
//
// The pool's Config is copied before the URL is applied. Writing the URL into
// the shared Config would race between concurrent callers and would silently
// overwrite the configuration object owned by whoever created the pool.
// A pool created with a nil Config is tolerated and yields a client that only
// carries url.
//
// The returned error is always nil; it is kept for interface compatibility.
func (p *httppool) AcquireByUrl(url string) (*HttpClient, error) {
	var cfg Config
	if p.cfg != nil {
		cfg = *p.cfg
	}
	cfg.URL = url
	return NewHttpClient(&cfg), nil
}
//func (p *pool) Release(cli *Client) {
//
//}

View File

@ -1,7 +1,14 @@
package executor
import (
"fmt"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/mq"
myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/serder"
"net/http"
"net/url"
"strings"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
@ -38,3 +45,49 @@ func NewStartTaskResp(execID schmod.ExecutorID, taskID string) *StartTaskResp {
// StartTask sends msg through mq.Request on Service.StartTask using the
// client's RabbitMQ connection and returns the typed response or the request
// error. opts are forwarded unchanged to mq.Request.
func (c *Client) StartTask(msg *StartTask, opts ...mq.RequestOption) (*StartTaskResp, error) {
return mq.Request(Service.StartTask, c.rabbitCli, msg, opts...)
}
// SubmitTask posts req as JSON to the executor's /submitTask endpoint and
// decodes the enveloped reply.
//
// It returns the decoded StartTaskResp when the envelope code is
// errorcode.OK, the envelope converted with ToError for any other code, and a
// plain error when the URL cannot be built, the request fails, the reply is
// not JSON, or the body cannot be parsed.
func (c *HttpClient) SubmitTask(req *StartTask) (*StartTaskResp, error) {
	// Let JoinPath do the joining so a trailing slash on baseURL is harmless.
	targetURL, err := url.JoinPath(c.baseURL, "submitTask")
	if err != nil {
		return nil, err
	}

	resp, err := myhttp.PostJSON(targetURL, myhttp.RequestParam{
		Body: req,
	})
	if err != nil {
		return nil, err
	}
	// Release the connection on every return path below; the body was
	// previously never closed.
	defer resp.Body.Close()

	contType := resp.Header.Get("Content-Type")
	if !strings.Contains(contType, myhttp.ContentTypeJSON) {
		return nil, fmt.Errorf("unknow response content type: %s", contType)
	}

	var codeResp response[StartTaskResp]
	if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
		return nil, fmt.Errorf("parsing response: %w", err)
	}

	if codeResp.Code != errorcode.OK {
		return nil, codeResp.ToError()
	}
	return &codeResp.Data, nil
}
// GetReportInfo issues a GET to the executor's /getReportInfo endpoint and
// hands the raw response back to the caller. The body is not read or closed
// here, so the caller is responsible for closing it.
func (c *HttpClient) GetReportInfo() (*http.Response, error) {
	endpoint, joinErr := url.JoinPath(c.baseURL + "/getReportInfo")
	if joinErr != nil {
		return nil, joinErr
	}

	// The zero-value parameter carries no request body.
	var param myhttp.RequestParam
	streamResp, reqErr := myhttp.GetJSON(endpoint, param)
	if reqErr != nil {
		return nil, reqErr
	}
	return streamResp, nil
}

View File

@ -0,0 +1,21 @@
package task
// ScheduleCreateECS is the task-info message for the create-ECS task.
// It carries no fields beyond the embedded TaskInfoBase.
type ScheduleCreateECS struct {
TaskInfoBase
}
// ScheduleCreateECSStatus is the status type registered as the counterpart of
// ScheduleCreateECS. It carries no fields beyond the embedded TaskInfoBase.
type ScheduleCreateECSStatus struct {
TaskInfoBase
}
// NewScheduleCreateECS returns a pointer to a zero-valued ScheduleCreateECS.
func NewScheduleCreateECS() *ScheduleCreateECS {
	return new(ScheduleCreateECS)
}
// NewScheduleCreateECSStatus returns a pointer to a zero-valued
// ScheduleCreateECSStatus.
func NewScheduleCreateECSStatus() *ScheduleCreateECSStatus {
	return new(ScheduleCreateECSStatus)
}
// init registers the ScheduleCreateECS info type together with its
// ScheduleCreateECSStatus status type via Register.
func init() {
Register[*ScheduleCreateECS, *ScheduleCreateECSStatus]()
}

View File

@ -32,3 +32,48 @@ func GenerateRandomID() string {
hashedID := hex.EncodeToString(hashBytes)
return hashedID
}
// ParseMapToStrings flattens a (possibly nested) configuration map into
// "key=value" strings suitable for joining into a query string.
//
// Nested maps contribute keys of the form "parent.child", and slice elements
// contribute keys of the form "parent[i]". When prefix is non-empty it is
// prepended to every key, separated by a dot. Supported scalar types are
// string, int, float64 and bool; any other value is reported on stdout and
// skipped. Slices nested directly inside slices are not supported.
//
// Keys are visited in sorted order so that the result is deterministic; plain
// map iteration would yield a different ordering on every call. Values are
// not URL-escaped. An empty or nil map yields a nil slice.
func ParseMapToStrings(config map[string]interface{}, prefix string) []string {
	var queryStrings []string

	for _, key := range sortedConfigKeys(config) {
		fullKey := key
		if prefix != "" {
			fullKey = prefix + "." + key
		}

		switch v := config[key].(type) {
		case map[string]interface{}:
			// Recurse into nested maps.
			queryStrings = append(queryStrings, ParseMapToStrings(v, fullKey)...)

		case []interface{}:
			for i, item := range v {
				// Array keys are suffixed with the element index.
				itemKey := fmt.Sprintf("%s[%d]", fullKey, i)

				if nested, isMap := item.(map[string]interface{}); isMap {
					queryStrings = append(queryStrings, ParseMapToStrings(nested, itemKey)...)
					continue
				}
				if pair, ok := formatConfigScalar(itemKey, item); ok {
					queryStrings = append(queryStrings, pair)
					continue
				}
				fmt.Println("Unsupported config array item type:", item)
			}

		default:
			if pair, ok := formatConfigScalar(fullKey, v); ok {
				queryStrings = append(queryStrings, pair)
				continue
			}
			fmt.Println("Unsupported config value type:", v)
		}
	}

	return queryStrings
}

// formatConfigScalar renders a supported scalar value as "key=value".
// The boolean result is false when value is not a string, int, float64 or bool.
func formatConfigScalar(key string, value interface{}) (string, bool) {
	switch v := value.(type) {
	case string:
		return fmt.Sprintf("%s=%s", key, v), true
	case int, float64:
		return fmt.Sprintf("%s=%v", key, v), true
	case bool:
		return fmt.Sprintf("%s=%t", key, v), true
	}
	return "", false
}

// sortedConfigKeys returns the keys of config in ascending order.
// Config maps are small, so a simple insertion sort is sufficient and keeps
// this helper free of additional package dependencies.
func sortedConfigKeys(config map[string]interface{}) []string {
	keys := make([]string, 0, len(config))
	for key := range config {
		keys = append(keys, key)
	}
	for i := 1; i < len(keys); i++ {
		for j := i; j > 0 && keys[j] < keys[j-1]; j-- {
			keys[j], keys[j-1] = keys[j-1], keys[j]
		}
	}
	return keys
}

View File

@ -0,0 +1,38 @@
package config
import (
"fmt"
"gitlink.org.cn/cloudream/scheduler/executor/internal/task/create_ecs"
)
// InitCloud reads the "createECS" section of configMap and passes its
// "config" sub-map to the initializer of the cloud named by its "cloud"
// field. Only "ali" is handled, via create_ecs.AliConfig.
//
// Problems (missing or mistyped sections, unsupported cloud) are reported on
// stdout and the function returns without doing anything else.
func InitCloud(configMap map[string]interface{}) {
	section, sectionOK := configMap["createECS"].(map[string]interface{})
	if !sectionOK {
		fmt.Println("Invalid JSON structure: createECS section is missing or malformed")
		return
	}

	provider, providerOK := section["cloud"].(string)
	if !providerOK {
		fmt.Println("Invalid JSON structure: cloud field is missing or not a string")
		return
	}

	settings, settingsOK := section["config"].(map[string]interface{})
	if !settingsOK {
		fmt.Println("Invalid JSON structure: config section is missing or malformed")
		return
	}

	// Dispatch on the provider name; anything but "ali" is rejected.
	if provider != "ali" {
		fmt.Println("Unsupported cloud type:", provider)
		return
	}
	create_ecs.AliConfig(settings)
}

View File

@ -8,12 +8,18 @@ import (
mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
)
// ListenAddr is the "listenAddr" section of the executor configuration.
type ListenAddr struct {
// Address is the listen address string, decoded from the "address" key.
Address string `json:"address"`
}
type Config struct {
Logger log.Config `json:"logger"`
ReportIntervalSec int `json:"reportIntervalSec"`
RabbitMQ mymq.Config `json:"rabbitMQ"`
CloudreamStorage cdssdk.Config `json:"cloudreamStorage"`
PCM pcmsdk.Config `json:"pcm"`
Logger log.Config `json:"logger"`
ReportIntervalSec int `json:"reportIntervalSec"`
RabbitMQ mymq.Config `json:"rabbitMQ"`
CloudreamStorage cdssdk.Config `json:"cloudreamStorage"`
PCM pcmsdk.Config `json:"pcm"`
Addr ListenAddr `json:"listenAddr"`
CloudECS map[string]interface{} `json:"createECS"`
}
var cfg Config

View File

@ -3,6 +3,7 @@ package globals
import (
"github.com/google/uuid"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
)
var ExecutorID schmod.ExecutorID
@ -10,3 +11,6 @@ var ExecutorID schmod.ExecutorID
// Init assigns ExecutorID a freshly generated UUID string.
func Init() {
ExecutorID = schmod.ExecutorID(uuid.NewString())
}
// EventChannel is a package-level, unbuffered channel carrying
// manager.ReportExecutorTaskStatus values. Being unbuffered, a send blocks
// until a receiver is ready.
var EventChannel = make(chan manager.ReportExecutorTaskStatus)

View File

@ -0,0 +1,24 @@
package http
import "gitlink.org.cn/cloudream/common/consts/errorcode"
// Response is the JSON envelope returned by the executor's HTTP handlers:
// a result code, a message, and an arbitrary payload.
type Response struct {
Code string `json:"code"`
Message string `json:"message"`
Data any `json:"data"`
}
// OK wraps data in a Response whose Code is errorcode.OK and whose Message is
// empty.
func OK(data any) Response {
	var resp Response
	resp.Code = errorcode.OK
	resp.Data = data
	return resp
}
// Failed builds a Response carrying the given code and message and no data.
func Failed(code string, msg string) Response {
	failure := Response{Message: msg}
	failure.Code = code
	return failure
}

View File

@ -0,0 +1,45 @@
package http
import (
"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/scheduler/executor/internal/reporter"
"gitlink.org.cn/cloudream/scheduler/executor/internal/services"
)
// Server is the executor's HTTP front end, built on a gin engine.
type Server struct {
// engine is the gin router that serves requests.
engine *gin.Engine
// listenAddr is the address passed to engine.Run.
listenAddr string
// svc is the service layer the handlers delegate to.
svc *services.Service
// reporter is not assigned by NewServer. NOTE(review): appears unused here — confirm.
reporter *reporter.Reporter
}
func NewServer(listenAddr string, svc *services.Service) (*Server, error) {
engine := gin.New()
return &Server{
engine: engine,
listenAddr: listenAddr,
svc: svc,
}, nil
}
// Serve registers the routes and runs the gin engine on the configured
// address. It blocks for as long as the engine runs and returns the error
// reported by engine.Run, or nil if it stops without one.
func (s *Server) Serve() error {
	s.initRouters()
	logger.Infof("start serving http at: %s", s.listenAddr)

	if runErr := s.engine.Run(s.listenAddr); runErr != nil {
		logger.Infof("http stopped with error: %s", runErr.Error())
		return runErr
	}

	logger.Infof("http stopped")
	return nil
}
// initRouters binds the task endpoints to their handlers:
// POST /submitTask and GET /getReportInfo.
func (s *Server) initRouters() {
	taskSvc := s.TaskSvc()
	s.engine.POST("/submitTask", taskSvc.SubmitTask)
	s.engine.GET("/getReportInfo", taskSvc.GetReportInfo)
}

View File

@ -0,0 +1,64 @@
package http
import (
"encoding/json"
"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/reflect2"
execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
myglbs "gitlink.org.cn/cloudream/scheduler/executor/internal/globals"
"net/http"
)
// TaskService groups the task-related HTTP handlers. It embeds *Server and
// therefore shares the server's fields.
type TaskService struct {
*Server
}
// TaskSvc returns a new TaskService wrapping this server.
func (s *Server) TaskSvc() *TaskService {
return &TaskService{Server: s}
}
// SubmitTask handles POST /submitTask. It binds the JSON body to an
// execmq.StartTask, starts the task through the service layer, and replies
// with a StartTaskResp carrying this executor's ID and the new task's ID.
//
// Responses:
//   - 400 with errorcode.BadArgument when the body cannot be bound or is a
//     JSON null;
//   - 500 when the service fails to start the task;
//   - 200 with the StartTaskResp otherwise.
func (s *TaskService) SubmitTask(ctx *gin.Context) {
	log := logger.WithField("HTTP", "Task.SubmitTask")

	var req *execmq.StartTask
	if err := ctx.ShouldBindJSON(&req); err != nil {
		log.Warnf("binding body: %s", err.Error())
		ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
		return
	}
	// A body consisting of JSON null binds without error but leaves req nil;
	// reject it here instead of dereferencing it below.
	if req == nil {
		log.Warnf("binding body: empty request")
		ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
		return
	}

	tsk, err := s.svc.SubmitTask(req)
	if err != nil {
		logger.WithField("Info", reflect2.TypeOfValue(req.Info).Name()).
			Warnf("starting task by info: %s", err.Error())
		ctx.JSON(http.StatusInternalServerError, Failed("500", "internal error"))
		return
	}

	ctx.JSON(http.StatusOK, OK(execmq.NewStartTaskResp(myglbs.ExecutorID, tsk.ID())))
}
// GetReportInfo handles GET /getReportInfo. It keeps the connection open and
// streams every report received from myglbs.EventChannel to the client as
// JSON, flushing after each one.
//
// Each report is written on its own line (terminated by '\n') so that a
// line-oriented reader on the other side can tell consecutive reports apart;
// without a delimiter the JSON documents would run together.
//
// The handler returns when the client disconnects, when EventChannel is
// closed, or when marshaling or writing fails.
func (s *TaskService) GetReportInfo(ctx *gin.Context) {
	ctx.Header("Content-Type", "text/event-stream")
	ctx.Header("Cache-Control", "no-cache")
	ctx.Header("Connection", "keep-alive")

	// Closed when the client goes away, so the handler does not linger and
	// consume reports nobody will read.
	clientGone := ctx.Request.Context().Done()

	for {
		select {
		case <-clientGone:
			return

		case report, open := <-myglbs.EventChannel:
			if !open {
				return
			}

			data, err := json.Marshal(report)
			if err != nil {
				logger.Warnf("marshaling report: %s", err.Error())
				return
			}

			if _, err := ctx.Writer.Write(append(data, '\n')); err != nil {
				return
			}
			// Push the report to the client immediately.
			ctx.Writer.Flush()
		}
	}
}

View File

@ -1,12 +1,10 @@
package reporter
import (
"fmt"
"gitlink.org.cn/cloudream/scheduler/executor/internal/globals"
"sync"
"time"
"gitlink.org.cn/cloudream/common/pkgs/logger"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
@ -44,11 +42,11 @@ func (r *Reporter) ReportNow() {
}
func (r *Reporter) Serve() error {
magCli, err := schglb.ManagerMQPool.Acquire()
if err != nil {
return fmt.Errorf("new manager client: %w", err)
}
defer schglb.ManagerMQPool.Release(magCli)
//magCli, err := schglb.ManagerMQPool.Acquire()
//if err != nil {
// return fmt.Errorf("new manager client: %w", err)
//}
//defer schglb.ManagerMQPool.Release(magCli)
ticker := time.NewTicker(r.reportInterval)
defer ticker.Stop()
@ -68,18 +66,23 @@ func (r *Reporter) Serve() error {
r.taskStatus = make(map[string]exectsk.TaskStatus)
r.taskStatusLock.Unlock()
_, err := magCli.ReportExecutorTaskStatus(mgrmq.NewReportExecutorTaskStatus(r.executorID, taskStatus))
if err != nil {
logger.Warnf("reporting to manager: %s", err.Error())
status := mgrmq.NewReportExecutorTaskStatus(r.executorID, taskStatus)
// 将数据发送到管道中
globals.EventChannel <- *status
//若上报失败,数据应保留
r.taskStatusLock.Lock()
for _, ts := range taskStatus {
if _, exists := r.taskStatus[ts.TaskID]; !exists {
r.taskStatus[ts.TaskID] = ts.Status
}
}
r.taskStatusLock.Unlock()
}
//_, err := magCli.ReportExecutorTaskStatus(mgrmq.NewReportExecutorTaskStatus(r.executorID, taskStatus))
//if err != nil {
// logger.Warnf("reporting to manager: %s", err.Error())
//
// //若上报失败,数据应保留
// r.taskStatusLock.Lock()
// for _, ts := range taskStatus {
// if _, exists := r.taskStatus[ts.TaskID]; !exists {
// r.taskStatus[ts.TaskID] = ts.Status
// }
// }
// r.taskStatusLock.Unlock()
//}
}
}

View File

@ -8,8 +8,6 @@ type Service struct {
taskManager *task.Manager
}
func NewService(taskMgr *task.Manager) *Service {
return &Service{
taskManager: taskMgr,
}
func NewService() *Service {
return &Service{}
}

View File

@ -7,6 +7,7 @@ import (
"gitlink.org.cn/cloudream/common/utils/reflect2"
execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
myglbs "gitlink.org.cn/cloudream/scheduler/executor/internal/globals"
"gitlink.org.cn/cloudream/scheduler/executor/internal/task"
)
func (svc *Service) StartTask(msg *execmq.StartTask) (*execmq.StartTaskResp, *mq.CodeMessage) {
@ -19,3 +20,14 @@ func (svc *Service) StartTask(msg *execmq.StartTask) (*execmq.StartTaskResp, *mq
return mq.ReplyOK(execmq.NewStartTaskResp(myglbs.ExecutorID, tsk.ID()))
}
// SubmitTask starts a task from msg.Info through the task manager and returns
// the started task. On failure the error is logged together with the type
// name of msg.Info and returned to the caller.
//
// NOTE(review): svc.taskManager is dereferenced unconditionally — confirm the
// constructor in use always sets it.
func (svc *Service) SubmitTask(msg *execmq.StartTask) (*task.Task, error) {
	started, startErr := svc.taskManager.StartByInfo(msg.Info)
	if startErr == nil {
		return started, nil
	}

	infoType := reflect2.TypeOfValue(msg.Info).Name()
	logger.WithField("Info", infoType).
		Warnf("starting task by info: %s", startErr.Error())
	return nil, startErr
}

View File

@ -0,0 +1,29 @@
package create_ecs
import (
"fmt"
"gitlink.org.cn/cloudream/scheduler/common/utils"
"strings"
)
// AliCloud implements the CloudProvider interface.
type AliCloud struct{}
var targetURL string
// AliConfig flattens configMap into "key=value" pairs with
// utils.ParseMapToStrings, joins them with '&', and stores the resulting
// RunInstances request URL in the package-level targetURL.
//
// The URL is intentionally not printed: the configuration passed in can carry
// credentials (for example accessKeyId / accessKeySecret), and echoing the
// full query string to stdout would leak them into logs.
func AliConfig(configMap map[string]interface{}) {
	queryString := strings.Join(utils.ParseMapToStrings(configMap, ""), "&")
	targetURL = "http://ecs.aliyuncs.com/?Action=RunInstances&" + queryString
}
// CreateServer currently only announces the requested server name on stdout
// and always returns nil.
func (a *AliCloud) CreateServer(name string) error {
	fmt.Println("Creating server on AliCloud: " + name)
	return nil
}

View File

@ -0,0 +1,41 @@
package create_ecs
// CloudProvider is the interface that defines how a server is created.
type CloudProvider interface {
// CreateServer creates a server identified by name.
CreateServer(name string) error
}
// CloudFactory is the factory interface.
// The design principles behind using CreateProvider in the factory pattern:
// Single responsibility: a factory only creates CloudProvider instances, while
// the CloudProvider performs the actual server creation.
// Open/closed: factories can be added to support new CloudProvider
// implementations without modifying existing code.
// Dependency inversion: client code depends on the CloudProvider interface
// rather than on concrete implementations, which reduces coupling.
type CloudFactory interface {
CreateProvider() CloudProvider
}
// HuaweiCloudFactory implements the CloudFactory interface for HuaweiCloud.
type HuaweiCloudFactory struct{}

// CreateProvider returns a new HuaweiCloud provider.
func (f *HuaweiCloudFactory) CreateProvider() CloudProvider {
	return new(HuaweiCloud)
}
// AliCloudFactory implements the CloudFactory interface for AliCloud.
type AliCloudFactory struct{}

// CreateProvider returns a new AliCloud provider.
func (f *AliCloudFactory) CreateProvider() CloudProvider {
	return new(AliCloud)
}
// GetFactory returns the factory matching the given cloud platform type.
// Recognized values are "HuaweiCloud" and "AliCloud"; any other value yields
// a nil CloudFactory, which callers must check before use.
func GetFactory(providerType string) CloudFactory {
	if providerType == "HuaweiCloud" {
		return new(HuaweiCloudFactory)
	}
	if providerType == "AliCloud" {
		return new(AliCloudFactory)
	}
	return nil
}

View File

@ -0,0 +1,11 @@
package create_ecs
import "fmt"
// HuaweiCloud implements the CloudProvider interface.
type HuaweiCloud struct{}

// CreateServer currently only announces the requested server name on stdout
// and always returns nil.
func (a *HuaweiCloud) CreateServer(name string) error {
	fmt.Println("Creating server on HuaweiCloud: " + name)
	return nil
}

View File

@ -0,0 +1,46 @@
package task
import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
"gitlink.org.cn/cloudream/scheduler/executor/internal/task/create_ecs"
)
// ScheduleCreateECS is the executor-side task wrapping the
// exectsk.ScheduleCreateECS info message it was created from.
type ScheduleCreateECS struct {
*exectsk.ScheduleCreateECS
}
// NewScheduleCreateECS builds the executor-side task for info.
func NewScheduleCreateECS(info *exectsk.ScheduleCreateECS) *ScheduleCreateECS {
	tsk := new(ScheduleCreateECS)
	tsk.ScheduleCreateECS = info
	return tsk
}
// Execute runs the create-ECS task: it calls do and logs either the error or
// a success line, bracketed by "begin"/"end" debug messages.
//
// NOTE(review): complete is never invoked in this body — confirm whether the
// task framework expects it to be called when the task finishes.
func (t *ScheduleCreateECS) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
	taskLog := logger.WithType[ScheduleCreateECS]("Task")
	taskLog.Debugf("begin")
	defer taskLog.Debugf("end")

	if runErr := t.do(task.ID(), ctx); runErr != nil {
		taskLog.Error(runErr)
		return
	}

	taskLog.Info("ScheduleCreateECS...")
}
// do obtains the "AliCloud" factory, asks it for a provider, and creates a
// server named "MyServer", returning whatever error CreateServer reports.
// taskID and ctx are currently unused.
func (t *ScheduleCreateECS) do(taskID string, ctx TaskContext) error {
	return create_ecs.GetFactory("AliCloud").
		CreateProvider().
		CreateServer("MyServer")
}
// init registers NewScheduleCreateECS as the constructor for this task type
// via Register.
func init() {
Register(NewScheduleCreateECS)
}

View File

@ -2,17 +2,15 @@ package main
import (
"fmt"
"os"
"time"
"gitlink.org.cn/cloudream/common/pkgs/logger"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
"gitlink.org.cn/cloudream/scheduler/executor/internal/config"
myglbs "gitlink.org.cn/cloudream/scheduler/executor/internal/globals"
"gitlink.org.cn/cloudream/scheduler/executor/internal/http"
"gitlink.org.cn/cloudream/scheduler/executor/internal/reporter"
"gitlink.org.cn/cloudream/scheduler/executor/internal/services"
"gitlink.org.cn/cloudream/scheduler/executor/internal/task"
"os"
)
func main() {
@ -28,29 +26,42 @@ func main() {
os.Exit(1)
}
schglb.InitMQPool(&config.Cfg().RabbitMQ)
//schglb.InitMQPool(&config.Cfg().RabbitMQ)
schglb.InitCloudreamStoragePool(&config.Cfg().CloudreamStorage)
schglb.InitPCMPool(&config.Cfg().PCM)
myglbs.Init()
rpter := reporter.NewReporter(myglbs.ExecutorID, time.Second*time.Duration(config.Cfg().ReportIntervalSec))
taskMgr := task.NewManager(&rpter)
mqSvr, err := execmq.NewServer(services.NewService(&taskMgr), &config.Cfg().RabbitMQ)
if err != nil {
logger.Fatalf("new executor server failed, err: %s", err.Error())
}
mqSvr.OnError(func(err error) {
logger.Warnf("executor server err: %s", err.Error())
})
//rpter := reporter.NewReporter(myglbs.ExecutorID, time.Second*time.Duration(config.Cfg().ReportIntervalSec))
//
//taskMgr := task.NewManager(&rpter)
//
//mqSvr, err := execmq.NewServer(services.NewService(&taskMgr), &config.Cfg().RabbitMQ)
//if err != nil {
// logger.Fatalf("new executor server failed, err: %s", err.Error())
//}
//
//mqSvr.OnError(func(err error) {
// logger.Warnf("executor server err: %s", err.Error())
//})
// 启动服务
go serveMQServer(mqSvr)
//go serveMQServer(mqSvr)
go serveReporter(&rpter)
//go serveReporter(&rpter)
svc := services.NewService()
server, err := http.NewServer(config.Cfg().Addr.Address, svc)
if err != nil {
logger.Warnf("new services failed, err: %s", err.Error())
os.Exit(1)
}
err = server.Serve()
if err != nil {
logger.Warnf("new services failed, err: %s", err.Error())
os.Exit(1)
}
forever := make(chan bool)
<-forever

View File

@ -1,7 +1,10 @@
package executormgr
import (
"bufio"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
@ -46,6 +49,28 @@ func NewManager(reportTimeout time.Duration) (*Manager, error) {
}, nil
}
// ReceiveExecutorTaskStatus consumes a streaming executor response, treating
// every non-empty line of the body as one JSON-encoded
// mgrmq.ReportExecutorTaskStatus and forwarding it to m.Report.
//
// The call blocks until the stream ends or a read error occurs, and it closes
// resp.Body before returning. Lines that fail to decode are reported on
// stdout and skipped. A nil response (or nil body) is ignored.
//
// Because m.Report acquires m.lock, this method must not be called while the
// caller already holds m.lock (assuming the lock is not reentrant — TODO
// confirm); given that it blocks, it is best run on its own goroutine.
func (m *Manager) ReceiveExecutorTaskStatus(resp *http.Response) {
	if resp == nil || resp.Body == nil {
		return
	}
	defer resp.Body.Close()

	// bufio.Scanner rejects lines longer than 64 KiB by default, which would
	// end the whole stream on one large report; allow considerably more.
	const maxReportLineBytes = 16 * 1024 * 1024

	scanner := bufio.NewScanner(resp.Body)
	scanner.Buffer(make([]byte, 0, 64*1024), maxReportLineBytes)

	for scanner.Scan() {
		line := scanner.Bytes()
		if len(line) == 0 {
			continue
		}

		var msg mgrmq.ReportExecutorTaskStatus
		if err := json.Unmarshal(line, &msg); err != nil {
			fmt.Println("Error unmarshalling JSON:", err)
			continue
		}

		fmt.Printf("Received: %+v\n", msg)
		m.Report(msg.ExecutorID, msg.TaskStatus)
	}

	if err := scanner.Err(); err != nil {
		fmt.Println("Error reading from response body:", err)
	}
}
func (m *Manager) Report(execID schmod.ExecutorID, taskStatus []mgrmq.ExecutorTaskStatus) {
m.lock.Lock()
defer m.lock.Unlock()
@ -78,13 +103,33 @@ func (m *Manager) Report(execID schmod.ExecutorID, taskStatus []mgrmq.ExecutorTa
}
}
var ExecutorPool exemq.HttpPool
// 启动一个Task
func (m *Manager) StartTask(info exetsk.TaskInfo) *sync2.Channel[exetsk.TaskStatus] {
func (m *Manager) StartTask(info exetsk.TaskInfo, ccInfo schmod.ComputingCenter) *sync2.Channel[exetsk.TaskStatus] {
m.lock.Lock()
defer m.lock.Unlock()
ch := sync2.NewChannel[exetsk.TaskStatus]()
resp, err := m.exeCli.StartTask(exemq.NewStartTask(info))
client, err := ExecutorPool.AcquireByUrl(ccInfo.ExecutorURL)
//resp, err := m.exeCli.StartTask(exemq.NewStartTask(info))
if err != nil {
ch.CloseWithError(fmt.Errorf("start task: %w", err))
return ch
}
// 检测是否连接过这个Executor如果第一次连则发送请求监听上报信息
_, ok := m.executors[schmod.ExecutorID(ccInfo.ExecutorID)]
if !ok {
reportResp, err := client.GetReportInfo()
if err != nil {
ch.CloseWithError(fmt.Errorf("start task: %w", err))
return ch
}
m.ReceiveExecutorTaskStatus(reportResp)
}
resp, err := client.SubmitTask(exemq.NewStartTask(info))
if err != nil {
ch.CloseWithError(fmt.Errorf("start task: %w", err))
return ch

View File

@ -0,0 +1,24 @@
package executormgr
import (
"fmt"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
gtwglb "gitlink.org.cn/cloudream/gateway/globals"
)
// Submit sends info to the cloudream scheduler as a job-set submission and
// returns the ID assigned to the job set together with its file upload
// scheme. The scheduler client is acquired from, and released back to,
// gtwglb.CloudreamSchedulerPool.
func (svc *JobSetService) Submit(info schsdk.JobSetInfo) (schsdk.JobSetID, *schsdk.JobSetFilesUploadScheme, error) {
	cli, acquireErr := gtwglb.CloudreamSchedulerPool.Acquire()
	if acquireErr != nil {
		return "", nil, fmt.Errorf("new cloudream scheduler client: %w", acquireErr)
	}
	defer gtwglb.CloudreamSchedulerPool.Release(cli)

	var submitReq schsdk.JobSetSumbitReq
	submitReq.JobSetInfo = info

	submitted, submitErr := cli.JobSetSumbit(submitReq)
	if submitErr != nil {
		return "", nil, fmt.Errorf("submitting job set: %w", submitErr)
	}

	return submitted.JobSetID, &submitted.FilesUploadScheme, nil
}

View File

@ -136,7 +136,7 @@ func (s *NormalJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) e
envs,
// params, TODO params不应该是kv数组而应该是字符串数组
[]schsdk.KVPair{},
))
), ccInfo)
defer wt.Close()
for {
@ -214,7 +214,7 @@ func (s *DataReturnJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Jo
reJob.TargetJobOutputPath,
reJob.Info.BucketID,
packageName,
))
), ccInfo)
defer wt.Close()
status, err := wt.Receive(ctx)

View File

@ -150,7 +150,7 @@ func (s *PreScheduling) doPackageScheduling(ctx context.Context, rtx jobmgr.JobS
if scheme.Action == jobmod.ActionMove {
logger.Debugf("begin move pacakge %v to %v", file.PackageID, s.targetCCInfo.CDSNodeID)
wt := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, file.PackageID, s.targetCCInfo.CDSNodeID))
wt := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, file.PackageID, s.targetCCInfo.CDSNodeID), s.targetCCInfo)
defer wt.Close()
status, err := wt.Receive(ctx)
@ -169,7 +169,7 @@ func (s *PreScheduling) doPackageScheduling(ctx context.Context, rtx jobmgr.JobS
if scheme.Action == jobmod.ActionLoad {
logger.Debugf("begin load pacakge %v to %v", file.PackageID, s.targetCCInfo.CDSStorageID)
wt := rtx.Mgr.ExecMgr.StartTask(exectsk.NewStorageLoadPackage(1, file.PackageID, s.targetCCInfo.CDSStorageID))
wt := rtx.Mgr.ExecMgr.StartTask(exectsk.NewStorageLoadPackage(1, file.PackageID, s.targetCCInfo.CDSStorageID), s.targetCCInfo)
defer wt.Close()
status, err := wt.Receive(ctx)
@ -228,7 +228,7 @@ func (s *PreScheduling) doImageScheduling(ctx context.Context, rtx jobmgr.JobSta
}
// TODO UserID
wt := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, *file.PackageID, s.targetCCInfo.CDSNodeID))
wt := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, *file.PackageID, s.targetCCInfo.CDSNodeID), s.targetCCInfo)
defer wt.Close()
status, err := wt.Receive(ctx)
@ -261,7 +261,7 @@ func (s *PreScheduling) doImageScheduling(ctx context.Context, rtx jobmgr.JobSta
return fmt.Errorf("there must be only 1 object in the package which will be imported")
}
wt2 := rtx.Mgr.ExecMgr.StartTask(exectsk.NewUploadImage(s.targetCCInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash)))
wt2 := rtx.Mgr.ExecMgr.StartTask(exectsk.NewUploadImage(s.targetCCInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash)), s.targetCCInfo)
defer wt2.Close()
status2, err := wt2.Receive(ctx)

View File

@ -6,6 +6,6 @@ import (
)
func (svc *Service) ReportExecutorTaskStatus(msg *mgrmq.ReportExecutorTaskStatus) (*mgrmq.ReportExecutorTaskStatusResp, *mq.CodeMessage) {
svc.exeMgr.Report(msg.ExecutorID, msg.TaskStatus)
//svc.exeMgr.Report(msg.ExecutorID, msg.TaskStatus)
return mq.ReplyOK(mgrmq.NewReportExecutorTaskStatusResp())
}