增加UserSpace相关CRUD接口
This commit is contained in:
parent
993a81bf55
commit
f2c1c7cc81
|
@ -144,7 +144,14 @@ func (k *Keeper) Stop() {
|
|||
}
|
||||
}
|
||||
|
||||
func (k *Keeper) GetAuthInfo() (rpc.AccessTokenAuthInfo, error) {
|
||||
func (k *Keeper) GetToken() cortypes.UserAccessToken {
|
||||
k.lock.RLock()
|
||||
defer k.lock.RUnlock()
|
||||
|
||||
return k.token
|
||||
}
|
||||
|
||||
func (k *Keeper) MakeAuthInfo() (rpc.AccessTokenAuthInfo, error) {
|
||||
if !k.enabled {
|
||||
return rpc.AccessTokenAuthInfo{}, fmt.Errorf("function disabled")
|
||||
}
|
||||
|
|
|
@ -100,8 +100,10 @@ func serveHTTP(configPath string, opts serveHTTPOptions) {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
stgglb.UserID = accToken.GetToken().UserID
|
||||
stgglb.InitPools(hubRPCCfg, corRPCCfg)
|
||||
} else {
|
||||
stgglb.UserID = 0
|
||||
accToken = accesstoken.NewDisabled()
|
||||
}
|
||||
accTokenChan := accToken.Start()
|
||||
|
@ -115,7 +117,7 @@ func serveHTTP(configPath string, opts serveHTTPOptions) {
|
|||
|
||||
// 初始化系统事件发布器
|
||||
evtPub, err := sysevent.NewPublisher(config.Cfg().SysEvent, &datamap.SourceClient{
|
||||
UserID: config.Cfg().Local.UserID,
|
||||
UserID: stgglb.UserID,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Errorf("new sysevent publisher: %v", err)
|
||||
|
|
|
@ -111,8 +111,10 @@ func test(configPath string) {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
stgglb.UserID = accToken.GetToken().UserID
|
||||
stgglb.InitPools(hubRPCCfg, corRPCCfg)
|
||||
} else {
|
||||
stgglb.UserID = 0
|
||||
accToken = accesstoken.NewDisabled()
|
||||
}
|
||||
accTokenChan := accToken.Start()
|
||||
|
@ -126,7 +128,7 @@ func test(configPath string) {
|
|||
|
||||
// 初始化系统事件发布器
|
||||
evtPub, err := sysevent.NewPublisher(config.Cfg().SysEvent, &datamap.SourceClient{
|
||||
UserID: config.Cfg().Local.UserID,
|
||||
UserID: stgglb.UserID,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Errorf("new sysevent publisher: %v", err)
|
||||
|
|
|
@ -91,8 +91,10 @@ func vfsTest(configPath string, opts serveHTTPOptions) {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
stgglb.UserID = accToken.GetToken().UserID
|
||||
stgglb.InitPools(hubRPCCfg, corRPCCfg)
|
||||
} else {
|
||||
stgglb.UserID = 0
|
||||
accToken = accesstoken.NewDisabled()
|
||||
}
|
||||
accTokenChan := accToken.Start()
|
||||
|
@ -106,7 +108,7 @@ func vfsTest(configPath string, opts serveHTTPOptions) {
|
|||
|
||||
// 初始化系统事件发布器
|
||||
evtPub, err := sysevent.NewPublisher(config.Cfg().SysEvent, &datamap.SourceClient{
|
||||
UserID: config.Cfg().Local.UserID,
|
||||
UserID: stgglb.UserID,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Errorf("new sysevent publisher: %v", err)
|
||||
|
|
|
@ -112,3 +112,8 @@ func (*ObjectAccessStatDB) DeleteInPackage(ctx SQLContext, packageID types.Packa
|
|||
err := ctx.Exec("DELETE o FROM ObjectAccessStat o INNER JOIN Object obj ON o.ObjectID = obj.ObjectID WHERE obj.PackageID = ?", packageID).Error
|
||||
return err
|
||||
}
|
||||
|
||||
func (*ObjectAccessStatDB) DeleteByUserSpaceID(ctx SQLContext, spaceID types.UserSpaceID) error {
|
||||
err := ctx.Table("ObjectAccessStat").Where("UserSpaceID = ?", spaceID).Delete(&types.ObjectAccessStat{}).Error
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -80,6 +80,10 @@ func (db *ObjectBlockDB) BatchDeleteByFileHash(ctx SQLContext, spaceID types.Use
|
|||
return ctx.Table("ObjectBlock").Where("UserSpaceID = ? AND FileHash IN (?)", spaceID, fileHashes).Delete(&types.ObjectBlock{}).Error
|
||||
}
|
||||
|
||||
func (*ObjectBlockDB) DeleteByUserSpaceID(ctx SQLContext, spaceID types.UserSpaceID) error {
|
||||
return ctx.Table("ObjectBlock").Where("UserSpaceID = ?", spaceID).Delete(&types.ObjectBlock{}).Error
|
||||
}
|
||||
|
||||
func (db *ObjectBlockDB) CountBlockWithHash(ctx SQLContext, fileHash string) (int, error) {
|
||||
var cnt int64
|
||||
err := ctx.Table("ObjectBlock").
|
||||
|
|
|
@ -75,3 +75,7 @@ func (*PackageAccessStatDB) UpdateAllAmount(ctx SQLContext, historyWeight float6
|
|||
func (*PackageAccessStatDB) DeleteByPackageID(ctx SQLContext, pkgID types.PackageID) error {
|
||||
return ctx.Table("PackageAccessStat").Where("PackageID = ?", pkgID).Delete(&types.PackageAccessStat{}).Error
|
||||
}
|
||||
|
||||
func (*PackageAccessStatDB) DeleteByUserSpaceID(ctx SQLContext, spaceID types.UserSpaceID) error {
|
||||
return ctx.Table("PackageAccessStat").Where("UserSpaceID = ?", spaceID).Delete(&types.PackageAccessStat{}).Error
|
||||
}
|
||||
|
|
|
@ -93,6 +93,11 @@ func (*PinnedObjectDB) DeleteByObjectID(ctx SQLContext, objectID types.ObjectID)
|
|||
return err
|
||||
}
|
||||
|
||||
func (*PinnedObjectDB) DeleteByUserSpaceID(ctx SQLContext, spaceID types.UserSpaceID) error {
|
||||
err := ctx.Exec("delete from PinnedObject where UserSpaceID = ?", spaceID).Error
|
||||
return err
|
||||
}
|
||||
|
||||
func (*PinnedObjectDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []types.ObjectID) error {
|
||||
if len(objectIDs) == 0 {
|
||||
return nil
|
||||
|
|
|
@ -2,6 +2,7 @@ package db
|
|||
|
||||
import (
|
||||
"gitlink.org.cn/cloudream/jcs-pub/client/types"
|
||||
"gorm.io/gorm/clause"
|
||||
)
|
||||
|
||||
type UserSpaceDB struct {
|
||||
|
@ -12,9 +13,9 @@ func (db *DB) UserSpace() *UserSpaceDB {
|
|||
return &UserSpaceDB{DB: db}
|
||||
}
|
||||
|
||||
func (db *UserSpaceDB) GetByID(ctx SQLContext, stgID types.UserSpaceID) (types.UserSpace, error) {
|
||||
func (db *UserSpaceDB) GetByID(ctx SQLContext, spaceID types.UserSpaceID) (types.UserSpace, error) {
|
||||
var stg types.UserSpace
|
||||
err := ctx.Table("UserSpace").First(&stg, stgID).Error
|
||||
err := ctx.Table("UserSpace").First(&stg, spaceID).Error
|
||||
return stg, err
|
||||
}
|
||||
|
||||
|
@ -24,9 +25,9 @@ func (UserSpaceDB) GetAllIDs(ctx SQLContext) ([]types.UserSpaceID, error) {
|
|||
return stgs, err
|
||||
}
|
||||
|
||||
func (db *UserSpaceDB) BatchGetByID(ctx SQLContext, stgIDs []types.UserSpaceID) ([]types.UserSpace, error) {
|
||||
func (db *UserSpaceDB) BatchGetByID(ctx SQLContext, spaceIDs []types.UserSpaceID) ([]types.UserSpace, error) {
|
||||
var stgs []types.UserSpace
|
||||
err := ctx.Table("UserSpace").Find(&stgs, "UserSpaceID IN (?)", stgIDs).Error
|
||||
err := ctx.Table("UserSpace").Find(&stgs, "UserSpaceID IN (?)", spaceIDs).Error
|
||||
return stgs, err
|
||||
}
|
||||
|
||||
|
@ -47,3 +48,18 @@ func (db *UserSpaceDB) GetByName(ctx SQLContext, name string) (types.UserSpace,
|
|||
err := ctx.Table("UserSpace").Where("Name = ?", name).First(&stg).Error
|
||||
return stg, err
|
||||
}
|
||||
|
||||
func (*UserSpaceDB) Create(ctx SQLContext, space *types.UserSpace) error {
|
||||
return ctx.Table("UserSpace").Create(space).Error
|
||||
}
|
||||
|
||||
func (*UserSpaceDB) UpdateColumns(ctx SQLContext, space types.UserSpace, columns ...string) error {
|
||||
return ctx.Clauses(clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "UserSpaceID"}},
|
||||
DoUpdates: clause.AssignmentColumns(columns),
|
||||
}).Create(space).Error
|
||||
}
|
||||
|
||||
func (*UserSpaceDB) DeleteByID(ctx SQLContext, spaceID types.UserSpaceID) error {
|
||||
return ctx.Table("UserSpace").Delete(types.UserSpace{}, "UserSpaceID = ?", spaceID).Error
|
||||
}
|
||||
|
|
|
@ -46,6 +46,10 @@ func (s *Server) InitRouters(rt gin.IRoutes) {
|
|||
rt.POST(cliapi.UserSpaceDownloadPackagePath, awsAuth.Auth, s.UserSpace().DownloadPackage)
|
||||
rt.POST(cliapi.UserSpaceCreatePackagePath, awsAuth.Auth, s.UserSpace().CreatePackage)
|
||||
rt.GET(cliapi.UserSpaceGetPath, awsAuth.Auth, s.UserSpace().Get)
|
||||
rt.POST(cliapi.UserSpaceCreatePath, awsAuth.Auth, s.UserSpace().Create)
|
||||
rt.POST(cliapi.UserSpaceUpdatePath, awsAuth.Auth, s.UserSpace().Update)
|
||||
rt.POST(cliapi.UserSpaceDeletePath, awsAuth.Auth, s.UserSpace().Delete)
|
||||
rt.POST(cliapi.UserSpaceTestPath, awsAuth.Auth, s.UserSpace().Test)
|
||||
rt.POST(cliapi.UserSpaceSpaceToSpacePath, awsAuth.Auth, s.UserSpace().SpaceToSpace)
|
||||
|
||||
rt.GET(cliapi.BucketGetByNamePath, awsAuth.Auth, s.Bucket().GetByName)
|
||||
|
|
|
@ -84,6 +84,88 @@ func (s *UserSpaceService) Get(ctx *gin.Context) {
|
|||
}))
|
||||
}
|
||||
|
||||
func (s *UserSpaceService) Create(ctx *gin.Context) {
|
||||
log := logger.WithField("HTTP", "UserSpace.Create")
|
||||
|
||||
req, err := ShouldBindJSONEx[cliapi.UserSpaceCreate](ctx)
|
||||
if err != nil {
|
||||
log.Warnf("binding body: %s", err.Error())
|
||||
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
|
||||
return
|
||||
}
|
||||
|
||||
resp, cerr := s.svc.UserSpaceSvc().Create(req)
|
||||
if cerr != nil {
|
||||
log.Warnf("creating userspace: %v", cerr)
|
||||
ctx.JSON(http.StatusOK, Failed(string(cerr.Code), cerr.Message))
|
||||
return
|
||||
}
|
||||
|
||||
ctx.JSON(http.StatusOK, OK(resp))
|
||||
}
|
||||
|
||||
func (s *UserSpaceService) Update(ctx *gin.Context) {
|
||||
log := logger.WithField("HTTP", "UserSpace.Update")
|
||||
|
||||
var req cliapi.UserSpaceUpdate
|
||||
req, err := ShouldBindJSONEx[cliapi.UserSpaceUpdate](ctx)
|
||||
if err != nil {
|
||||
log.Warnf("binding body: %s", err.Error())
|
||||
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
|
||||
return
|
||||
}
|
||||
|
||||
resp, cerr := s.svc.UserSpaceSvc().Update(req)
|
||||
if cerr != nil {
|
||||
log.Warnf("updating userspace: %v", cerr)
|
||||
ctx.JSON(http.StatusOK, Failed(string(cerr.Code), cerr.Message))
|
||||
return
|
||||
}
|
||||
|
||||
ctx.JSON(http.StatusOK, OK(resp))
|
||||
}
|
||||
|
||||
func (s *UserSpaceService) Delete(ctx *gin.Context) {
|
||||
log := logger.WithField("HTTP", "UserSpace.Delete")
|
||||
|
||||
var req cliapi.UserSpaceDelete
|
||||
if err := ctx.ShouldBindJSON(&req); err != nil {
|
||||
log.Warnf("binding body: %s", err.Error())
|
||||
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
|
||||
return
|
||||
}
|
||||
|
||||
resp, cerr := s.svc.UserSpaceSvc().Delete(req)
|
||||
if cerr != nil {
|
||||
log.Warnf("deleting userspace: %v", cerr)
|
||||
ctx.JSON(http.StatusOK, Failed(string(cerr.Code), cerr.Message))
|
||||
return
|
||||
}
|
||||
|
||||
ctx.JSON(http.StatusOK, OK(resp))
|
||||
}
|
||||
|
||||
func (s *UserSpaceService) Test(ctx *gin.Context) {
|
||||
log := logger.WithField("HTTP", "UserSpace.Test")
|
||||
|
||||
var req cliapi.UserSpaceTest
|
||||
req, err := ShouldBindJSONEx[cliapi.UserSpaceTest](ctx)
|
||||
if err != nil {
|
||||
log.Warnf("binding body: %s", err.Error())
|
||||
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
|
||||
return
|
||||
}
|
||||
|
||||
resp, cerr := s.svc.UserSpaceSvc().Test(req)
|
||||
if cerr != nil {
|
||||
log.Warnf("testing userspace: %v", cerr)
|
||||
ctx.JSON(http.StatusOK, Failed(string(cerr.Code), cerr.Message))
|
||||
return
|
||||
}
|
||||
|
||||
ctx.JSON(http.StatusOK, OK(resp))
|
||||
}
|
||||
|
||||
func (s *UserSpaceService) SpaceToSpace(ctx *gin.Context) {
|
||||
log := logger.WithField("HTTP", "UserSpace.SpaceToSpace")
|
||||
|
||||
|
|
|
@ -3,8 +3,10 @@ package http
|
|||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"gitlink.org.cn/cloudream/common/consts/errorcode"
|
||||
"gitlink.org.cn/cloudream/common/pkgs/mq"
|
||||
"gitlink.org.cn/cloudream/common/utils/serder"
|
||||
)
|
||||
|
||||
type Response struct {
|
||||
|
@ -35,3 +37,7 @@ func FailedError(err error) Response {
|
|||
|
||||
return Failed(errorcode.OperationFailed, err.Error())
|
||||
}
|
||||
|
||||
func ShouldBindJSONEx[T any](c *gin.Context) (T, error) {
|
||||
return serder.JSONToObjectStreamEx[T](c.Request.Body)
|
||||
}
|
||||
|
|
|
@ -114,6 +114,15 @@ func (mc *SimpleMetaCache[K, V]) ClearOutdated() {
|
|||
}
|
||||
}
|
||||
|
||||
func (mc *SimpleMetaCache[K, V]) Drop(keys []K) {
|
||||
mc.lock.Lock()
|
||||
defer mc.lock.Unlock()
|
||||
|
||||
for _, key := range keys {
|
||||
delete(mc.cache, key)
|
||||
}
|
||||
}
|
||||
|
||||
type CacheEntry[K comparable, V any] struct {
|
||||
Key K
|
||||
Data V
|
||||
|
|
|
@ -52,6 +52,10 @@ func (s *UserSpaceMeta) ClearOutdated() {
|
|||
s.cache.ClearOutdated()
|
||||
}
|
||||
|
||||
func (s *UserSpaceMeta) Drop(keys []types.UserSpaceID) {
|
||||
s.cache.Drop(keys)
|
||||
}
|
||||
|
||||
func (s *UserSpaceMeta) load(keys []types.UserSpaceID) ([]types.UserSpaceDetail, []bool) {
|
||||
vs := make([]types.UserSpaceDetail, len(keys))
|
||||
oks := make([]bool, len(keys))
|
||||
|
@ -65,7 +69,7 @@ func (s *UserSpaceMeta) load(keys []types.UserSpaceID) ([]types.UserSpaceDetail,
|
|||
detailMap := make(map[types.UserSpaceID]*types.UserSpaceDetail)
|
||||
for i := range spaces {
|
||||
detailMap[spaces[i].UserSpaceID] = &types.UserSpaceDetail{
|
||||
UserID: stgglb.Local.UserID,
|
||||
UserID: stgglb.UserID,
|
||||
UserSpace: spaces[i],
|
||||
}
|
||||
}
|
|
@ -11,13 +11,17 @@ import (
|
|||
"gitlink.org.cn/cloudream/common/pkgs/trie"
|
||||
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
|
||||
clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
|
||||
"gorm.io/gorm"
|
||||
|
||||
"gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
|
||||
"gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader/strategy"
|
||||
cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1"
|
||||
"gitlink.org.cn/cloudream/jcs-pub/common/ecode"
|
||||
stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
|
||||
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2"
|
||||
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser"
|
||||
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock/reqbuilder"
|
||||
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/factory"
|
||||
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
|
||||
)
|
||||
|
||||
|
@ -37,6 +41,153 @@ func (svc *UserSpaceService) GetByName(name string) (clitypes.UserSpace, error)
|
|||
return svc.DB.UserSpace().GetByName(svc.DB.DefCtx(), name)
|
||||
}
|
||||
|
||||
func (svc *UserSpaceService) Create(req cliapi.UserSpaceCreate) (*cliapi.UserSpaceCreateResp, *ecode.CodeError) {
|
||||
db2 := svc.DB
|
||||
space, err := db.DoTx01(db2, func(tx db.SQLContext) (clitypes.UserSpace, error) {
|
||||
space, err := db2.UserSpace().GetByName(tx, req.Name)
|
||||
if err == nil {
|
||||
return clitypes.UserSpace{}, gorm.ErrDuplicatedKey
|
||||
}
|
||||
if err != gorm.ErrRecordNotFound {
|
||||
return clitypes.UserSpace{}, err
|
||||
}
|
||||
|
||||
space = clitypes.UserSpace{
|
||||
Name: req.Name,
|
||||
Storage: req.Storage,
|
||||
Credential: req.Credential,
|
||||
ShardStore: req.ShardStore,
|
||||
Features: req.Features,
|
||||
WorkingDir: req.WorkingDir,
|
||||
Revision: 0,
|
||||
}
|
||||
err = db2.UserSpace().Create(tx, &space)
|
||||
if err != nil {
|
||||
return clitypes.UserSpace{}, err
|
||||
}
|
||||
return space, nil
|
||||
})
|
||||
if err == gorm.ErrDuplicatedKey {
|
||||
return nil, ecode.New(ecode.DataExists, "user space name already exists")
|
||||
}
|
||||
if err != nil {
|
||||
return nil, ecode.Newf(ecode.OperationFailed, "%v", err)
|
||||
}
|
||||
return &cliapi.UserSpaceCreateResp{UserSpace: space}, nil
|
||||
}
|
||||
|
||||
func (svc *UserSpaceService) Update(req cliapi.UserSpaceUpdate) (*cliapi.UserSpaceUpdateResp, *ecode.CodeError) {
|
||||
db2 := svc.DB
|
||||
space, err := db.DoTx01(db2, func(tx db.SQLContext) (clitypes.UserSpace, error) {
|
||||
space, err := db2.UserSpace().GetByID(tx, req.UserSpaceID)
|
||||
if err != nil {
|
||||
return clitypes.UserSpace{}, err
|
||||
}
|
||||
|
||||
if space.Name != req.Name {
|
||||
_, err = db2.UserSpace().GetByName(tx, req.Name)
|
||||
if err == nil {
|
||||
return clitypes.UserSpace{}, gorm.ErrDuplicatedKey
|
||||
}
|
||||
if err != gorm.ErrRecordNotFound {
|
||||
return clitypes.UserSpace{}, err
|
||||
}
|
||||
}
|
||||
|
||||
space.Name = req.Name
|
||||
space.Credential = req.Credential
|
||||
space.Features = req.Features
|
||||
space.Revision += 1
|
||||
return space, db2.UserSpace().UpdateColumns(tx, space, "Name", "Credential", "Features", "Revision")
|
||||
})
|
||||
if err == gorm.ErrDuplicatedKey {
|
||||
return nil, ecode.New(ecode.DataExists, "user space name already exists")
|
||||
}
|
||||
if err != nil {
|
||||
return nil, ecode.Newf(ecode.OperationFailed, "%v", err)
|
||||
}
|
||||
|
||||
// 通知元数据缓存无效
|
||||
svc.UserSpaceMeta.Drop([]clitypes.UserSpaceID{req.UserSpaceID})
|
||||
|
||||
// 通知存储服务组件池停止组件。TODO 对于在Hub上运行的组件,需要一个机制去定时清理
|
||||
svc.StgPool.Drop(stgglb.UserID, space.UserSpaceID)
|
||||
|
||||
// TODO 考虑加锁再进行操作
|
||||
|
||||
return &cliapi.UserSpaceUpdateResp{UserSpace: space}, nil
|
||||
}
|
||||
|
||||
func (svc *UserSpaceService) Delete(req cliapi.UserSpaceDelete) (*cliapi.UserSpaceDeleteResp, *ecode.CodeError) {
|
||||
db2 := svc.DB
|
||||
err := db2.DoTx(func(tx db.SQLContext) error {
|
||||
err := db2.UserSpace().DeleteByID(tx, req.UserSpaceID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = db2.ObjectBlock().DeleteByUserSpaceID(tx, req.UserSpaceID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = db2.PinnedObject().DeleteByUserSpaceID(tx, req.UserSpaceID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = db2.ObjectAccessStat().DeleteByUserSpaceID(tx, req.UserSpaceID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = db2.PackageAccessStat().DeleteByUserSpaceID(tx, req.UserSpaceID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, ecode.Newf(ecode.OperationFailed, "%v", err)
|
||||
}
|
||||
|
||||
// 通知元数据缓存无效
|
||||
svc.UserSpaceMeta.Drop([]clitypes.UserSpaceID{req.UserSpaceID})
|
||||
|
||||
// 通知存储服务组件池停止组件。TODO 对于在Hub上运行的组件,需要一个机制去定时清理
|
||||
svc.StgPool.Drop(stgglb.UserID, req.UserSpaceID)
|
||||
|
||||
// TODO 考虑加锁再进行操作,并且增加机制打断已经在进行的操作。
|
||||
|
||||
return &cliapi.UserSpaceDeleteResp{}, nil
|
||||
}
|
||||
|
||||
func (svc *UserSpaceService) Test(req cliapi.UserSpaceTest) (*cliapi.UserSpaceTestResp, *ecode.CodeError) {
|
||||
detail := clitypes.UserSpaceDetail{
|
||||
UserID: stgglb.UserID,
|
||||
UserSpace: clitypes.UserSpace{
|
||||
Name: "test",
|
||||
Storage: req.Storage,
|
||||
Credential: req.Credential,
|
||||
WorkingDir: req.WorikingDir,
|
||||
},
|
||||
}
|
||||
blder := factory.GetBuilder(&detail)
|
||||
baseStore, err := blder.CreateBaseStore(false)
|
||||
if err != nil {
|
||||
return nil, ecode.Newf(ecode.OperationFailed, "%v", err)
|
||||
}
|
||||
|
||||
// TODO 可以考虑增加一个专门用于检查配置的接口F
|
||||
_, err = baseStore.ListAll("")
|
||||
if err != nil {
|
||||
return nil, ecode.Newf(ecode.OperationFailed, "%v", err)
|
||||
}
|
||||
|
||||
return &cliapi.UserSpaceTestResp{}, nil
|
||||
}
|
||||
|
||||
func (svc *UserSpaceService) DownloadPackage(packageID clitypes.PackageID, userspaceID clitypes.UserSpaceID, rootPath string) error {
|
||||
coorCli := stgglb.CoordinatorRPCPool.Get()
|
||||
defer coorCli.Release()
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
|
||||
"gitlink.org.cn/cloudream/common/sdks"
|
||||
clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
|
||||
cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
|
||||
)
|
||||
|
||||
const UserSpaceDownloadPackagePath = "/userspace/downloadPackage"
|
||||
|
@ -77,6 +78,107 @@ func (c *Client) UserSpaceGet(req UserSpaceGet) (*UserSpaceGetResp, error) {
|
|||
return JSONAPI(c.cfg, http.DefaultClient, &req, &UserSpaceGetResp{})
|
||||
}
|
||||
|
||||
// 创建用户空间
|
||||
const UserSpaceCreatePath = "/userspace/create"
|
||||
|
||||
type UserSpaceCreate struct {
|
||||
Name string `json:"name" binding:"required"`
|
||||
Storage cortypes.StorageType `json:"storage" binding:"required"`
|
||||
Credential cortypes.StorageCredential `json:"credential" binding:"required"`
|
||||
ShardStore *cortypes.ShardStoreUserConfig `json:"shardStore"`
|
||||
Features []cortypes.StorageFeature `json:"features"`
|
||||
WorkingDir string `json:"workingDir"`
|
||||
}
|
||||
|
||||
func (r *UserSpaceCreate) MakeParam() *sdks.RequestParam {
|
||||
return sdks.MakeJSONParam(http.MethodPost, UserSpaceCreatePath, r)
|
||||
}
|
||||
|
||||
type UserSpaceCreateResp struct {
|
||||
clitypes.UserSpace
|
||||
}
|
||||
|
||||
func (r *UserSpaceCreateResp) ParseResponse(resp *http.Response) error {
|
||||
return sdks.ParseCodeDataJSONResponse(resp, r)
|
||||
}
|
||||
|
||||
func (c *Client) UserSpaceCreate(req UserSpaceCreate) (*UserSpaceCreateResp, error) {
|
||||
return JSONAPI(c.cfg, http.DefaultClient, &req, &UserSpaceCreateResp{})
|
||||
}
|
||||
|
||||
// 更新用户空间。一些重要的配置不可再二次修改
|
||||
const UserSpaceUpdatePath = "/userspace/update"
|
||||
|
||||
type UserSpaceUpdate struct {
|
||||
UserSpaceID clitypes.UserSpaceID `json:"userSpaceID" binding:"required"`
|
||||
Name string `json:"name" binding:"required"`
|
||||
Credential cortypes.StorageCredential `json:"credential" binding:"required"`
|
||||
Features []cortypes.StorageFeature `json:"features"`
|
||||
}
|
||||
|
||||
func (r *UserSpaceUpdate) MakeParam() *sdks.RequestParam {
|
||||
return sdks.MakeJSONParam(http.MethodPost, UserSpaceUpdatePath, r)
|
||||
}
|
||||
|
||||
type UserSpaceUpdateResp struct {
|
||||
clitypes.UserSpace
|
||||
}
|
||||
|
||||
func (r *UserSpaceUpdateResp) ParseResponse(resp *http.Response) error {
|
||||
return sdks.ParseCodeDataJSONResponse(resp, r)
|
||||
}
|
||||
|
||||
func (c *Client) UserSpaceUpdate(req UserSpaceUpdate) (*UserSpaceUpdateResp, error) {
|
||||
return JSONAPI(c.cfg, http.DefaultClient, &req, &UserSpaceUpdateResp{})
|
||||
}
|
||||
|
||||
// 删除用户空间
|
||||
const UserSpaceDeletePath = "/userspace/delete"
|
||||
|
||||
type UserSpaceDelete struct {
|
||||
UserSpaceID clitypes.UserSpaceID `json:"userSpaceID" binding:"required"`
|
||||
}
|
||||
|
||||
func (r *UserSpaceDelete) MakeParam() *sdks.RequestParam {
|
||||
return sdks.MakeJSONParam(http.MethodPost, UserSpaceDeletePath, r)
|
||||
}
|
||||
|
||||
type UserSpaceDeleteResp struct{}
|
||||
|
||||
func (r *UserSpaceDeleteResp) ParseResponse(resp *http.Response) error {
|
||||
return sdks.ParseCodeDataJSONResponse(resp, r)
|
||||
}
|
||||
|
||||
func (c *Client) UserSpaceDelete(req UserSpaceDelete) (*UserSpaceDeleteResp, error) {
|
||||
return JSONAPI(c.cfg, http.DefaultClient, &req, &UserSpaceDeleteResp{})
|
||||
}
|
||||
|
||||
// 测试给定用户空间的配置是否有效
|
||||
const UserSpaceTestPath = "/userspace/test"
|
||||
|
||||
type UserSpaceTest struct {
|
||||
Storage cortypes.StorageType `json:"storage" binding:"required"`
|
||||
Credential cortypes.StorageCredential `json:"credential" binding:"required"`
|
||||
WorikingDir string `json:"workingDir"`
|
||||
}
|
||||
|
||||
func (r *UserSpaceTest) MakeParam() *sdks.RequestParam {
|
||||
return sdks.MakeJSONParam(http.MethodPost, UserSpaceTestPath, r)
|
||||
}
|
||||
|
||||
type UserSpaceTestResp struct {
|
||||
// TODO 可以考虑返回WorkingDir内的文件列表
|
||||
}
|
||||
|
||||
func (r *UserSpaceTestResp) ParseResponse(resp *http.Response) error {
|
||||
return sdks.ParseCodeDataJSONResponse(resp, r)
|
||||
}
|
||||
|
||||
func (c *Client) UserSpaceTest(req UserSpaceTest) (*UserSpaceTestResp, error) {
|
||||
return JSONAPI(c.cfg, http.DefaultClient, &req, &UserSpaceTestResp{})
|
||||
}
|
||||
|
||||
// 存储服务间直传
|
||||
const UserSpaceSpaceToSpacePath = "/userspace/spaceToSpace"
|
||||
|
||||
type UserSpaceSpaceToSpace struct {
|
||||
|
|
|
@ -6,7 +6,7 @@ import (
|
|||
|
||||
"github.com/samber/lo"
|
||||
"gitlink.org.cn/cloudream/common/utils/sort2"
|
||||
cotypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
|
||||
cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -75,13 +75,13 @@ type UserSpace struct {
|
|||
// 用户空间名称
|
||||
Name string `gorm:"column:Name; type:varchar(255); not null" json:"name"`
|
||||
// 用户空间所在的存储服务配置
|
||||
Storage cotypes.StorageType `gorm:"column:Storage; type:json; not null; serializer:union" json:"storage"`
|
||||
Storage cortypes.StorageType `gorm:"column:Storage; type:json; not null; serializer:union" json:"storage"`
|
||||
// 用户在指定存储节点的凭证信息,比如用户账户,AK/SK等
|
||||
Credential cotypes.StorageCredential `gorm:"column:Credential; type:json; not null; serializer:union" json:"credential"`
|
||||
Credential cortypes.StorageCredential `gorm:"column:Credential; type:json; not null; serializer:union" json:"credential"`
|
||||
// 用户空间的分片存储配置,如果为空,则表示不使用分片存储
|
||||
ShardStore *cotypes.ShardStoreUserConfig `gorm:"column:ShardStore; type:json; serializer:json" json:"shardStore"`
|
||||
ShardStore *cortypes.ShardStoreUserConfig `gorm:"column:ShardStore; type:json; serializer:json" json:"shardStore"`
|
||||
// 存储服务特性功能的配置
|
||||
Features []cotypes.StorageFeature `json:"features" gorm:"column:Features; type:json; serializer:union"`
|
||||
Features []cortypes.StorageFeature `json:"features" gorm:"column:Features; type:json; serializer:union"`
|
||||
// 各种组件保存数据的根目录。组件工作过程中都会以这个目录为根(除了BaseStore)。
|
||||
WorkingDir string `gorm:"column:WorkingDir; type:varchar(1024); not null" json:"workingDir"`
|
||||
// 工作目录在存储系统中的真实路径。当工作路径在挂载点内时,这个字段记录的是挂载背后的真实路径。部分直接与存储系统交互的组件需要知道真实路径。
|
||||
|
@ -220,9 +220,9 @@ func (o *ObjectDetail) GroupBlocks() []GrouppedObjectBlock {
|
|||
}
|
||||
|
||||
type UserSpaceDetail struct {
|
||||
UserID cotypes.UserID
|
||||
UserID cortypes.UserID
|
||||
UserSpace UserSpace
|
||||
RecommendHub *cotypes.Hub
|
||||
RecommendHub *cortypes.Hub
|
||||
}
|
||||
|
||||
func (d UserSpaceDetail) String() string {
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
package ecode
|
||||
|
||||
type ErrorCode string
|
||||
|
||||
const (
|
||||
OK ErrorCode = "OK"
|
||||
OperationFailed ErrorCode = "OperationFailed"
|
||||
DataNotFound ErrorCode = "DataNotFound"
|
||||
DataExists ErrorCode = "DataExists"
|
||||
BadArgument ErrorCode = "BadArgument"
|
||||
TaskNotFound ErrorCode = "TaskNotFound"
|
||||
Unauthorized ErrorCode = "Unauthorized"
|
||||
)
|
|
@ -0,0 +1,26 @@
|
|||
package ecode
|
||||
|
||||
import "fmt"
|
||||
|
||||
type CodeError struct {
|
||||
Code ErrorCode `json:"code"`
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
func (e *CodeError) Error() string {
|
||||
return fmt.Sprintf("code: %s, message: %s", e.Code, e.Message)
|
||||
}
|
||||
|
||||
func New(code ErrorCode, message string) *CodeError {
|
||||
return &CodeError{
|
||||
Code: code,
|
||||
Message: message,
|
||||
}
|
||||
}
|
||||
|
||||
func Newf(code ErrorCode, format string, args ...interface{}) *CodeError {
|
||||
return &CodeError{
|
||||
Code: code,
|
||||
Message: fmt.Sprintf(format, args...),
|
||||
}
|
||||
}
|
|
@ -5,10 +5,7 @@ import (
|
|||
)
|
||||
|
||||
type LocalMachineInfo struct {
|
||||
UserID types.UserID `json:"userID"`
|
||||
ExternalIP string `json:"externalIP"`
|
||||
LocalIP string `json:"localIP"`
|
||||
Location types.Location `json:"location"`
|
||||
Location types.Location `json:"location"`
|
||||
}
|
||||
|
||||
var Local *LocalMachineInfo
|
||||
|
@ -16,6 +13,9 @@ var Local *LocalMachineInfo
|
|||
// 是否是独立运行模式。只针对Client
|
||||
var StandaloneMode bool
|
||||
|
||||
// 当前Client服务登录的用户的ID,如果是Standalone模式,则为0。TODO 临时解决办法
|
||||
var UserID types.UserID
|
||||
|
||||
// InitLocal
|
||||
//
|
||||
// @Description: 初始化本地机器信息
|
||||
|
|
|
@ -38,7 +38,7 @@ type AccessTokenVerifier interface {
|
|||
}
|
||||
|
||||
type AccessTokenProvider interface {
|
||||
GetAuthInfo() (AccessTokenAuthInfo, error)
|
||||
MakeAuthInfo() (AccessTokenAuthInfo, error)
|
||||
}
|
||||
|
||||
func (s *ServerBase) tlsConfigSelector(hello *tls.ClientHelloInfo) (*tls.Config, error) {
|
||||
|
|
|
@ -142,7 +142,7 @@ func (p *ConnPool) Release(addr string) {
|
|||
}
|
||||
|
||||
func (p *ConnPool) populateAccessTokenUnary(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
authInfo, err := p.cfg.AccessTokenProvider.GetAuthInfo()
|
||||
authInfo, err := p.cfg.AccessTokenProvider.MakeAuthInfo()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -159,7 +159,7 @@ func (p *ConnPool) populateAccessTokenUnary(ctx context.Context, method string,
|
|||
}
|
||||
|
||||
func (p *ConnPool) populateAccessTokenStream(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||
authInfo, err := p.cfg.AccessTokenProvider.GetAuthInfo()
|
||||
authInfo, err := p.cfg.AccessTokenProvider.MakeAuthInfo()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -40,6 +40,22 @@ func NewPool() *Pool {
|
|||
}
|
||||
}
|
||||
|
||||
func (p *Pool) Drop(userID cortypes.UserID, spaceID clitypes.UserSpaceID) {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
key := userSpaceKey{
|
||||
UserID: userID,
|
||||
UserSpaceID: spaceID,
|
||||
}
|
||||
space := p.spaces[key]
|
||||
if space != nil {
|
||||
space.Drop()
|
||||
}
|
||||
|
||||
delete(p.spaces, key)
|
||||
}
|
||||
|
||||
func (p *Pool) GetShardStore(spaceDetail *clitypes.UserSpaceDetail) (types.ShardStore, error) {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
|
Loading…
Reference in New Issue