Adjust the uploader code

Sydonian 2025-04-03 16:07:45 +08:00
parent ab57707a1f
commit 894da1a67e
3 changed files with 162 additions and 192 deletions

View File

@@ -9,36 +9,33 @@ import (
"time"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage2/client/internal/db"
"gitlink.org.cn/cloudream/storage2/client/types"
stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
"gitlink.org.cn/cloudream/storage2/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2"
"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/ops2"
"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
)
type CreateLoadUploader struct {
pkg cdssdk.Package
userID cdssdk.UserID
pkg types.Package
targetSpaces []types.UserSpaceDetail
loadRoots []string
uploader *Uploader
distlock *distlock.Mutex
successes []coormq.AddObjectEntry
successes []db.AddObjectEntry
lock sync.Mutex
commited bool
}
type CreateLoadResult struct {
Package cdssdk.Package
Objects map[string]cdssdk.Object
Package types.Package
Objects map[string]types.Object
}
func (u *CreateLoadUploader) Upload(pa string, size int64, stream io.Reader) error {
uploadTime := time.Now()
spaceIDs := make([]cdssdk.StorageID, 0, len(u.targetSpaces))
spaceIDs := make([]types.UserSpaceID, 0, len(u.targetSpaces))
ft := ioswitch2.FromTo{}
fromExec, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream())
@@ -46,7 +43,7 @@ func (u *CreateLoadUploader) Upload(pa string, size int64, stream io.Reader) err
for i, space := range u.targetSpaces {
ft.AddTo(ioswitch2.NewToShardStore(*space.MasterHub, space, ioswitch2.RawStream(), "fileHash"))
ft.AddTo(ioswitch2.NewLoadToPublic(*space.MasterHub, space, path.Join(u.loadRoots[i], pa)))
spaceIDs = append(spaceIDs, space.Storage.StorageID)
spaceIDs = append(spaceIDs, space.UserSpace.UserSpaceID)
}
plans := exec.NewPlanBuilder()
@@ -69,12 +66,12 @@ func (u *CreateLoadUploader) Upload(pa string, size int64, stream io.Reader) err
// Record the upload result
fileHash := ret["fileHash"].(*ops2.ShardInfoValue).Hash
u.successes = append(u.successes, coormq.AddObjectEntry{
Path: pa,
Size: size,
FileHash: fileHash,
UploadTime: uploadTime,
StorageIDs: spaceIDs,
u.successes = append(u.successes, db.AddObjectEntry{
Path: pa,
Size: size,
FileHash: fileHash,
UploadTime: uploadTime,
UserSpaceIDs: spaceIDs,
})
return nil
}
@@ -90,31 +87,25 @@ func (u *CreateLoadUploader) Commit() (CreateLoadResult, error) {
defer u.distlock.Unlock()
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
var addedObjs []types.Object
err := u.uploader.db.DoTx(func(tx db.SQLContext) error {
var err error
addedObjs, err = u.uploader.db.Object().BatchAdd(tx, u.pkg.PackageID, u.successes)
return err
})
if err != nil {
return CreateLoadResult{}, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)
updateResp, err := coorCli.UpdatePackage(coormq.NewUpdatePackage(u.pkg.PackageID, u.successes))
if err != nil {
return CreateLoadResult{}, fmt.Errorf("updating package: %w", err)
return CreateLoadResult{}, fmt.Errorf("adding objects: %w", err)
}
ret := CreateLoadResult{
Package: u.pkg,
Objects: make(map[string]cdssdk.Object),
Objects: make(map[string]types.Object),
}
for _, entry := range updateResp.Added {
for _, entry := range addedObjs {
ret.Objects[entry.Path] = entry
}
for i, stg := range u.targetSpaces {
// We don't care whether this succeeds
coorCli.StoragePackageLoaded(coormq.ReqStoragePackageLoaded(u.userID, stg.Storage.StorageID, u.pkg.PackageID, u.loadRoots[i], nil))
}
return ret, nil
}
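
To show how the reworked CreateLoadUploader is driven end to end, here is a minimal in-package caller sketch. It only uses the methods visible in this diff (BeginCreateLoad, Upload, Commit); the function name, variable names, paths, and error handling are illustrative assumptions, not part of this commit.

// Hypothetical caller, assumed to live in the same package as Uploader.
func createAndLoadExample(up *Uploader, bktID types.BucketID, spaceIDs []types.UserSpaceID, roots []string, data io.Reader, size int64) (CreateLoadResult, error) {
	cu, err := up.BeginCreateLoad(bktID, "example-package", spaceIDs, roots)
	if err != nil {
		return CreateLoadResult{}, fmt.Errorf("begin create load: %w", err)
	}
	// Upload streams the file to every target space and records an AddObjectEntry on success.
	if err := cu.Upload("dir/example.bin", size, data); err != nil {
		return CreateLoadResult{}, fmt.Errorf("upload: %w", err)
	}
	// Commit now batch-inserts the recorded entries into the local DB in a single
	// transaction instead of calling the coordinator over MQ.
	return cu.Commit()
}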

View File

@@ -10,37 +10,35 @@ import (
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/client/internal/db"
"gitlink.org.cn/cloudream/storage2/client/types"
"gitlink.org.cn/cloudream/storage2/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2"
"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/ops2"
"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
)
type UpdateUploader struct {
uploader *Uploader
pkgID cdssdk.PackageID
targetStg stgmod.StorageDetail
distMutex *distlock.Mutex
loadToStgs []stgmod.StorageDetail
loadToPath []string
successes []coormq.AddObjectEntry
lock sync.Mutex
commited bool
uploader *Uploader
pkgID types.PackageID
targetSpace types.UserSpaceDetail
distMutex *distlock.Mutex
loadToSpaces []types.UserSpaceDetail
loadToPath []string
successes []db.AddObjectEntry
lock sync.Mutex
commited bool
}
type UploadStorageInfo struct {
Storage stgmod.StorageDetail
type UploadSpaceInfo struct {
Space types.UserSpaceDetail
Delay time.Duration
IsSameLocation bool
}
type UpdateResult struct {
// List of successfully uploaded files, keyed by Path
Objects map[string]cdssdk.Object
Objects map[string]types.Object
}
func (w *UpdateUploader) Upload(pat string, stream io.Reader) error {
@@ -49,10 +47,10 @@ func (w *UpdateUploader) Upload(pat string, stream io.Reader) error {
ft := ioswitch2.NewFromTo()
fromExec, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream())
ft.AddFrom(fromExec).
AddTo(ioswitch2.NewToShardStore(*w.targetStg.MasterHub, w.targetStg, ioswitch2.RawStream(), "shardInfo"))
AddTo(ioswitch2.NewToShardStore(*w.targetSpace.MasterHub, w.targetSpace, ioswitch2.RawStream(), "shardInfo"))
for i, stg := range w.loadToStgs {
ft.AddTo(ioswitch2.NewLoadToPublic(*stg.MasterHub, stg, path.Join(w.loadToPath[i], pat)))
for i, space := range w.loadToSpaces {
ft.AddTo(ioswitch2.NewLoadToPublic(*space.MasterHub, space, path.Join(w.loadToPath[i], pat)))
}
plans := exec.NewPlanBuilder()
@@ -75,12 +73,12 @@ func (w *UpdateUploader) Upload(pat string, stream io.Reader) error {
// Record the upload result
shardInfo := ret["shardInfo"].(*ops2.ShardInfoValue)
w.successes = append(w.successes, coormq.AddObjectEntry{
Path: pat,
Size: shardInfo.Size,
FileHash: shardInfo.Hash,
UploadTime: uploadTime,
StorageIDs: []cdssdk.StorageID{w.targetStg.Storage.StorageID},
w.successes = append(w.successes, db.AddObjectEntry{
Path: pat,
Size: shardInfo.Size,
FileHash: shardInfo.Hash,
UploadTime: uploadTime,
UserSpaceIDs: []types.UserSpaceID{w.targetSpace.UserSpace.UserSpaceID},
})
return nil
}
@@ -90,7 +88,7 @@ func (w *UpdateUploader) CancelObject(path string) {
w.lock.Lock()
defer w.lock.Unlock()
w.successes = lo.Reject(w.successes, func(e coormq.AddObjectEntry, i int) bool {
w.successes = lo.Reject(w.successes, func(e db.AddObjectEntry, i int) bool {
return e.Path == path
})
}
@@ -119,22 +117,21 @@ func (w *UpdateUploader) Commit() (UpdateResult, error) {
defer w.distMutex.Unlock()
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
var addedObjs []types.Object
err := w.uploader.db.DoTx(func(tx db.SQLContext) error {
var err error
addedObjs, err = w.uploader.db.Object().BatchAdd(tx, w.pkgID, w.successes)
return err
})
if err != nil {
return UpdateResult{}, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)
updateResp, err := coorCli.UpdatePackage(coormq.NewUpdatePackage(w.pkgID, w.successes))
if err != nil {
return UpdateResult{}, fmt.Errorf("updating package: %w", err)
return UpdateResult{}, fmt.Errorf("adding objects: %w", err)
}
ret := UpdateResult{
Objects: make(map[string]cdssdk.Object),
Objects: make(map[string]types.Object),
}
for _, entry := range updateResp.Added {
for _, entry := range addedObjs {
ret.Objects[entry.Path] = entry
}
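
UpdateUploader follows the same pattern for an existing package. Below is a minimal in-package sketch using only the methods shown in this diff (BeginUpdate, Upload, CancelObject, Commit); the caller-side names and paths are assumptions for illustration only.

// Hypothetical caller, assumed to live in the same package as Uploader.
func updateExample(up *Uploader, pkgID types.PackageID, a, b io.Reader) (UpdateResult, error) {
	// Affinity 0 means no preferred space, so chooseUploadStorage falls back to
	// same-location / lowest-latency; nil loadTo adds no load-to-public targets.
	w, err := up.BeginUpdate(pkgID, 0, nil, nil)
	if err != nil {
		return UpdateResult{}, fmt.Errorf("begin update: %w", err)
	}
	if err := w.Upload("docs/a.txt", a); err != nil {
		return UpdateResult{}, err
	}
	if err := w.Upload("docs/b.txt", b); err != nil {
		return UpdateResult{}, err
	}
	// A recorded upload can still be withdrawn before Commit.
	w.CancelObject("docs/b.txt")
	// Commit batch-inserts the remaining entries via Object().BatchAdd in one DB transaction.
	return w.Commit()
}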

View File

@@ -10,112 +10,104 @@ import (
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/common/utils/sort2"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/db"
cdssdk "gitlink.org.cn/cloudream/storage2/client/types"
stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/client/internal/db"
"gitlink.org.cn/cloudream/storage2/client/internal/metacache"
"gitlink.org.cn/cloudream/storage2/client/internal/publics"
"gitlink.org.cn/cloudream/storage2/client/types"
"gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage2/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage2/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2"
"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/ops2"
"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser"
"gitlink.org.cn/cloudream/storage2/common/pkgs/metacache"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool"
"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/factory"
)
type Uploader struct {
distlock *distlock.Service
connectivity *connectivity.Collector
stgAgts *agtpool.AgentPool
stgMeta *metacache.UserSpaceMeta
spaceMeta *metacache.UserSpaceMeta
db *db.DB
}
func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgAgts *agtpool.AgentPool, stgMeta *metacache.UserSpaceMeta, db *db.DB) *Uploader {
func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgAgts *agtpool.AgentPool, spaceMeta *metacache.UserSpaceMeta, db *db.DB) *Uploader {
return &Uploader{
distlock: distlock,
connectivity: connectivity,
stgAgts: stgAgts,
stgMeta: stgMeta,
spaceMeta: spaceMeta,
}
}
func (u *Uploader) BeginUpdate(pkgID cdssdk.PackageID, affinity cdssdk.UserSpaceID, loadTo []cdssdk.UserSpaceID, loadToPath []string) (*UpdateUploader, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
func (u *Uploader) BeginUpdate(pkgID types.PackageID, affinity types.UserSpaceID, loadTo []types.UserSpaceID, loadToPath []string) (*UpdateUploader, error) {
spaceIDs, err := u.db.UserSpace().GetAllIDs(u.db.DefCtx())
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
return nil, fmt.Errorf("getting user space ids: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)
getUserStgsResp, err := coorCli.GetUserStorageDetails(coormq.ReqGetUserStorageDetails(userID))
if err != nil {
return nil, fmt.Errorf("getting user storages: %w", err)
}
spaceDetails := u.spaceMeta.GetMany(spaceIDs)
spaceDetails = lo2.RemoveAllDefault(spaceDetails)
cons := u.connectivity.GetAll()
var userStgs []UploadStorageInfo
for _, stg := range getUserStgsResp.Storages {
if stg.MasterHub == nil {
var uploadSpaces []UploadSpaceInfo
for _, space := range spaceDetails {
if space.MasterHub == nil {
continue
}
delay := time.Duration(math.MaxInt64)
latency := time.Duration(math.MaxInt64)
con, ok := cons[stg.MasterHub.HubID]
con, ok := cons[space.MasterHub.HubID]
if ok && con.Latency != nil {
delay = *con.Latency
latency = *con.Latency
}
userStgs = append(userStgs, UploadStorageInfo{
Storage: stg,
Delay: delay,
IsSameLocation: stg.MasterHub.LocationID == stgglb.Local.LocationID,
uploadSpaces = append(uploadSpaces, UploadSpaceInfo{
Space: space,
Delay: latency,
IsSameLocation: space.MasterHub.LocationID == publics.Local.LocationID,
})
}
if len(userStgs) == 0 {
if len(uploadSpaces) == 0 {
return nil, fmt.Errorf("user no available storages")
}
loadToStgs := make([]stgmod.StorageDetail, len(loadTo))
for i, stgID := range loadTo {
stg, ok := lo.Find(getUserStgsResp.Storages, func(stg stgmod.StorageDetail) bool {
return stg.Storage.StorageID == stgID
loadToSpaces := make([]types.UserSpaceDetail, len(loadTo))
for i, spaceID := range loadTo {
space, ok := lo.Find(spaceDetails, func(space *types.UserSpaceDetail) bool {
return space.UserSpace.UserSpaceID == spaceID
})
if !ok {
return nil, fmt.Errorf("load to storage %v not found", stgID)
return nil, fmt.Errorf("load to storage %v not found", spaceID)
}
if stg.MasterHub == nil {
return nil, fmt.Errorf("load to storage %v has no master hub", stgID)
}
if !factory.GetBuilder(stg).PublicStoreDesc().Enabled() {
return nil, fmt.Errorf("load to storage %v has no public store", stgID)
if space.MasterHub == nil {
return nil, fmt.Errorf("load to storage %v has no master hub", spaceID)
}
loadToStgs[i] = stg
loadToSpaces[i] = *space
}
target := u.chooseUploadStorage(userStgs, affinity)
target := u.chooseUploadStorage(uploadSpaces, affinity)
// TODO2: add locking
// Lock the IPFS on the upload node
// TODO: consider taking a Create lock on the Object
// to prevent the uploaded replica from being cleaned up
distMutex, err := reqbuilder.NewBuilder().Shard().Buzy(target.Storage.Storage.StorageID).MutexLock(u.distlock)
if err != nil {
return nil, fmt.Errorf("acquire distlock: %w", err)
}
// distMutex, err := reqbuilder.NewBuilder().Shard().Buzy(target.Space.Storage.StorageID).MutexLock(u.distlock)
// if err != nil {
// return nil, fmt.Errorf("acquire distlock: %w", err)
// }
return &UpdateUploader{
uploader: u,
pkgID: pkgID,
targetStg: target.Storage,
distMutex: distMutex,
loadToStgs: loadToStgs,
loadToPath: loadToPath,
uploader: u,
pkgID: pkgID,
targetSpace: target.Space,
// distMutex: distMutex,
loadToSpaces: loadToSpaces,
loadToPath: loadToPath,
}, nil
}
@@ -123,122 +115,109 @@ func (u *Uploader) BeginUpdate(pkgID cdssdk.PackageID, affinity cdssdk.UserSpace
// 1. Pick the node that the affinity setting points to
// 2. Otherwise pick one at random from the nodes in the same location as the current client
// 3. Failing that, pick the node with the lowest latency among all nodes
func (w *Uploader) chooseUploadStorage(storages []UploadStorageInfo, stgAffinity cdssdk.StorageID) UploadStorageInfo {
if stgAffinity > 0 {
aff, ok := lo.Find(storages, func(storage UploadStorageInfo) bool { return storage.Storage.Storage.StorageID == stgAffinity })
func (w *Uploader) chooseUploadStorage(spaces []UploadSpaceInfo, spaceAffinity types.UserSpaceID) UploadSpaceInfo {
if spaceAffinity > 0 {
aff, ok := lo.Find(spaces, func(space UploadSpaceInfo) bool { return space.Space.UserSpace.UserSpaceID == spaceAffinity })
if ok {
return aff
}
}
sameLocationStorages := lo.Filter(storages, func(e UploadStorageInfo, i int) bool { return e.IsSameLocation })
sameLocationStorages := lo.Filter(spaces, func(e UploadSpaceInfo, i int) bool { return e.IsSameLocation })
if len(sameLocationStorages) > 0 {
return sameLocationStorages[rand.Intn(len(sameLocationStorages))]
}
// Pick the node with the lowest latency
storages = sort2.Sort(storages, func(e1, e2 UploadStorageInfo) int { return sort2.Cmp(e1.Delay, e2.Delay) })
spaces = sort2.Sort(spaces, func(e1, e2 UploadSpaceInfo) int { return sort2.Cmp(e1.Delay, e2.Delay) })
return storages[0]
return spaces[0]
}
func (u *Uploader) BeginCreateLoad(userID cdssdk.UserID, bktID cdssdk.BucketID, pkgName string, loadTo []cdssdk.StorageID, loadToPath []string) (*CreateLoadUploader, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)
func (u *Uploader) BeginCreateLoad(bktID types.BucketID, pkgName string, loadTo []types.UserSpaceID, loadToPath []string) (*CreateLoadUploader, error) {
getSpaces := u.spaceMeta.GetMany(loadTo)
getStgs := u.stgMeta.GetMany(loadTo)
targetStgs := make([]stgmod.StorageDetail, len(loadTo))
for i, stg := range getStgs {
spacesStgs := make([]types.UserSpaceDetail, len(loadTo))
for i, stg := range getSpaces {
if stg == nil {
return nil, fmt.Errorf("storage %v not found", loadTo[i])
}
targetStgs[i] = *stg
spacesStgs[i] = *stg
}
createPkg, err := coorCli.CreatePackage(coormq.NewCreatePackage(userID, bktID, pkgName))
pkg, err := u.db.Package().Create(u.db.DefCtx(), bktID, pkgName)
if err != nil {
return nil, fmt.Errorf("create package: %w", err)
}
reqBld := reqbuilder.NewBuilder()
for _, stg := range targetStgs {
reqBld.Shard().Buzy(stg.Storage.StorageID)
reqBld.Storage().Buzy(stg.Storage.StorageID)
}
lock, err := reqBld.MutexLock(u.distlock)
if err != nil {
return nil, fmt.Errorf("acquire distlock: %w", err)
}
// TODO2: add locking
// reqBld := reqbuilder.NewBuilder()
// for _, stg := range spacesStgs {
// reqBld.Shard().Buzy(stg.Storage.StorageID)
// reqBld.Storage().Buzy(stg.Storage.StorageID)
// }
// lock, err := reqBld.MutexLock(u.distlock)
// if err != nil {
// return nil, fmt.Errorf("acquire distlock: %w", err)
// }
return &CreateLoadUploader{
pkg: createPkg.Package,
userID: userID,
targetSpaces: targetStgs,
pkg: pkg,
targetSpaces: spacesStgs,
loadRoots: loadToPath,
uploader: u,
distlock: lock,
// distlock: lock,
}, nil
}
func (u *Uploader) UploadPart(userID cdssdk.UserID, objID cdssdk.ObjectID, index int, stream io.Reader) error {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
func (u *Uploader) UploadPart(objID types.ObjectID, index int, stream io.Reader) error {
detail, err := u.db.Object().GetDetail(u.db.DefCtx(), objID)
if err != nil {
return fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)
details, err := coorCli.GetObjectDetails(coormq.ReqGetObjectDetails([]cdssdk.ObjectID{objID}))
if err != nil {
return err
return fmt.Errorf("getting object detail: %w", err)
}
if details.Objects[0] == nil {
return fmt.Errorf("object %v not found", objID)
}
objDe := details.Objects[0]
_, ok := objDe.Object.Redundancy.(*cdssdk.MultipartUploadRedundancy)
objDe := detail
_, ok := objDe.Object.Redundancy.(*types.MultipartUploadRedundancy)
if !ok {
return fmt.Errorf("object %v is not a multipart upload", objID)
}
var stg stgmod.StorageDetail
var space types.UserSpaceDetail
if len(objDe.Blocks) > 0 {
cstg := u.stgMeta.Get(objDe.Blocks[0].StorageID)
cstg := u.spaceMeta.Get(objDe.Blocks[0].UserSpaceID)
if cstg == nil {
return fmt.Errorf("storage %v not found", objDe.Blocks[0].StorageID)
return fmt.Errorf("space %v not found", objDe.Blocks[0].UserSpaceID)
}
stg = *cstg
space = *cstg
} else {
getUserStgsResp, err := coorCli.GetUserStorageDetails(coormq.ReqGetUserStorageDetails(userID))
spaceIDs, err := u.db.UserSpace().GetAllIDs(u.db.DefCtx())
if err != nil {
return fmt.Errorf("getting user storages: %w", err)
return fmt.Errorf("getting user space ids: %w", err)
}
spaces := u.spaceMeta.GetMany(spaceIDs)
spaces = lo2.RemoveAllDefault(spaces)
cons := u.connectivity.GetAll()
var userStgs []UploadStorageInfo
for _, stg := range getUserStgsResp.Storages {
if stg.MasterHub == nil {
var userStgs []UploadSpaceInfo
for _, space := range spaces {
if space.MasterHub == nil {
continue
}
delay := time.Duration(math.MaxInt64)
con, ok := cons[stg.MasterHub.HubID]
con, ok := cons[space.MasterHub.HubID]
if ok && con.Latency != nil {
delay = *con.Latency
}
userStgs = append(userStgs, UploadStorageInfo{
Storage: stg,
userStgs = append(userStgs, UploadSpaceInfo{
Space: *space,
Delay: delay,
IsSameLocation: stg.MasterHub.LocationID == stgglb.Local.LocationID,
IsSameLocation: space.MasterHub.LocationID == publics.Local.LocationID,
})
}
@@ -246,19 +225,20 @@ func (u *Uploader) UploadPart(userID cdssdk.UserID, objID cdssdk.ObjectID, index
return fmt.Errorf("user no available storages")
}
stg = u.chooseUploadStorage(userStgs, 0).Storage
space = u.chooseUploadStorage(userStgs, 0).Space
}
lock, err := reqbuilder.NewBuilder().Shard().Buzy(stg.Storage.StorageID).MutexLock(u.distlock)
if err != nil {
return fmt.Errorf("acquire distlock: %w", err)
}
defer lock.Unlock()
// TODO2: add locking
// lock, err := reqbuilder.NewBuilder().Shard().Buzy(space.Storage.StorageID).MutexLock(u.distlock)
// if err != nil {
// return fmt.Errorf("acquire distlock: %w", err)
// }
// defer lock.Unlock()
ft := ioswitch2.NewFromTo()
fromDrv, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream())
ft.AddFrom(fromDrv).
AddTo(ioswitch2.NewToShardStore(*stg.MasterHub, stg, ioswitch2.RawStream(), "shard"))
AddTo(ioswitch2.NewToShardStore(*space.MasterHub, space, ioswitch2.RawStream(), "shard"))
plans := exec.NewPlanBuilder()
err = parser.Parse(ft, plans)
@@ -276,12 +256,14 @@ func (u *Uploader) UploadPart(userID cdssdk.UserID, objID cdssdk.ObjectID, index
}
shardInfo := ret["shard"].(*ops2.ShardInfoValue)
_, err = coorCli.AddMultipartUploadPart(coormq.ReqAddMultipartUploadPart(userID, objID, stgmod.ObjectBlock{
ObjectID: objID,
Index: index,
StorageID: stg.Storage.StorageID,
FileHash: shardInfo.Hash,
Size: shardInfo.Size,
}))
err = u.db.DoTx(func(tx db.SQLContext) error {
return u.db.Object().AppendPart(tx, types.ObjectBlock{
ObjectID: objID,
Index: index,
UserSpaceID: space.UserSpace.UserSpaceID,
FileHash: shardInfo.Hash,
Size: shardInfo.Size,
})
})
return err
}
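
Finally, the multipart path: UploadPart now records each part through Object().AppendPart inside a local DB transaction. A minimal sketch of uploading an object's parts in order; the part slicing and the caller-side names are assumptions for illustration.

// Hypothetical caller, assumed to live in the same package as Uploader.
func uploadPartsExample(up *Uploader, objID types.ObjectID, parts []io.Reader) error {
	for i, part := range parts {
		// The object must already use MultipartUploadRedundancy, otherwise UploadPart returns an error.
		if err := up.UploadPart(objID, i, part); err != nil {
			return fmt.Errorf("uploading part %d: %w", i, err)
		}
	}
	return nil
}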