Merge remote-tracking branch 'origin/mv_dctrl'

This commit is contained in:
Sydonian 2023-08-29 15:02:29 +08:00
commit bf9f3b926b
11 changed files with 380 additions and 0 deletions

2
executor/README.md Normal file
View File

@ -0,0 +1,2 @@
# scheduler-dcontroller

3
executor/go.mod Normal file
View File

@ -0,0 +1,3 @@
module gitlink.org.cn/cloudream/scheduler-dcontroller
go 1.20

View File

@ -0,0 +1,24 @@
package config
import (
cldstg "gitlink.org.cn/cloudream/common/api/storage"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
c "gitlink.org.cn/cloudream/common/utils/config"
mymq "gitlink.org.cn/cloudream/scheduler-common/pkgs/mq"
)
type Config struct {
Logger log.Config `json:"logger"`
RabbitMQ mymq.Config `json:"rabbitMQ"`
CloudreamStorage cldstg.Config `json:"cloudreamStorage"`
}
var cfg Config
func Init() error {
return c.DefaultLoad("dcontroller", &cfg)
}
func Cfg() *Config {
return &cfg
}

View File

@ -0,0 +1,65 @@
package services
import (
"time"
"gitlink.org.cn/cloudream/common/pkgs/mq"
dctrlmq "gitlink.org.cn/cloudream/scheduler-common/pkgs/mq/dcontroller"
schtsk "gitlink.org.cn/cloudream/scheduler-dcontroller/internal/task"
)
func (svc *Service) StartStorageLoadPackage(msg *dctrlmq.StartStorageLoadPackage) (*dctrlmq.StartStorageLoadPackageResp, *mq.CodeMessage) {
tsk := svc.taskManager.StartNew(schtsk.NewStorageLoadPackage(msg.UserID, msg.PackageID, msg.StorageID))
return mq.ReplyOK(dctrlmq.NewStartStorageLoadPackageResp(tsk.ID()))
}
func (svc *Service) WaitStorageLoadPackage(msg *dctrlmq.WaitStorageLoadPackage) (*dctrlmq.WaitStorageLoadPackageResp, *mq.CodeMessage) {
tsk := svc.taskManager.FindByID(msg.TaskID)
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) {
errMsg := ""
if tsk.Error() != nil {
errMsg = tsk.Error().Error()
}
return mq.ReplyOK(dctrlmq.NewWaitStorageLoadPackageResp(true, errMsg))
}
return mq.ReplyOK(dctrlmq.NewWaitStorageLoadPackageResp(false, ""))
}
func (svc *Service) StartStorageCreatePackage(msg *dctrlmq.StartStorageCreatePackage) (*dctrlmq.StartStorageCreatePackageResp, *mq.CodeMessage) {
tsk := svc.taskManager.StartNew(schtsk.NewStorageCreatePackage(msg.UserID, msg.StorageID, msg.Path, msg.BucketID, msg.Name, msg.Redundancy))
return mq.ReplyOK(dctrlmq.NewStartStorageCreatePackageResp(tsk.ID()))
}
func (svc *Service) WaitStorageCreatePackage(msg *dctrlmq.WaitStorageCreatePackage) (*dctrlmq.WaitStorageCreatePackageResp, *mq.CodeMessage) {
tsk := svc.taskManager.FindByID(msg.TaskID)
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) {
tskBody := tsk.Body().(*schtsk.StorageCreatePackage)
errMsg := ""
if tsk.Error() != nil {
errMsg = tsk.Error().Error()
}
return mq.ReplyOK(dctrlmq.NewWaitStorageCreatePackageResp(true, errMsg, tskBody.Result.PackageID))
}
return mq.ReplyOK(dctrlmq.NewWaitStorageCreatePackageResp(false, "", 0))
}
func (svc *Service) StartCacheMovePackage(msg *dctrlmq.StartCacheMovePackage) (*dctrlmq.StartCacheMovePackageResp, *mq.CodeMessage) {
tsk := svc.taskManager.StartNew(schtsk.NewCacheMovePackage(msg.UserID, msg.PackageID, msg.NodeID))
return mq.ReplyOK(dctrlmq.NewStartCacheMovePackageResp(tsk.ID()))
}
func (svc *Service) WaitCacheMovePackage(msg *dctrlmq.WaitCacheMovePackage) (*dctrlmq.WaitCacheMovePackageResp, *mq.CodeMessage) {
tsk := svc.taskManager.FindByID(msg.TaskID)
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) {
errMsg := ""
if tsk.Error() != nil {
errMsg = tsk.Error().Error()
}
return mq.ReplyOK(dctrlmq.NewWaitCacheMovePackageResp(true, errMsg))
}
return mq.ReplyOK(dctrlmq.NewWaitCacheMovePackageResp(false, ""))
}

View File

@ -0,0 +1,15 @@
package services
import (
"gitlink.org.cn/cloudream/scheduler-dcontroller/internal/task"
)
type Service struct {
taskManager *task.Manager
}
func NewService(taskMgr *task.Manager) *Service {
return &Service{
taskManager: taskMgr,
}
}

View File

@ -0,0 +1,49 @@
package task
import (
"fmt"
"time"
"gitlink.org.cn/cloudream/common/api/storage"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/scheduler-common/globals"
)
type CacheMovePackage struct {
userID int64
packageID int64
nodeID int64
}
func NewCacheMovePackage(userID int64, packageID int64, nodeID int64) *CacheMovePackage {
return &CacheMovePackage{
userID: userID,
packageID: packageID,
nodeID: nodeID,
}
}
func (t *CacheMovePackage) Execute(ctx TaskContext, complete CompleteFn) {
log := logger.WithType[CacheMovePackage]("Task")
log.Debugf("begin with %v", logger.FormatStruct(t))
defer log.Debugf("end")
err := t.do(ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}
func (t *CacheMovePackage) do(ctx TaskContext) error {
stgCli, err := globals.CloudreamStoragePool.Acquire()
if err != nil {
return fmt.Errorf("new cloudream storage client: %w", err)
}
defer stgCli.Close()
return stgCli.CacheMovePackage(storage.CacheMovePackageReq{
UserID: t.userID,
PackageID: t.packageID,
NodeID: t.packageID,
})
}

View File

@ -0,0 +1,72 @@
package task
import (
"fmt"
"time"
"gitlink.org.cn/cloudream/common/api/storage"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/scheduler-common/globals"
)
type StorageCreatePackage struct {
userID int64
storageID int64
path string
bucketID int64
name string
redundancy models.TypedRedundancyInfo
Result StorageCreatePackageResult
}
type StorageCreatePackageResult struct {
PackageID int64
}
func NewStorageCreatePackage(userID int64, storageID int64, path string, bucketID int64, name string, redundancy models.TypedRedundancyInfo) *StorageCreatePackage {
return &StorageCreatePackage{
userID: userID,
storageID: storageID,
path: path,
bucketID: bucketID,
name: name,
redundancy: redundancy,
}
}
func (t *StorageCreatePackage) Execute(ctx TaskContext, complete CompleteFn) {
log := logger.WithType[StorageCreatePackage]("Task")
log.Debugf("begin")
defer log.Debugf("end")
err := t.do(ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}
func (t *StorageCreatePackage) do(ctx TaskContext) error {
stgCli, err := globals.CloudreamStoragePool.Acquire()
if err != nil {
return fmt.Errorf("new cloudream storage client: %w", err)
}
defer stgCli.Close()
resp, err := stgCli.StorageCreatePackage(storage.StorageCreatePackageReq{
UserID: t.userID,
StorageID: t.storageID,
Path: t.path,
BucketID: t.bucketID,
Name: t.name,
Redundancy: t.redundancy,
})
if err != nil {
return err
}
t.Result.PackageID = resp.PackageID
return nil
}

View File

@ -0,0 +1,49 @@
package task
import (
"fmt"
"time"
"gitlink.org.cn/cloudream/common/api/storage"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/scheduler-common/globals"
)
type StorageLoadPackage struct {
userID int64
packageID int64
storageID int64
}
func NewStorageLoadPackage(userID int64, packageID int64, storageID int64) *StorageLoadPackage {
return &StorageLoadPackage{
userID: userID,
packageID: packageID,
storageID: storageID,
}
}
func (t *StorageLoadPackage) Execute(ctx TaskContext, complete CompleteFn) {
log := logger.WithType[StorageLoadPackage]("Task")
log.Debugf("begin with %w", logger.FormatStruct(t))
defer log.Debugf("end")
err := t.do(ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}
func (t *StorageLoadPackage) do(ctx TaskContext) error {
stgCli, err := globals.CloudreamStoragePool.Acquire()
if err != nil {
return fmt.Errorf("new cloudream storage client: %w", err)
}
defer stgCli.Close()
return stgCli.StorageLoadPackage(storage.StorageLoadPackageReq{
UserID: t.userID,
PackageID: t.packageID,
StorageID: t.storageID,
})
}

View File

@ -0,0 +1,24 @@
package task
import (
"gitlink.org.cn/cloudream/common/pkgs/task"
)
type TaskContext struct {
}
// 需要在Task结束后主动调用completing函数将在Manager加锁期间被调用
// 因此适合进行执行结果的设置
type CompleteFn = task.CompleteFn
type Manager = task.Manager[TaskContext]
type TaskBody = task.TaskBody[TaskContext]
type Task = task.Task[TaskContext]
type CompleteOption = task.CompleteOption
func NewManager() Manager {
return task.NewManager(TaskContext{})
}

View File

@ -0,0 +1,20 @@
//go:build mage
package main
import (
"gitlink.org.cn/cloudream/common/magefiles"
//mage:import
_ "gitlink.org.cn/cloudream/common/magefiles/targets"
)
var Default = Build
func Build() error {
return magefiles.Build(magefiles.BuildArgs{
OutputName: "dcontroller",
OutputDir: "dcontroller",
AssetsDir: "assets",
})
}

57
executor/main.go Normal file
View File

@ -0,0 +1,57 @@
package main
import (
"fmt"
"os"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/scheduler-common/globals"
dctrlmq "gitlink.org.cn/cloudream/scheduler-common/pkgs/mq/dcontroller"
"gitlink.org.cn/cloudream/scheduler-dcontroller/internal/config"
"gitlink.org.cn/cloudream/scheduler-dcontroller/internal/services"
"gitlink.org.cn/cloudream/scheduler-dcontroller/internal/task"
)
func main() {
err := config.Init()
if err != nil {
fmt.Printf("init config failed, err: %s", err.Error())
os.Exit(1)
}
err = logger.Init(&config.Cfg().Logger)
if err != nil {
fmt.Printf("init logger failed, err: %s", err.Error())
os.Exit(1)
}
globals.InitCloudreamStoragePool(&config.Cfg().CloudreamStorage)
taskMgr := task.NewManager()
mqSvr, err := dctrlmq.NewServer(services.NewService(&taskMgr), &config.Cfg().RabbitMQ)
if err != nil {
logger.Fatalf("new dcontroller server failed, err: %s", err.Error())
}
mqSvr.OnError = func(err error) {
logger.Warnf("dcontroller server err: %s", err.Error())
}
// 启动服务
go serveMQServer(mqSvr)
forever := make(chan bool)
<-forever
}
func serveMQServer(server *dctrlmq.Server) {
logger.Info("start serving mq server")
err := server.Serve()
if err != nil {
logger.Errorf("mq server stopped with error: %s", err.Error())
}
logger.Info("mq server stopped")
}