JCS-pub/client/internal/uploader/user_space_upload.go

175 lines
5.2 KiB
Go

package uploader
import (
"context"
"fmt"
"math"
"strings"
"time"
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser"
corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator"
hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
)
func (u *Uploader) UserSpaceUpload(userSpaceID clitypes.UserSpaceID, rootPath string, targetBktID clitypes.BucketID, newPkgName string, uploadAffinity clitypes.UserSpaceID) (*clitypes.Package, error) {
srcSpace := u.spaceMeta.Get(userSpaceID)
if srcSpace == nil {
return nil, fmt.Errorf("user space %d not found", userSpaceID)
}
pkg, err := db.DoTx01(u.db, func(tx db.SQLContext) (clitypes.Package, error) {
_, err := u.db.Bucket().GetByID(tx, targetBktID)
if err != nil {
return clitypes.Package{}, err
}
return u.db.Package().Create(tx, targetBktID, newPkgName, time.Now())
})
if err != nil {
return nil, fmt.Errorf("creating package: %w", err)
}
delPkg := func() {
u.db.Package().Delete(u.db.DefCtx(), pkg.PackageID)
}
spaceIDs, err := u.db.UserSpace().GetAllIDs(u.db.DefCtx())
if err != nil {
delPkg()
return nil, fmt.Errorf("getting user space ids: %w", err)
}
spaceDetails := u.spaceMeta.GetMany(spaceIDs)
spaceDetails = lo.Filter(spaceDetails, func(e *clitypes.UserSpaceDetail, i int) bool {
return e != nil && e.UserSpace.ShardStore != nil
})
coorCli := stgglb.CoordinatorRPCPool.Get()
defer coorCli.Release()
resp, cerr := coorCli.GetHubConnectivities(context.Background(), corrpc.ReqGetHubConnectivities([]cortypes.HubID{srcSpace.RecommendHub.HubID}))
if cerr != nil {
delPkg()
return nil, fmt.Errorf("getting hub connectivities: %w", cerr.ToError())
}
cons := make(map[cortypes.HubID]cortypes.HubConnectivity)
for _, c := range resp.Connectivities {
cons[c.ToHubID] = c
}
var uploadSpaces []UploadSpaceInfo
for _, space := range spaceDetails {
latency := time.Duration(math.MaxInt64)
con, ok := cons[space.RecommendHub.HubID]
if ok && con.Latency != nil {
latency = time.Duration(*con.Latency * float32(time.Millisecond))
}
uploadSpaces = append(uploadSpaces, UploadSpaceInfo{
Space: *space,
Delay: latency,
IsSameLocation: space.RecommendHub.LocationID == srcSpace.RecommendHub.LocationID,
})
}
if len(uploadSpaces) == 0 {
delPkg()
return nil, fmt.Errorf("user no available userspaces")
}
targetSapce := u.chooseUploadStorage(uploadSpaces, uploadAffinity)
addr, ok := srcSpace.RecommendHub.Address.(*cortypes.GRPCAddressInfo)
if !ok {
delPkg()
return nil, fmt.Errorf("master of user space %v has no grpc address", srcSpace.UserSpace)
}
srcHubCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&srcSpace.RecommendHub, addr))
defer srcHubCli.Release()
listAllResp, cerr := srcHubCli.BaseStoreListAll(context.Background(), &hubrpc.BaseStoreListAll{
UserSpace: *srcSpace,
Path: rootPath,
})
if cerr != nil {
delPkg()
return nil, fmt.Errorf("listing base store: %w", cerr.ToError())
}
adds, err := u.uploadFromBaseStore(srcSpace, &targetSapce.Space, listAllResp.Entries, rootPath)
if err != nil {
delPkg()
return nil, fmt.Errorf("uploading from base store: %w", err)
}
_, err = db.DoTx21(u.db, u.db.Object().BatchAdd, pkg.PackageID, adds)
if err != nil {
delPkg()
return nil, fmt.Errorf("adding objects: %w", err)
}
return &pkg, nil
}
func (u *Uploader) uploadFromBaseStore(srcSpace *clitypes.UserSpaceDetail, targetSpace *clitypes.UserSpaceDetail, entries []types.ListEntry, rootPath string) ([]db.AddObjectEntry, error) {
ft := ioswitch2.FromTo{}
for _, e := range entries {
// 可以考虑增加一个配置项来控制是否上传空目录
if e.IsDir {
continue
}
ft.AddFrom(ioswitch2.NewFromBaseStore(*srcSpace, e.Path))
ft.AddTo(ioswitch2.NewToShardStore(*targetSpace, ioswitch2.RawStream(), e.Path))
}
plans := exec.NewPlanBuilder()
err := parser.Parse(ft, plans)
if err != nil {
return nil, fmt.Errorf("parsing plan: %w", err)
}
exeCtx := exec.NewExecContext()
exec.SetValueByType(exeCtx, u.stgPool)
ret, err := plans.Execute(exeCtx).Wait(context.Background())
if err != nil {
return nil, fmt.Errorf("executing plan: %w", err)
}
cleanRoot := strings.TrimSuffix(rootPath, clitypes.ObjectPathSeparator)
adds := make([]db.AddObjectEntry, 0, len(ret))
for _, e := range entries {
if e.IsDir {
continue
}
pat := strings.TrimPrefix(e.Path, cleanRoot+clitypes.ObjectPathSeparator)
if pat == cleanRoot {
pat = clitypes.BaseName(e.Path)
}
info := ret[e.Path].(*ops2.FileInfoValue)
adds = append(adds, db.AddObjectEntry{
Path: pat,
Size: info.Size,
FileHash: info.Hash,
CreateTime: time.Now(),
UserSpaceIDs: []clitypes.UserSpaceID{targetSpace.UserSpace.UserSpaceID},
})
}
return adds, nil
}