1、多实例任务LocalJobID改成拼接一个随机值

2、普通任务OutputFullPath赋值
3、其他优化
This commit is contained in:
JeshuaRen 2024-05-15 16:10:06 +08:00
parent 2b1f71be34
commit 1fc424a494
15 changed files with 67 additions and 108 deletions

View File

@ -20,8 +20,8 @@ type CreateInstanceResp struct {
} }
type CreateInstanceReq struct { type CreateInstanceReq struct {
JobID schsdk.JobID `json:"jobID" binding:"required"` JobID schsdk.JobID `json:"jobID" binding:"required"`
LocalPath schsdk.JobFileInfo `json:"localPath" binding:"required"` DataSet schsdk.JobFileInfo `json:"localPath" binding:"required"`
} }
func (s *Server) JobSvc() *JobService { func (s *Server) JobSvc() *JobService {
@ -33,13 +33,6 @@ func (s *Server) JobSvc() *JobService {
func (s *JobService) CreateInstance(ctx *gin.Context) { func (s *JobService) CreateInstance(ctx *gin.Context) {
log := logger.WithField("HTTP", "JobSet.HTTP") log := logger.WithField("HTTP", "JobSet.HTTP")
//var req CreateInstanceReq
//if err := ctx.ShouldBindJSON(&req); err != nil {
// log.Warnf("binding body: %s", err.Error())
// ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
// return
//}
bodyData, err := io.ReadAll(ctx.Request.Body) bodyData, err := io.ReadAll(ctx.Request.Body)
if err != nil { if err != nil {
log.Warnf("reading request body: %s", err.Error()) log.Warnf("reading request body: %s", err.Error())
@ -54,7 +47,7 @@ func (s *JobService) CreateInstance(ctx *gin.Context) {
return return
} }
jobID, filesUploadScheme, err := s.svc.JobSetSvc().CreateInstance(req.JobID, req.LocalPath) jobID, filesUploadScheme, err := s.svc.JobSetSvc().CreateInstance(req.JobID, req.DataSet)
if err != nil { if err != nil {
log.Warnf("create job instance: %s", err.Error()) log.Warnf("create job instance: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "create job instance failed")) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "create job instance failed"))

View File

@ -39,7 +39,7 @@ func (s *Server) Serve() error {
func (s *Server) initRouters() { func (s *Server) initRouters() {
s.engine.POST("/jobSet/submit", s.JobSetSvc().Submit) s.engine.POST("/jobSet/submit", s.JobSetSvc().Submit)
s.engine.POST("/job/CreateInstance", s.JobSvc().CreateInstance) s.engine.POST("/job/createInstance", s.JobSvc().CreateInstance)
s.engine.POST("/jobSet/localFileUploaded", s.JobSetSvc().LocalFileUploaded) s.engine.POST("/jobSet/localFileUploaded", s.JobSetSvc().LocalFileUploaded)
s.engine.GET("/jobSet/getServiceList", s.JobSetSvc().GetServiceList) s.engine.GET("/jobSet/getServiceList", s.JobSetSvc().GetServiceList)
} }

View File

@ -8,7 +8,7 @@ import (
) )
// Create 创建多实例任务中的实例任务 // Create 创建多实例任务中的实例任务
func (svc *JobSetService) CreateInstance(jobID schsdk.JobID, LocalPath schsdk.JobFileInfo) (schsdk.JobID, schsdk.JobFilesUploadScheme, error) { func (svc *JobSetService) CreateInstance(jobID schsdk.JobID, dataSet schsdk.JobFileInfo) (schsdk.JobID, schsdk.JobFilesUploadScheme, error) {
scheme := new(schsdk.JobFilesUploadScheme) scheme := new(schsdk.JobFilesUploadScheme)
@ -18,7 +18,7 @@ func (svc *JobSetService) CreateInstance(jobID schsdk.JobID, LocalPath schsdk.Jo
} }
defer schglb.ManagerMQPool.Release(mgrCli) defer schglb.ManagerMQPool.Release(mgrCli)
resp, err := mgrCli.CreateInstance(mgrmq.NewCreateInstance(jobID, LocalPath)) resp, err := mgrCli.CreateInstance(mgrmq.NewCreateInstance(jobID, dataSet))
if err != nil { if err != nil {
return "", *scheme, fmt.Errorf("submitting job set to manager: %w", err) return "", *scheme, fmt.Errorf("submitting job set to manager: %w", err)
} }

View File

@ -27,7 +27,6 @@ var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[JobStat
(*WaitTargetCompleteDump)(nil), (*WaitTargetCompleteDump)(nil),
))) )))
// 调整中
type AdjustingDump struct { type AdjustingDump struct {
serder.Metadata `union:"Adjusting"` serder.Metadata `union:"Adjusting"`
Type JobStateDumpType `json:"type"` Type JobStateDumpType `json:"type"`
@ -38,7 +37,6 @@ func (dump *AdjustingDump) getType() JobStateDumpType {
return dump.Type return dump.Type
} }
// 任务结束
type CompletedDump struct { type CompletedDump struct {
serder.Metadata `union:"Completed"` serder.Metadata `union:"Completed"`
Type JobStateDumpType `json:"type"` Type JobStateDumpType `json:"type"`
@ -67,7 +65,6 @@ func (dump *MultiInstCreateRunningDump) getType() JobStateDumpType {
return dump.Type return dump.Type
} }
// 普通任务执行中
type NormalJobExecutingDump struct { type NormalJobExecutingDump struct {
serder.Metadata `union:"NormalJobExecuting"` serder.Metadata `union:"NormalJobExecuting"`
Type JobStateDumpType `json:"type"` Type JobStateDumpType `json:"type"`
@ -78,7 +75,6 @@ func (dump *NormalJobExecutingDump) getType() JobStateDumpType {
return dump.Type return dump.Type
} }
// 回源任务执行中
type DataReturnExecutingDump struct { type DataReturnExecutingDump struct {
serder.Metadata `union:"DataReturnExecuting"` serder.Metadata `union:"DataReturnExecuting"`
Type JobStateDumpType `json:"type"` Type JobStateDumpType `json:"type"`
@ -88,7 +84,6 @@ func (dump *DataReturnExecutingDump) getType() JobStateDumpType {
return dump.Type return dump.Type
} }
// 制作调整方案中
type MakeingAdjustSchemeDump struct { type MakeingAdjustSchemeDump struct {
serder.Metadata `union:"MakeingAdjustScheme"` serder.Metadata `union:"MakeingAdjustScheme"`
Type JobStateDumpType `json:"type"` Type JobStateDumpType `json:"type"`
@ -98,7 +93,6 @@ func (dump *MakeingAdjustSchemeDump) getType() JobStateDumpType {
return dump.Type return dump.Type
} }
// 预调度中
type PreSchedulingDump struct { type PreSchedulingDump struct {
serder.Metadata `union:"PreScheduling"` serder.Metadata `union:"PreScheduling"`
Type JobStateDumpType `json:"type"` Type JobStateDumpType `json:"type"`
@ -109,7 +103,6 @@ func (dump *PreSchedulingDump) getType() JobStateDumpType {
return dump.Type return dump.Type
} }
// 准备调整中
type ReadyToAdjustDump struct { type ReadyToAdjustDump struct {
serder.Metadata `union:"ReadyToAdjust"` serder.Metadata `union:"ReadyToAdjust"`
Type JobStateDumpType `json:"type"` Type JobStateDumpType `json:"type"`
@ -119,7 +112,6 @@ func (dump *ReadyToAdjustDump) getType() JobStateDumpType {
return dump.Type return dump.Type
} }
// 普通任务准备执行中
type NormalJobReadyToExecuteDump struct { type NormalJobReadyToExecuteDump struct {
serder.Metadata `union:"NormalJobReadyToExecute"` serder.Metadata `union:"NormalJobReadyToExecute"`
Type JobStateDumpType `json:"type"` Type JobStateDumpType `json:"type"`
@ -129,7 +121,6 @@ func (dump *NormalJobReadyToExecuteDump) getType() JobStateDumpType {
return dump.Type return dump.Type
} }
// 回源任务准备执行中
type DataReturnReadyToExecuteDump struct { type DataReturnReadyToExecuteDump struct {
serder.Metadata `union:"DataReturnReadyToExecute"` serder.Metadata `union:"DataReturnReadyToExecute"`
Type JobStateDumpType `json:"type"` Type JobStateDumpType `json:"type"`
@ -139,7 +130,6 @@ func (dump *DataReturnReadyToExecuteDump) getType() JobStateDumpType {
return dump.Type return dump.Type
} }
// 等待回源目标完成中
type WaitTargetCompleteDump struct { type WaitTargetCompleteDump struct {
serder.Metadata `union:"WaitTargetComplete"` serder.Metadata `union:"WaitTargetComplete"`
Type JobStateDumpType `json:"type"` Type JobStateDumpType `json:"type"`

View File

@ -51,8 +51,8 @@ func (c *Client) SubmitJobSet(msg *SubmitJobSet, opts ...mq.RequestOption) (*Sub
type CreateInstance struct { type CreateInstance struct {
mq.MessageBodyBase mq.MessageBodyBase
JobID schsdk.JobID JobID schsdk.JobID
LocalPath schsdk.JobFileInfo DataSet schsdk.JobFileInfo
} }
type CreateInstanceResp struct { type CreateInstanceResp struct {
@ -61,10 +61,10 @@ type CreateInstanceResp struct {
UploadScheme schsdk.JobFilesUploadScheme `json:"uploadScheme"` UploadScheme schsdk.JobFilesUploadScheme `json:"uploadScheme"`
} }
func NewCreateInstance(jobID schsdk.JobID, LocalPath schsdk.JobFileInfo) *CreateInstance { func NewCreateInstance(jobID schsdk.JobID, dataSet schsdk.JobFileInfo) *CreateInstance {
return &CreateInstance{ return &CreateInstance{
JobID: jobID, JobID: jobID,
LocalPath: LocalPath, DataSet: dataSet,
} }
} }

View File

@ -346,8 +346,6 @@ func (s *DefaultPreScheduler) scheduleForNormalOrMultiJob(jobSet *schsdk.JobSetI
allCCs[cc.CCID] = caNode allCCs[cc.CCID] = caNode
} }
//norJob := job.Job.(*schsdk.NormalJobInfo)
var jobFiles *schsdk.JobFilesInfo var jobFiles *schsdk.JobFilesInfo
var jobResource *schsdk.JobResourcesInfo var jobResource *schsdk.JobResourcesInfo
@ -396,8 +394,6 @@ func (s *DefaultPreScheduler) scheduleForSingleJob(job *schedulingJob, ccs map[s
allCCs[cc.CCID] = caNode allCCs[cc.CCID] = caNode
} }
//norJob := job.Job.(*schsdk.NormalJobInfo)
var jobFiles *schsdk.JobFilesInfo var jobFiles *schsdk.JobFilesInfo
var jobResource *schsdk.JobResourcesInfo var jobResource *schsdk.JobResourcesInfo

View File

@ -1,7 +1,10 @@
package utils package utils
import ( import (
"crypto/sha256"
"encoding/hex"
"fmt" "fmt"
"math/rand"
"path/filepath" "path/filepath"
"strconv" "strconv"
"time" "time"
@ -17,3 +20,15 @@ func MakeJobOutputFullPath(stgDir string, userID cdssdk.UserID, jobID schsdk.Job
func MakeResourcePackageName(jobID schsdk.JobID) string { func MakeResourcePackageName(jobID schsdk.JobID) string {
return fmt.Sprintf("%s@%s", string(jobID), time.Now().Format("2006-01-02 15:04:05")) return fmt.Sprintf("%s@%s", string(jobID), time.Now().Format("2006-01-02 15:04:05"))
} }
// GenerateRandomID returns a 64-character lowercase hexadecimal string: the
// SHA-256 digest of a seed string built from the current time and a
// pseudo-random number. It is meant to be appended to an identifier so that
// repeated uses of the same base identifier stay distinct.
//
// The seed string combines the wall-clock time in nanoseconds with the next
// value drawn from the shared math/rand source. The shared source is
// deliberately NOT reseeded here: reseeding it with the current millisecond on
// every call makes any two calls that land in the same millisecond draw the
// same number and therefore return the same ID, and it also resets the
// generator for every other user of math/rand in the process.
//
// The result is not cryptographically secure and must not be used as a secret.
func GenerateRandomID() string {
	// rand.Int63 advances the shared generator on each call, so consecutive
	// calls differ even when the clock has not ticked between them.
	idBase := fmt.Sprintf("%d-%d", time.Now().UnixNano(), rand.Int63())

	// Hash the seed string so the ID has a fixed length and uniform alphabet.
	sum := sha256.Sum256([]byte(idBase))
	return hex.EncodeToString(sum[:])
}

View File

@ -8,8 +8,8 @@ import (
type CreateInstanceFuture = *future.SetValueFuture[CreateInstanceResult] type CreateInstanceFuture = *future.SetValueFuture[CreateInstanceResult]
type InstanceCreate struct { type InstanceCreate struct {
LocalPath schsdk.JobFileInfo DataSet schsdk.JobFileInfo
Result CreateInstanceFuture Result CreateInstanceFuture
} }
type CreateInstanceResult struct { type CreateInstanceResult struct {
@ -17,10 +17,10 @@ type CreateInstanceResult struct {
FilesUploadScheme schsdk.JobFilesUploadScheme FilesUploadScheme schsdk.JobFilesUploadScheme
} }
func NewInstanceCreate(LocalPath schsdk.JobFileInfo, future CreateInstanceFuture) *InstanceCreate { func NewInstanceCreate(dataSet schsdk.JobFileInfo, future CreateInstanceFuture) *InstanceCreate {
return &InstanceCreate{ return &InstanceCreate{
LocalPath: LocalPath, DataSet: dataSet,
Result: future, Result: future,
} }
} }

View File

@ -32,11 +32,6 @@ func NewEventSet() EventSet {
return EventSet{} return EventSet{}
} }
// Post 函数用于向事件集合中发布一个事件。
// 如果有等待该事件的协程,会唤醒它们并将事件传递给它们。
// 参数:
//
// evt Event - 需要发布的事件对象。
func (s *EventSet) Post(evt Event) { func (s *EventSet) Post(evt Event) {
s.lock.Lock() // 加锁保护事件集合 s.lock.Lock() // 加锁保护事件集合
defer s.lock.Unlock() // 确保在函数结束时释放锁 defer s.lock.Unlock() // 确保在函数结束时释放锁
@ -59,9 +54,7 @@ func (s *EventSet) Post(evt Event) {
func (s *EventSet) Wait(ctx context.Context, cond EventWaitCondition) (Event, bool) { func (s *EventSet) Wait(ctx context.Context, cond EventWaitCondition) (Event, bool) {
s.lock.Lock() s.lock.Lock()
//defer s.lock.Unlock()
// 一个等待者只能等待一个事件
for i, evt := range s.events { for i, evt := range s.events {
if cond(evt) { if cond(evt) {
s.events = lo2.RemoveAt(s.events, i) s.events = lo2.RemoveAt(s.events, i)
@ -75,7 +68,6 @@ func (s *EventSet) Wait(ctx context.Context, cond EventWaitCondition) (Event, bo
condition: cond, condition: cond,
future: fut, future: fut,
} }
//s.events = append(s.events, waiter)
s.waiters = append(s.waiters, waiter) s.waiters = append(s.waiters, waiter)
s.lock.Unlock() s.lock.Unlock()

View File

@ -1,14 +0,0 @@
package job
import (
"fmt"
"testing"
)
func TestFunc(t *testing.T) {
a := 1
switch a {
case 1, 2:
fmt.Println("aaa11111")
}
}

View File

@ -46,24 +46,6 @@ func (s *Adjusting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.J
} }
func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error { func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
//norJob := jo.Body.(*job.NormalJob)
var jobFilesInfo schsdk.JobFilesInfo
var jobFiles *jobmod.JobFiles
switch runningJob := jo.Body.(type) {
case *job.NormalJob:
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files
runningJob.TargetCCID = s.scheme.TargetCCID
case *job.MultiInstanceJob:
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files
runningJob.TargetCCID = s.scheme.TargetCCID
case *job.InstanceJob:
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files
runningJob.TargetCCID = s.scheme.TargetCCID
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
@ -94,8 +76,27 @@ func (s *Adjusting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
return fmt.Errorf("getting cds storage info: %w", err) return fmt.Errorf("getting cds storage info: %w", err)
} }
// TODO UserID // TODO UserID
//norJob.OutputFullPath = utils.MakeJobOutputFullPath(stgInfo.Directory, 1, jo.JobID) outputFullPath := utils.MakeJobOutputFullPath(stgInfo.Directory, 1, jo.JobID)
utils.MakeJobOutputFullPath(stgInfo.Directory, 1, jo.JobID)
var jobFilesInfo schsdk.JobFilesInfo
var jobFiles *jobmod.JobFiles
switch runningJob := jo.Body.(type) {
case *job.NormalJob:
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files
runningJob.TargetCCID = s.scheme.TargetCCID
runningJob.OutputFullPath = outputFullPath
case *job.MultiInstanceJob:
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files
runningJob.TargetCCID = s.scheme.TargetCCID
case *job.InstanceJob:
jobFilesInfo = runningJob.Info.Files
jobFiles = &runningJob.Files
runningJob.TargetCCID = s.scheme.TargetCCID
runningJob.OutputFullPath = outputFullPath
}
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(3) wg.Add(3)

View File

@ -2,10 +2,12 @@ package state
import ( import (
"context" "context"
"fmt"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/prescheduler" "gitlink.org.cn/cloudream/scheduler/common/pkgs/prescheduler"
"gitlink.org.cn/cloudream/scheduler/common/utils"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job"
@ -33,9 +35,11 @@ func (s *MultiInstanceInit) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
cancel() cancel()
}() }()
newLocalJobID := fmt.Sprintf("%s_%s", multInstJob.Info.LocalJobID, utils.GenerateRandomID())
instJobInfo := &schsdk.InstanceJobInfo{ instJobInfo := &schsdk.InstanceJobInfo{
Type: schsdk.JobTypeInstance, Type: schsdk.JobTypeInstance,
LocalJobID: multInstJob.Info.LocalJobID, LocalJobID: newLocalJobID,
Files: multInstJob.Info.Files, Files: multInstJob.Info.Files,
Runtime: multInstJob.Info.Runtime, Runtime: multInstJob.Info.Runtime,
Resources: multInstJob.Info.Resources, Resources: multInstJob.Info.Resources,
@ -55,14 +59,8 @@ func (s *MultiInstanceInit) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
// 在多实例任务中新增这个实例的任务ID // 在多实例任务中新增这个实例的任务ID
multInstJob.SubJobs = append(multInstJob.SubJobs, jobID) multInstJob.SubJobs = append(multInstJob.SubJobs, jobID)
job := &jobmgr.Job{ rtx.Mgr.ChangeState(jo, NewMultiInstanceRunning(prescheduler.NewDefaultPreScheduler()))
JobSetID: jo.JobSetID, logger.Info("Create multiInstance job success, jobID: " + jo.JobID)
JobID: jo.JobID,
Body: multInstJob,
}
rtx.Mgr.ChangeState(job, NewMultiInstanceRunning(prescheduler.NewDefaultPreScheduler()))
logger.Info("Create multiInstance job success, jobID: " + job.JobID)
} }
func (s *MultiInstanceInit) Dump(ctx jobmgr.JobStateRunContext, job *jobmgr.Job) jobmod.JobStateDump { func (s *MultiInstanceInit) Dump(ctx jobmgr.JobStateRunContext, job *jobmgr.Job) jobmod.JobStateDump {

View File

@ -2,10 +2,12 @@ package state
import ( import (
"context" "context"
"fmt"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/prescheduler" "gitlink.org.cn/cloudream/scheduler/common/pkgs/prescheduler"
"gitlink.org.cn/cloudream/scheduler/common/utils"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/job"
@ -21,10 +23,6 @@ func NewMultiInstanceRunning(preScheduler prescheduler.PreScheduler) *MultiInsta
} }
} }
//func NewMultiInstanceRunning() *MultiInstanceRunning {
// return &MultiInstanceRunning{}
//}
func (s *MultiInstanceRunning) Run(rtx jobmgr.JobStateRunContext, job *jobmgr.Job) { func (s *MultiInstanceRunning) Run(rtx jobmgr.JobStateRunContext, job *jobmgr.Job) {
s.do(rtx, job) s.do(rtx, job)
} }
@ -53,14 +51,16 @@ func (s *MultiInstanceRunning) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job)
// 构建InstanceJobInfo // 构建InstanceJobInfo
infoFiles := schsdk.JobFilesInfo{ infoFiles := schsdk.JobFilesInfo{
Dataset: ic.LocalPath, Dataset: ic.DataSet,
Code: multInstJob.Info.Files.Code, Code: multInstJob.Info.Files.Code,
Image: multInstJob.Info.Files.Image, Image: multInstJob.Info.Files.Image,
} }
newLocalJobID := fmt.Sprintf("%s_%s", multInstJob.Info.LocalJobID, utils.GenerateRandomID())
instJobInfo := &schsdk.InstanceJobInfo{ instJobInfo := &schsdk.InstanceJobInfo{
Type: schsdk.JobTypeInstance, Type: schsdk.JobTypeInstance,
LocalJobID: multInstJob.Info.LocalJobID, LocalJobID: newLocalJobID,
Files: infoFiles, Files: infoFiles,
Runtime: multInstJob.Info.Runtime, Runtime: multInstJob.Info.Runtime,
Resources: multInstJob.Info.Resources, Resources: multInstJob.Info.Resources,

View File

@ -129,21 +129,17 @@ func (m *Manager) PostEvent(jobID schsdk.JobID, evt Event) {
// 向某个任务集中的所有任务投递事件 // 向某个任务集中的所有任务投递事件
func (m *Manager) BroadcastEvent(jobSetID schsdk.JobSetID, evt Event) { func (m *Manager) BroadcastEvent(jobSetID schsdk.JobSetID, evt Event) {
// 加锁以确保发布事件时的线程安全
m.pubLock.Lock() m.pubLock.Lock()
defer m.pubLock.Unlock() // 确保函数退出时释放锁 defer m.pubLock.Unlock()
// 尝试从管理器的作业集中获取指定的作业集
jobSet, ok := m.jobSets[jobSetID] jobSet, ok := m.jobSets[jobSetID]
if !ok { if !ok {
// 如果作业集不存在,则直接返回 // 如果作业集不存在,则直接返回
return return
} }
// 遍历作业集中的所有任务,并为每个任务发布事件
for _, mjob := range jobSet.jobs { for _, mjob := range jobSet.jobs {
go func(j *mgrJob) { go func(j *mgrJob) {
// 使用 goroutine 为每个任务发布事件,以异步方式处理,避免阻塞
j.eventSet.Post(evt) j.eventSet.Post(evt)
}(mjob) }(mjob)
} }
@ -156,21 +152,17 @@ type SubmittingJob struct {
// 提交一个任务集 // 提交一个任务集
func (m *Manager) SubmitJobSet(jobs []SubmittingJob) schsdk.JobSetID { func (m *Manager) SubmitJobSet(jobs []SubmittingJob) schsdk.JobSetID {
// 加锁以保护对作业集ID和作业ID索引的修改
m.pubLock.Lock() m.pubLock.Lock()
defer m.pubLock.Unlock() defer m.pubLock.Unlock()
// 生成一个新的作业集ID并递增作业集ID索引
jobSetID := schsdk.JobSetID(fmt.Sprintf("%d", m.jobSetIDIndex)) jobSetID := schsdk.JobSetID(fmt.Sprintf("%d", m.jobSetIDIndex))
m.jobSetIDIndex += 1 m.jobSetIDIndex += 1
// 创建一个新的作业集实例,并初始化其作业映射
jobSet := &mgrJobSet{ jobSet := &mgrJobSet{
jobs: make(map[schsdk.JobID]*mgrJob), jobs: make(map[schsdk.JobID]*mgrJob),
} }
m.jobSets[jobSetID] = jobSet m.jobSets[jobSetID] = jobSet
// 遍历提交的作业为每个作业创建一个唯一的作业ID初始化作业状态并将其添加到作业集中
for i, subJob := range jobs { for i, subJob := range jobs {
jobID := schsdk.JobID(fmt.Sprintf("%d", m.jobIDIndex+i)) jobID := schsdk.JobID(fmt.Sprintf("%d", m.jobIDIndex+i))
job := &mgrJob{ job := &mgrJob{
@ -185,8 +177,6 @@ func (m *Manager) SubmitJobSet(jobs []SubmittingJob) schsdk.JobSetID {
jobSet.jobs[jobID] = job jobSet.jobs[jobID] = job
m.jobs[jobID] = job m.jobs[jobID] = job
// 更改作业的初始状态
//m.ChangeState(&job.job, subJob.InitState)
go func() { go func() {
subJob.InitState.Run(JobStateRunContext{ subJob.InitState.Run(JobStateRunContext{
Mgr: m, Mgr: m,
@ -195,10 +185,8 @@ func (m *Manager) SubmitJobSet(jobs []SubmittingJob) schsdk.JobSetID {
}, &job.job) }, &job.job)
}() }()
} }
// 更新作业ID索引基于提交的作业数量
m.jobIDIndex += len(jobs) m.jobIDIndex += len(jobs)
// 返回生成的作业集ID
return jobSetID return jobSetID
} }

View File

@ -71,7 +71,7 @@ func (svc *Service) CreateInstance(instInfo *mgrmq.CreateInstance) (*mgrmq.Creat
logger.Debugf("start create instance") logger.Debugf("start create instance")
fut := future.NewSetValue[event.CreateInstanceResult]() fut := future.NewSetValue[event.CreateInstanceResult]()
svc.jobMgr.PostEvent(instInfo.JobID, event.NewInstanceCreate(instInfo.LocalPath, fut)) svc.jobMgr.PostEvent(instInfo.JobID, event.NewInstanceCreate(instInfo.DataSet, fut))
result, err := fut.WaitValue(context.TODO()) result, err := fut.WaitValue(context.TODO())