修复回源功能

This commit is contained in:
Sydonian 2025-04-27 15:07:04 +08:00
parent 1a1d52ef9f
commit ffcb756922
24 changed files with 498 additions and 244 deletions

View File

@ -54,6 +54,12 @@ func DoTx11[T any, R any](db *DB, do func(tx SQLContext, t T) (R, error), t T) (
return ret, err
}
func DoTx20[T1 any, T2 any](db *DB, do func(tx SQLContext, t1 T1, t2 T2) error, t1 T1, t2 T2) error {
return db.db.Transaction(func(tx *gorm.DB) error {
return do(SQLContext{tx}, t1, t2)
})
}
func DoTx21[T1 any, T2 any, R any](db *DB, do func(tx SQLContext, t1 T1, t2 T2) (R, error), t1 T1, t2 T2) (R, error) {
var ret R
err := db.db.Transaction(func(tx *gorm.DB) error {

View File

@ -162,7 +162,7 @@ func (*PackageDB) GetByFullName(ctx SQLContext, bucketName string, packageName s
return ret, err
}
func (db *PackageDB) Create(ctx SQLContext, bucketID types.BucketID, name string) (types.Package, error) {
func (db *PackageDB) Create(ctx SQLContext, bucketID types.BucketID, name string, createTime time.Time) (types.Package, error) {
var packageID int64
err := ctx.Table("Package").
Select("PackageID").
@ -176,7 +176,7 @@ func (db *PackageDB) Create(ctx SQLContext, bucketID types.BucketID, name string
return types.Package{}, gorm.ErrDuplicatedKey
}
newPackage := types.Package{Name: name, BucketID: bucketID, CreateTime: time.Now()}
newPackage := types.Package{Name: name, BucketID: bucketID, CreateTime: createTime}
if err := ctx.Create(&newPackage).Error; err != nil {
return types.Package{}, fmt.Errorf("insert package failed, err: %w", err)
}
@ -301,7 +301,7 @@ func (db *PackageDB) TryCreateAll(ctx SQLContext, bktName string, pkgName string
return types.Package{}, fmt.Errorf("get package by name: %w", err)
}
pkg, err = db.Create(ctx, bkt.BucketID, pkgName)
pkg, err = db.Create(ctx, bkt.BucketID, pkgName, time.Now())
if err != nil {
return types.Package{}, fmt.Errorf("create package: %w", err)
}

View File

@ -50,8 +50,7 @@ func (s *UserSpaceService) CreatePackage(ctx *gin.Context) {
return
}
pkg, err := s.svc.UserSpaceSvc().UserSpaceCreatePackage(
req.BucketID, req.Name, req.UserSpaceID, req.Path, req.SpaceAffinity)
pkg, err := s.svc.Uploader.UserSpaceUpload(req.UserSpaceID, req.Path, req.BucketID, req.Name, req.SpaceAffinity)
if err != nil {
log.Warnf("userspace create package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("userspace create package: %v", err)))
@ -59,7 +58,7 @@ func (s *UserSpaceService) CreatePackage(ctx *gin.Context) {
}
ctx.JSON(http.StatusOK, OK(cliapi.UserSpaceCreatePackageResp{
Package: pkg,
Package: *pkg,
}))
}

View File

@ -166,7 +166,7 @@ func (r *FuseBucket) NewDir(ctx context.Context, name string) (fuse.FsDir, error
return fmt.Errorf("get bucket: %v", err)
}
_, err = db.Package().Create(tx, bkt.BucketID, name)
_, err = db.Package().Create(tx, bkt.BucketID, name, time.Now())
if err != nil {
return fmt.Errorf("create package: %v", err)
}

View File

@ -2,6 +2,7 @@ package services
import (
"fmt"
"time"
"gitlink.org.cn/cloudream/common/pkgs/logger"
@ -34,7 +35,7 @@ func (svc *PackageService) GetBucketPackages(bucketID types.BucketID) ([]types.P
}
func (svc *PackageService) Create(bucketID types.BucketID, name string) (types.Package, error) {
pkg, err := svc.DB.Package().Create(svc.DB.DefCtx(), bucketID, name)
pkg, err := svc.DB.Package().Create(svc.DB.DefCtx(), bucketID, name, time.Now())
if err != nil {
return types.Package{}, err
}
@ -72,7 +73,7 @@ func (svc *PackageService) Clone(packageID types.PackageID, bucketID types.Bucke
err := svc.DB.DoTx(func(tx db.SQLContext) error {
var err error
pkg, err = svc.DB.Package().Create(tx, bucketID, name)
pkg, err = svc.DB.Package().Create(tx, bucketID, name, time.Now())
if err != nil {
return fmt.Errorf("creating package: %w", err)
}

View File

@ -115,42 +115,3 @@ func (svc *UserSpaceService) LoadPackage(packageID clitypes.PackageID, userspace
return nil
}
// 请求节点启动从UserSpace中上传文件的任务。会返回节点ID和任务ID
func (svc *UserSpaceService) UserSpaceCreatePackage(bucketID clitypes.BucketID, name string, userspaceID clitypes.UserSpaceID, path string, userspaceAffinity clitypes.UserSpaceID) (clitypes.Package, error) {
// coorCli, err := stgglb.CoordinatorMQPool.Acquire()
// if err != nil {
// return cdssdk.Package{}, fmt.Errorf("new coordinator client: %w", err)
// }
// defer stgglb.CoordinatorMQPool.Release(coorCli)
// stgResp, err := coorCli.GetUserSpaceDetails(coormq.ReqGetUserSpaceDetails([]cdssdk.UserSpaceID{userspaceID}))
// if err != nil {
// return cdssdk.Package{}, fmt.Errorf("getting userspace info: %w", err)
// }
// spaceDetail := svc.UserSpaceMeta.Get(userspaceID)
// if spaceDetail == nil {
// return cdssdk.Package{}, fmt.Errorf("userspace not found: %d", userspaceID)
// }
// if spaceDetail.UserSpace.ShardStore == nil {
// return cdssdk.Package{}, fmt.Errorf("shard userspace is not enabled")
// }
// hubCli, err := stgglb.HubMQPool.Acquire(spaceDetail.MasterHub.HubID)
// if err != nil {
// return cdssdk.Package{}, fmt.Errorf("new hub client: %w", err)
// }
// defer stgglb.HubMQPool.Release(hubCli)
// createResp, err := hubCli.UserSpaceCreatePackage(hubmq.ReqUserSpaceCreatePackage(bucketID, name, userspaceID, path, userspaceAffinity))
// if err != nil {
// return cdssdk.Package{}, err
// }
// return createResp.Package, nil
// TODO 待实现
return clitypes.Package{}, fmt.Errorf("not implemented")
}

View File

@ -152,7 +152,7 @@ func (u *Uploader) BeginCreateLoad(bktID clitypes.BucketID, pkgName string, load
return clitypes.Package{}, err
}
return u.db.Package().Create(u.db.DefCtx(), bktID, pkgName)
return u.db.Package().Create(u.db.DefCtx(), bktID, pkgName, time.Now())
})
if err != nil {
return nil, fmt.Errorf("create package: %w", err)

View File

@ -0,0 +1,184 @@
package uploader
import (
"context"
"fmt"
"math"
"strings"
"time"
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator"
hubmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/hub"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
)
func (u *Uploader) UserSpaceUpload(userSpaceID clitypes.UserSpaceID, rootPath string, targetBktID clitypes.BucketID, newPkgName string, uploadAffinity clitypes.UserSpaceID) (*clitypes.Package, error) {
srcSpace := u.spaceMeta.Get(userSpaceID)
if srcSpace == nil {
return nil, fmt.Errorf("user space %d not found", userSpaceID)
}
if srcSpace.MasterHub == nil {
return nil, fmt.Errorf("master hub not found for user space %d", userSpaceID)
}
pkg, err := db.DoTx01(u.db, func(tx db.SQLContext) (clitypes.Package, error) {
_, err := u.db.Bucket().GetByID(tx, targetBktID)
if err != nil {
return clitypes.Package{}, err
}
return u.db.Package().Create(tx, targetBktID, newPkgName, time.Now())
})
if err != nil {
return nil, fmt.Errorf("creating package: %w", err)
}
delPkg := func() {
u.db.Package().Delete(u.db.DefCtx(), pkg.PackageID)
}
spaceIDs, err := u.db.UserSpace().GetAllIDs(u.db.DefCtx())
if err != nil {
delPkg()
return nil, fmt.Errorf("getting user space ids: %w", err)
}
spaceDetails := u.spaceMeta.GetMany(spaceIDs)
spaceDetails = lo.Filter(spaceDetails, func(e *clitypes.UserSpaceDetail, i int) bool {
return e != nil && e.MasterHub != nil && e.UserSpace.ShardStore != nil
})
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
delPkg()
return nil, fmt.Errorf("acquiring coordinator mq client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)
resp, err := coorCli.GetHubConnectivities(coordinator.ReqGetHubConnectivities([]cortypes.HubID{srcSpace.MasterHub.HubID}))
if err != nil {
delPkg()
return nil, fmt.Errorf("getting hub connectivities: %w", err)
}
cons := make(map[cortypes.HubID]cortypes.HubConnectivity)
for _, c := range resp.Connectivities {
cons[c.ToHubID] = c
}
var uploadSpaces []UploadSpaceInfo
for _, space := range spaceDetails {
if space.MasterHub == nil {
continue
}
latency := time.Duration(math.MaxInt64)
con, ok := cons[space.MasterHub.HubID]
if ok && con.Latency != nil {
latency = time.Duration(*con.Latency * float32(time.Millisecond))
}
uploadSpaces = append(uploadSpaces, UploadSpaceInfo{
Space: *space,
Delay: latency,
IsSameLocation: space.MasterHub.LocationID == srcSpace.MasterHub.LocationID,
})
}
if len(uploadSpaces) == 0 {
delPkg()
return nil, fmt.Errorf("user no available userspaces")
}
targetSapce := u.chooseUploadStorage(uploadSpaces, uploadAffinity)
srcHubCli, err := stgglb.HubMQPool.Acquire(srcSpace.MasterHub.HubID)
if err != nil {
delPkg()
return nil, fmt.Errorf("acquiring source hub mq client: %w", err)
}
defer stgglb.HubMQPool.Release(srcHubCli)
listAllResp, err := srcHubCli.PublicStoreListAll(&hubmq.PublicStoreListAll{
UserSpace: *srcSpace,
Path: rootPath,
})
if err != nil {
delPkg()
return nil, fmt.Errorf("listing public store: %w", err)
}
adds, err := u.uploadFromPublicStore(srcSpace, &targetSapce.Space, listAllResp.Entries, rootPath)
if err != nil {
delPkg()
return nil, fmt.Errorf("uploading from public store: %w", err)
}
_, err = db.DoTx21(u.db, u.db.Object().BatchAdd, pkg.PackageID, adds)
if err != nil {
delPkg()
return nil, fmt.Errorf("adding objects: %w", err)
}
return &pkg, nil
}
func (u *Uploader) uploadFromPublicStore(srcSpace *clitypes.UserSpaceDetail, targetSpace *clitypes.UserSpaceDetail, entries []types.PublicStoreEntry, rootPath string) ([]db.AddObjectEntry, error) {
ft := ioswitch2.FromTo{}
for _, e := range entries {
// 可以考虑增加一个配置项来控制是否上传空目录
if e.IsDir {
continue
}
ft.AddFrom(ioswitch2.NewFromPublicStore(*srcSpace.MasterHub, *srcSpace, e.Path))
ft.AddTo(ioswitch2.NewToShardStore(*targetSpace.MasterHub, *targetSpace, ioswitch2.RawStream(), e.Path))
}
plans := exec.NewPlanBuilder()
err := parser.Parse(ft, plans)
if err != nil {
return nil, fmt.Errorf("parsing plan: %w", err)
}
exeCtx := exec.NewExecContext()
exec.SetValueByType(exeCtx, u.stgPool)
ret, err := plans.Execute(exeCtx).Wait(context.Background())
if err != nil {
return nil, fmt.Errorf("executing plan: %w", err)
}
cleanRoot := strings.TrimSuffix(rootPath, clitypes.ObjectPathSeparator)
adds := make([]db.AddObjectEntry, 0, len(ret))
for _, e := range entries {
if e.IsDir {
continue
}
pat := strings.TrimPrefix(e.Path, cleanRoot+clitypes.ObjectPathSeparator)
if pat == cleanRoot {
pat = clitypes.BaseName(e.Path)
}
info := ret[e.Path].(*ops2.ShardInfoValue)
adds = append(adds, db.AddObjectEntry{
Path: pat,
Size: info.Size,
FileHash: info.Hash,
CreateTime: time.Now(),
UserSpaceIDs: []clitypes.UserSpaceID{targetSpace.UserSpace.UserSpaceID},
})
}
return adds, nil
}

View File

@ -3,7 +3,7 @@ package ioswitch2
import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/jcs-pub/client/types"
clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
)
@ -69,9 +69,9 @@ type FromTos []FromTo
type FromTo struct {
// 如果输入或者输出用到了EC编码的流则需要提供EC参数。
ECParam *types.ECRedundancy
ECParam *clitypes.ECRedundancy
// 同上
SegmentParam *types.SegmentRedundancy
SegmentParam *clitypes.SegmentRedundancy
Froms []From
Toes []To
}
@ -110,13 +110,13 @@ func (f *FromDriver) GetStreamIndex() StreamIndex {
}
type FromShardstore struct {
FileHash types.FileHash
FileHash clitypes.FileHash
Hub cortypes.Hub
Space types.UserSpaceDetail
Space clitypes.UserSpaceDetail
StreamIndex StreamIndex
}
func NewFromShardstore(fileHash types.FileHash, hub cortypes.Hub, space types.UserSpaceDetail, strIdx StreamIndex) *FromShardstore {
func NewFromShardstore(fileHash clitypes.FileHash, hub cortypes.Hub, space clitypes.UserSpaceDetail, strIdx StreamIndex) *FromShardstore {
return &FromShardstore{
FileHash: fileHash,
Hub: hub,
@ -129,6 +129,26 @@ func (f *FromShardstore) GetStreamIndex() StreamIndex {
return f.StreamIndex
}
type FromPublicStore struct {
Hub cortypes.Hub
Space clitypes.UserSpaceDetail
Path string
}
func NewFromPublicStore(hub cortypes.Hub, space clitypes.UserSpaceDetail, path string) *FromPublicStore {
return &FromPublicStore{
Hub: hub,
Space: space,
Path: path,
}
}
func (f *FromPublicStore) GetStreamIndex() StreamIndex {
return StreamIndex{
Type: StreamIndexRaw,
}
}
type ToDriver struct {
Handle *exec.DriverReadStream
StreamIndex StreamIndex
@ -162,13 +182,13 @@ func (t *ToDriver) GetRange() math2.Range {
type ToShardStore struct {
Hub cortypes.Hub
Space types.UserSpaceDetail
Space clitypes.UserSpaceDetail
StreamIndex StreamIndex
Range math2.Range
FileHashStoreKey string
}
func NewToShardStore(hub cortypes.Hub, space types.UserSpaceDetail, strIdx StreamIndex, fileHashStoreKey string) *ToShardStore {
func NewToShardStore(hub cortypes.Hub, space clitypes.UserSpaceDetail, strIdx StreamIndex, fileHashStoreKey string) *ToShardStore {
return &ToShardStore{
Hub: hub,
Space: space,
@ -177,7 +197,7 @@ func NewToShardStore(hub cortypes.Hub, space types.UserSpaceDetail, strIdx Strea
}
}
func NewToShardStoreWithRange(hub cortypes.Hub, space types.UserSpaceDetail, streamIndex StreamIndex, fileHashStoreKey string, rng math2.Range) *ToShardStore {
func NewToShardStoreWithRange(hub cortypes.Hub, space clitypes.UserSpaceDetail, streamIndex StreamIndex, fileHashStoreKey string, rng math2.Range) *ToShardStore {
return &ToShardStore{
Hub: hub,
Space: space,
@ -197,11 +217,11 @@ func (t *ToShardStore) GetRange() math2.Range {
type LoadToPublic struct {
Hub cortypes.Hub
Space types.UserSpaceDetail
Space clitypes.UserSpaceDetail
ObjectPath string
}
func NewLoadToPublic(hub cortypes.Hub, space types.UserSpaceDetail, objectPath string) *LoadToPublic {
func NewLoadToPublic(hub cortypes.Hub, space clitypes.UserSpaceDetail, objectPath string) *LoadToPublic {
return &LoadToPublic{
Hub: hub,
Space: space,

View File

@ -2,30 +2,78 @@ package ops2
import (
"fmt"
"io"
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/io2"
clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool"
)
func init() {
exec.UseOp[*PublicLoad]()
exec.UseOp[*PublicWrite]()
exec.UseOp[*PublicRead]()
}
type PublicLoad struct {
type PublicRead struct {
Output exec.VarID
UserSpace clitypes.UserSpaceDetail
ObjectPath string
}
func (o *PublicRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
logger.
WithField("Output", o.Output).
WithField("UserSpace", o.UserSpace).
WithField("ObjectPath", o.ObjectPath).
Debug("public read")
defer logger.Debug("public read end")
stgPool, err := exec.GetValueByType[*pool.Pool](ctx)
if err != nil {
return fmt.Errorf("getting storage pool: %w", err)
}
store, err := stgPool.GetPublicStore(&o.UserSpace)
if err != nil {
return fmt.Errorf("getting public store of storage %v: %w", o.UserSpace, err)
}
stream, err := store.Read(o.ObjectPath)
if err != nil {
return fmt.Errorf("reading object %v: %w", o.ObjectPath, err)
}
fut := future.NewSetVoid()
output := &exec.StreamValue{
Stream: io2.AfterReadClosed(stream, func(closer io.ReadCloser) {
fut.SetVoid()
}),
}
e.PutVar(o.Output, output)
return fut.Wait(ctx.Context)
}
func (o *PublicRead) String() string {
return fmt.Sprintf("PublicRead %v:%v -> %v", o.UserSpace, o.ObjectPath, o.Output)
}
type PublicWrite struct {
Input exec.VarID
UserSpace clitypes.UserSpaceDetail
ObjectPath string
}
func (o *PublicLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
func (o *PublicWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
logger.
WithField("Input", o.Input).
Debugf("load file to public store")
defer logger.Debugf("load file to public store finished")
Debugf("write file to public store")
defer logger.Debugf("write file to public store finished")
stgPool, err := exec.GetValueByType[*pool.Pool](ctx)
if err != nil {
@ -46,19 +94,57 @@ func (o *PublicLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
return store.Write(o.ObjectPath, input.Stream)
}
func (o *PublicLoad) String() string {
return fmt.Sprintf("PublicLoad %v -> %v:%v", o.Input, o.UserSpace, o.ObjectPath)
func (o *PublicWrite) String() string {
return fmt.Sprintf("PublicWrite %v -> %v:%v", o.Input, o.UserSpace, o.ObjectPath)
}
type PublicLoadNode struct {
type PublicReadNode struct {
dag.NodeBase
From ioswitch2.From
UserSpace clitypes.UserSpaceDetail
ObjectPath string
}
func (b *GraphNodeBuilder) NewPublicRead(from ioswitch2.From, userSpace clitypes.UserSpaceDetail, objPath string) *PublicReadNode {
node := &PublicReadNode{
From: from,
UserSpace: userSpace,
ObjectPath: objPath,
}
b.AddNode(node)
node.OutputStreams().Init(node, 1)
return node
}
func (t *PublicReadNode) GetFrom() ioswitch2.From {
return t.From
}
func (t *PublicReadNode) Output() dag.StreamOutputSlot {
return dag.StreamOutputSlot{
Node: t,
Index: 0,
}
}
func (t *PublicReadNode) GenerateOp() (exec.Op, error) {
return &PublicRead{
Output: t.Output().Var().VarID,
UserSpace: t.UserSpace,
ObjectPath: t.ObjectPath,
}, nil
}
type PublicWriteNode struct {
dag.NodeBase
To ioswitch2.To
UserSpace clitypes.UserSpaceDetail
ObjectPath string
}
func (b *GraphNodeBuilder) NewPublicLoad(to ioswitch2.To, userSpace clitypes.UserSpaceDetail, objPath string) *PublicLoadNode {
node := &PublicLoadNode{
func (b *GraphNodeBuilder) NewPublicWrite(to ioswitch2.To, userSpace clitypes.UserSpaceDetail, objPath string) *PublicWriteNode {
node := &PublicWriteNode{
To: to,
UserSpace: userSpace,
ObjectPath: objPath,
@ -69,23 +155,23 @@ func (b *GraphNodeBuilder) NewPublicLoad(to ioswitch2.To, userSpace clitypes.Use
return node
}
func (t *PublicLoadNode) GetTo() ioswitch2.To {
func (t *PublicWriteNode) GetTo() ioswitch2.To {
return t.To
}
func (t *PublicLoadNode) SetInput(input *dag.StreamVar) {
func (t *PublicWriteNode) SetInput(input *dag.StreamVar) {
input.To(t, 0)
}
func (t *PublicLoadNode) Input() dag.StreamInputSlot {
func (t *PublicWriteNode) Input() dag.StreamInputSlot {
return dag.StreamInputSlot{
Node: t,
Index: 0,
}
}
func (t *PublicLoadNode) GenerateOp() (exec.Op, error) {
return &PublicLoad{
func (t *PublicWriteNode) GenerateOp() (exec.Op, error) {
return &PublicWrite{
Input: t.InputStreams().Get(0).VarID,
UserSpace: t.UserSpace,
ObjectPath: t.ObjectPath,

View File

@ -336,6 +336,24 @@ func buildFromNode(ctx *state.GenerateState, f ioswitch2.From) (ops2.FromNode, e
return n, nil
case *ioswitch2.FromPublicStore:
// TODO 可以考虑支持设置读取范围
n := ctx.DAG.NewPublicRead(f, f.Space, f.Path)
switch addr := f.Hub.Address.(type) {
case *cortypes.HttpAddressInfo:
n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: f.Hub})
n.Env().Pinned = true
case *cortypes.GRPCAddressInfo:
n.Env().ToEnvWorker(&ioswitch2.HubWorker{Hub: f.Hub, Address: *addr})
n.Env().Pinned = true
default:
return nil, fmt.Errorf("unsupported node address type %T", addr)
}
return n, nil
default:
return nil, fmt.Errorf("unsupported from type %T", f)
}
@ -362,7 +380,7 @@ func buildToNode(ctx *state.GenerateState, t ioswitch2.To) (ops2.ToNode, error)
return n, nil
case *ioswitch2.LoadToPublic:
n := ctx.DAG.NewPublicLoad(t, t.Space, t.ObjectPath)
n := ctx.DAG.NewPublicWrite(t, t.Space, t.ObjectPath)
if err := setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil {
return nil, err

View File

@ -8,7 +8,7 @@ import (
)
type Service interface {
// UserSpaceService
UserSpaceService
CacheService

View File

@ -1,48 +0,0 @@
package hub
/*
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)
type UserSpaceService interface {
UserSpaceCreatePackage(msg *UserSpaceCreatePackage) (*UserSpaceCreatePackageResp, *mq.CodeMessage)
}
// 启动从UserSpace上传Package的任务
var _ = Register(Service.UserSpaceCreatePackage)
type UserSpaceCreatePackage struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
BucketID cdssdk.BucketID `json:"bucketID"`
Name string `json:"name"`
UserSpaceID cdssdk.UserSpaceID `json:"userspaceID"`
Path string `json:"path"`
UserSpaceAffinity cdssdk.UserSpaceID `json:"userspaceAffinity"`
}
type UserSpaceCreatePackageResp struct {
mq.MessageBodyBase
Package cdssdk.Package `json:"package"`
}
func ReqUserSpaceCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, userspaceID cdssdk.UserSpaceID, path string, stgAffinity cdssdk.UserSpaceID) *UserSpaceCreatePackage {
return &UserSpaceCreatePackage{
UserID: userID,
BucketID: bucketID,
Name: name,
UserSpaceID: userspaceID,
Path: path,
UserSpaceAffinity: stgAffinity,
}
}
func RespUserSpaceCreatePackage(pkg cdssdk.Package) *UserSpaceCreatePackageResp {
return &UserSpaceCreatePackageResp{
Package: pkg,
}
}
func (client *Client) UserSpaceCreatePackage(msg *UserSpaceCreatePackage, opts ...mq.RequestOption) (*UserSpaceCreatePackageResp, error) {
return mq.Request(Service.UserSpaceCreatePackage, client.rabbitCli, msg, opts...)
}
*/

View File

@ -0,0 +1,28 @@
package hub
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
stgtypes "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
)
type UserSpaceService interface {
PublicStoreListAll(msg *PublicStoreListAll) (*PublicStoreListAllResp, *mq.CodeMessage)
}
// 启动从UserSpace上传Package的任务
var _ = Register(Service.PublicStoreListAll)
type PublicStoreListAll struct {
mq.MessageBodyBase
UserSpace clitypes.UserSpaceDetail
Path string
}
type PublicStoreListAllResp struct {
mq.MessageBodyBase
Entries []stgtypes.PublicStoreEntry
}
func (client *Client) PublicStoreListAll(msg *PublicStoreListAll, opts ...mq.RequestOption) (*PublicStoreListAllResp, error) {
return mq.Request(Service.PublicStoreListAll, client.rabbitCli, msg, opts...)
}

View File

@ -31,11 +31,21 @@ func (b *builder) FeatureDesc() types.FeatureDesc {
}
func (b *builder) CreateShardStore() (types.ShardStore, error) {
return NewShardStore(b.detail)
cred, ok := b.detail.UserSpace.Credential.(*cortypes.LocalCred)
if !ok {
return nil, fmt.Errorf("invalid storage credential type %T for local storage", b.detail.UserSpace.Credential)
}
return NewShardStore(cred.RootDir, b.detail)
}
func (b *builder) CreatePublicStore() (types.PublicStore, error) {
return NewPublicStore(b.detail)
cred, ok := b.detail.UserSpace.Credential.(*cortypes.LocalCred)
if !ok {
return nil, fmt.Errorf("invalid storage credential type %T for local storage", b.detail.UserSpace.Credential)
}
return NewPublicStore(cred.RootDir, b.detail)
}
func (b *builder) CreateMultiparter() (types.Multiparter, error) {

View File

@ -8,25 +8,30 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
)
type PublicStore struct {
root string
detail *clitypes.UserSpaceDetail
}
func NewPublicStore(detail *clitypes.UserSpaceDetail) (*PublicStore, error) {
func NewPublicStore(root string, detail *clitypes.UserSpaceDetail) (*PublicStore, error) {
return &PublicStore{
root: root,
detail: detail,
}, nil
}
func (s *PublicStore) Write(objPath string, stream io.Reader) error {
err := os.MkdirAll(filepath.Dir(objPath), 0755)
absObjPath := filepath.Join(s.root, objPath)
err := os.MkdirAll(filepath.Dir(absObjPath), 0755)
if err != nil {
return err
}
f, err := os.Create(objPath)
f, err := os.Create(absObjPath)
if err != nil {
return err
}
@ -41,7 +46,8 @@ func (s *PublicStore) Write(objPath string, stream io.Reader) error {
}
func (s *PublicStore) Read(objPath string) (io.ReadCloser, error) {
f, err := os.Open(objPath)
absObjPath := filepath.Join(s.root, objPath)
f, err := os.Open(absObjPath)
if err != nil {
return nil, err
}
@ -49,40 +55,48 @@ func (s *PublicStore) Read(objPath string) (io.ReadCloser, error) {
return f, nil
}
func (s *PublicStore) List(path string, recursive bool) ([]string, error) {
var pathes []string
if recursive {
err := filepath.WalkDir(path, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if d.IsDir() {
return nil
}
func (s *PublicStore) ListAll(path string) ([]types.PublicStoreEntry, error) {
absObjPath := filepath.Join(s.root, path)
pathes = append(pathes, filepath.ToSlash(path))
var es []types.PublicStoreEntry
err := filepath.WalkDir(absObjPath, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
relaPath, err := filepath.Rel(s.root, path)
if err != nil {
return err
}
if d.IsDir() {
es = append(es, types.PublicStoreEntry{
Path: filepath.ToSlash(relaPath),
Size: 0,
IsDir: true,
})
return nil
}
info, err := d.Info()
if err != nil {
return err
}
es = append(es, types.PublicStoreEntry{
Path: filepath.ToSlash(relaPath),
Size: info.Size(),
IsDir: false,
})
if err != nil {
return nil, err
}
} else {
files, err := os.ReadDir(path)
if err != nil {
return nil, err
}
for _, f := range files {
if f.IsDir() {
continue
}
pathes = append(pathes, filepath.ToSlash(filepath.Join(path, f.Name())))
}
return nil
})
if os.IsNotExist(err) {
return nil, nil
}
if err != nil {
return nil, err
}
return pathes, nil
return es, nil
}
func (s *PublicStore) getLogger() logger.Logger {

View File

@ -31,8 +31,8 @@ type ShardStore struct {
done chan any
}
func NewShardStore(detail *clitypes.UserSpaceDetail) (*ShardStore, error) {
absRoot, err := filepath.Abs(detail.UserSpace.ShardStore.Root)
func NewShardStore(root string, detail *clitypes.UserSpaceDetail) (*ShardStore, error) {
absRoot, err := filepath.Abs(filepath.Join(root, detail.UserSpace.ShardStore.BaseDir))
if err != nil {
return nil, fmt.Errorf("get abs root: %w", err)
}

View File

@ -8,6 +8,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3"
"gitlink.org.cn/cloudream/common/pkgs/logger"
clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
)
type PublicStore struct {
@ -51,20 +52,17 @@ func (s *PublicStore) Read(objPath string) (io.ReadCloser, error) {
return resp.Body, nil
}
func (s *PublicStore) List(path string, recursive bool) ([]string, error) {
func (s *PublicStore) ListAll(path string) ([]types.PublicStoreEntry, error) {
key := path
// TODO 待测试
input := &s3.ListObjectsInput{
Bucket: aws.String(s.Bucket),
Prefix: aws.String(key),
Bucket: aws.String(s.Bucket),
Prefix: aws.String(key),
Delimiter: aws.String("/"),
}
if !recursive {
input.Delimiter = aws.String("/")
}
var pathes []string
var objs []types.PublicStoreEntry
var marker *string
for {
@ -75,7 +73,11 @@ func (s *PublicStore) List(path string, recursive bool) ([]string, error) {
}
for _, obj := range resp.Contents {
pathes = append(pathes, *obj.Key)
objs = append(objs, types.PublicStoreEntry{
Path: *obj.Key,
Size: *obj.Size,
IsDir: false,
})
}
if !*resp.IsTruncated {
@ -85,7 +87,7 @@ func (s *PublicStore) List(path string, recursive bool) ([]string, error) {
marker = resp.NextMarker
}
return pathes, nil
return objs, nil
}
func (s *PublicStore) getLogger() logger.Logger {

View File

@ -52,7 +52,7 @@ func NewShardStore(detail *clitypes.UserSpaceDetail, cli *s3.Client, bkt string,
}
func (s *ShardStore) Start(ch *types.StorageEventChan) {
s.getLogger().Infof("start, root: %v", s.Detail.UserSpace.ShardStore.Root)
s.getLogger().Infof("start, root: %v", s.Detail.UserSpace.ShardStore.BaseDir)
go func() {
removeTempTicker := time.NewTicker(time.Minute * 10)
@ -81,7 +81,7 @@ func (s *ShardStore) removeUnusedTempFiles() {
for {
resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{
Bucket: aws.String(s.Bucket),
Prefix: aws.String(JoinKey(s.Detail.UserSpace.ShardStore.Root, TempDir, "/")),
Prefix: aws.String(JoinKey(s.Detail.UserSpace.ShardStore.BaseDir, TempDir, "/")),
Marker: marker,
})
@ -223,7 +223,7 @@ func (s *ShardStore) createTempFile() (string, string) {
s.lock.Lock()
defer s.lock.Unlock()
tmpDir := JoinKey(s.Detail.UserSpace.ShardStore.Root, TempDir)
tmpDir := JoinKey(s.Detail.UserSpace.ShardStore.BaseDir, TempDir)
tmpName := os2.GenerateRandomFileName(20)
s.workingTempFiles[tmpName] = true
@ -335,7 +335,7 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) {
var infos []types.FileInfo
blockDir := JoinKey(s.Detail.UserSpace.ShardStore.Root, BlocksDir)
blockDir := JoinKey(s.Detail.UserSpace.ShardStore.BaseDir, BlocksDir)
var marker *string
for {
@ -384,7 +384,7 @@ func (s *ShardStore) GC(avaiables []clitypes.FileHash) error {
avais[hash] = true
}
blockDir := JoinKey(s.Detail.UserSpace.ShardStore.Root, BlocksDir)
blockDir := JoinKey(s.Detail.UserSpace.ShardStore.BaseDir, BlocksDir)
var deletes []s3types.ObjectIdentifier
var marker *string
@ -454,11 +454,11 @@ func (s *ShardStore) getLogger() logger.Logger {
}
func (s *ShardStore) GetFileDirFromHash(hash clitypes.FileHash) string {
return JoinKey(s.Detail.UserSpace.ShardStore.Root, BlocksDir, hash.GetHashPrefix(2))
return JoinKey(s.Detail.UserSpace.ShardStore.BaseDir, BlocksDir, hash.GetHashPrefix(2))
}
func (s *ShardStore) GetFilePathFromHash(hash clitypes.FileHash) string {
return JoinKey(s.Detail.UserSpace.ShardStore.Root, BlocksDir, hash.GetHashPrefix(2), string(hash))
return JoinKey(s.Detail.UserSpace.ShardStore.BaseDir, BlocksDir, hash.GetHashPrefix(2), string(hash))
}
var _ types.BypassWrite = (*ShardStore)(nil)

View File

@ -4,9 +4,17 @@ import (
"io"
)
type PublicStore interface {
Write(objectPath string, stream io.Reader) error
Read(objectPath string) (io.ReadCloser, error)
// 返回指定路径下的所有文件
List(path string, recursive bool) ([]string, error)
type PublicStoreEntry struct {
Path string
Size int64
IsDir bool
}
type PublicStore interface {
Write(path string, stream io.Reader) error
Read(path string) (io.ReadCloser, error)
// 返回指定路径下的所有文件文件路径是包含path在内的完整路径。返回结果的第一条一定是路径本身可能是文件也可能是目录。
// 如果路径不存在,那么不会返回错误,而是返回一个空列表。
// 返回的内容严格按照存储系统的原始结果来,比如当存储系统是一个对象存储时,那么就可能不会包含目录,或者包含用于模拟的以“/”结尾的对象。
ListAll(path string) ([]PublicStoreEntry, error)
}

View File

@ -144,6 +144,6 @@ func (a *S3Type) String() string {
}
type ShardStoreUserConfig struct {
Root string `json:"root"`
BaseDir string `json:"baseDir"`
MaxSize int64 `json:"maxSize"`
}

View File

@ -23,6 +23,7 @@ type LocalCred struct {
StorageCredential `json:"-"`
serder.Metadata `union:"Local"`
Type string `json:"type"`
RootDir string `json:"rootDir"`
}
type MashupCred struct {

View File

@ -1,59 +0,0 @@
package mq
/*
import (
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
hubmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/hub"
coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator"
)
func (svc *Service) StorageCreatePackage(msg *hubmq.StorageCreatePackage) (*hubmq.StorageCreatePackageResp, *mq.CodeMessage) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
logger.Warnf("new coordinator client: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed")
}
defer stgglb.CoordinatorMQPool.Release(coorCli)
pub, err := svc.stgHubs.GetPublicStore(msg.StorageID)
if err != nil {
return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}
createResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(msg.UserID, msg.BucketID, msg.Name))
if err != nil {
return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}
uploader, err := svc.uploader.BeginUpdate(msg.UserID, createResp.Package.PackageID, msg.StorageAffinity, nil, nil)
if err != nil {
return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}
objPathes, err := pub.List(msg.Path, true)
for _, p := range objPathes {
o, err := pub.Read(p)
if err != nil {
logger.Warnf("read object %s: %v", p, err)
continue
}
err = uploader.Upload(p, o)
o.Close()
if err != nil {
logger.Warnf("upload object %s: %v", p, err)
continue
}
}
_, err = uploader.Commit()
if err != nil {
return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}
return mq.ReplyOK(hubmq.RespStorageCreatePackage(createResp.Package))
}
*/

View File

@ -0,0 +1,23 @@
package mq
import (
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/mq"
hubmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/hub"
)
func (svc *Service) PublicStoreListAll(msg *hubmq.PublicStoreListAll) (*hubmq.PublicStoreListAllResp, *mq.CodeMessage) {
pub, err := svc.stgPool.GetPublicStore(&msg.UserSpace)
if err != nil {
return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}
es, err := pub.ListAll(msg.Path)
if err != nil {
return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}
return mq.ReplyOK(&hubmq.PublicStoreListAllResp{
Entries: es,
})
}