JCS-pub/client/internal/uploader/uploader.go

273 lines
7.8 KiB
Go

package uploader
import (
"context"
"fmt"
"io"
"math"
"math/rand"
"time"
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/common/utils/sort2"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache"
clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool"
)
// Uploader coordinates uploading object data into user spaces. It combines
// user-space metadata lookups, hub connectivity (latency) measurements,
// distributed locking and I/O-switch execution plans to move data to a
// chosen target storage.
type Uploader struct {
	pubLock      *distlock.Service        // distributed lock service; shard "busy" locks are taken through it
	connectivity *connectivity.Collector  // provides latency measurements to master hubs
	stgPool      *pool.Pool               // storage pool injected into execution plans
	spaceMeta    *metacache.UserSpaceMeta // cache of user-space details
	db           *db.DB                   // client-side metadata database
}
// NewUploader wires together a new Uploader from its dependencies: the
// distributed lock service, the connectivity collector, the storage pool,
// the user-space metadata cache and the client database.
func NewUploader(pubLock *distlock.Service, connectivity *connectivity.Collector, stgPool *pool.Pool, spaceMeta *metacache.UserSpaceMeta, db *db.DB) *Uploader {
	up := Uploader{
		pubLock:      pubLock,
		connectivity: connectivity,
		stgPool:      stgPool,
		spaceMeta:    spaceMeta,
		db:           db,
	}
	return &up
}
// BeginUpdate starts an update session for package pkgID.
//
// It enumerates all known user spaces, measures their usability (master hub
// present, latency to the hub), picks one upload target via
// chooseUploadStorage (honoring the optional affinity), resolves the spaces
// listed in loadTo — each paired with the path at the same index of
// loadToPath — and takes a shard "busy" lock on the chosen target so the
// uploaded replica is not cleaned up while the session runs. The returned
// UpdateUploader owns that lock and must release it when the session ends.
func (u *Uploader) BeginUpdate(pkgID clitypes.PackageID, affinity clitypes.UserSpaceID, loadTo []clitypes.UserSpaceID, loadToPath []string) (*UpdateUploader, error) {
	// loadTo and loadToPath are parallel slices; reject mismatched input
	// early instead of failing later with an out-of-range index.
	if len(loadTo) != len(loadToPath) {
		return nil, fmt.Errorf("loadTo and loadToPath length mismatch: %d != %d", len(loadTo), len(loadToPath))
	}

	spaceIDs, err := u.db.UserSpace().GetAllIDs(u.db.DefCtx())
	if err != nil {
		return nil, fmt.Errorf("getting user space ids: %w", err)
	}

	spaceDetails := u.spaceMeta.GetMany(spaceIDs)
	// Drop entries the metadata cache could not resolve.
	spaceDetails = lo2.RemoveAllDefault(spaceDetails)

	cons := u.connectivity.GetAll()
	var uploadSpaces []UploadSpaceInfo
	for _, space := range spaceDetails {
		// A space without a master hub cannot accept uploads.
		if space.MasterHub == nil {
			continue
		}

		// Unknown latency is treated as "infinitely far" so such spaces
		// sort after any space with a real measurement.
		latency := time.Duration(math.MaxInt64)
		con, ok := cons[space.MasterHub.HubID]
		if ok && con.Latency != nil {
			latency = *con.Latency
		}

		uploadSpaces = append(uploadSpaces, UploadSpaceInfo{
			Space:          *space,
			Delay:          latency,
			IsSameLocation: space.MasterHub.LocationID == stgglb.Local.LocationID,
		})
	}
	if len(uploadSpaces) == 0 {
		return nil, fmt.Errorf("user no available userspaces")
	}

	// Resolve every load-to destination up front so we fail before taking
	// any lock or creating any state.
	loadToSpaces := make([]clitypes.UserSpaceDetail, len(loadTo))
	for i, spaceID := range loadTo {
		space, ok := lo.Find(spaceDetails, func(space *clitypes.UserSpaceDetail) bool {
			return space.UserSpace.UserSpaceID == spaceID
		})
		if !ok {
			return nil, fmt.Errorf("load to storage %v not found", spaceID)
		}
		if space.MasterHub == nil {
			return nil, fmt.Errorf("load to storage %v has no master hub", spaceID)
		}
		loadToSpaces[i] = *space
	}

	target := u.chooseUploadStorage(uploadSpaces, affinity)

	// Lock the target shard as busy to prevent the uploaded replica from
	// being cleaned up during the session.
	pubLock, err := reqbuilder.NewBuilder().Shard().Buzy(target.Space.UserSpace.UserSpaceID).MutexLock(u.pubLock)
	if err != nil {
		return nil, fmt.Errorf("acquire lock: %w", err)
	}

	return &UpdateUploader{
		uploader:     u,
		pkgID:        pkgID,
		targetSpace:  target.Space,
		pubLock:      pubLock,
		loadToSpaces: loadToSpaces,
		loadToPath:   loadToPath,
	}, nil
}
// chooseUploadStorage picks the user space to upload to:
//  1. the space the caller expressed affinity for, if it is available;
//  2. otherwise a random space in the same location as this client;
//  3. otherwise the space with the lowest measured latency.
//
// spaces must be non-empty (both callers check this before calling).
// Receiver renamed w -> u for consistency with the other Uploader methods,
// and the full sort to find the minimum-delay space replaced with a linear
// scan (avoids O(n log n) work and any mutation of the caller's slice).
func (u *Uploader) chooseUploadStorage(spaces []UploadSpaceInfo, spaceAffinity clitypes.UserSpaceID) UploadSpaceInfo {
	if spaceAffinity > 0 {
		aff, ok := lo.Find(spaces, func(space UploadSpaceInfo) bool { return space.Space.UserSpace.UserSpaceID == spaceAffinity })
		if ok {
			return aff
		}
	}

	sameLocationStorages := lo.Filter(spaces, func(e UploadSpaceInfo, i int) bool { return e.IsSameLocation })
	if len(sameLocationStorages) > 0 {
		return sameLocationStorages[rand.Intn(len(sameLocationStorages))]
	}

	// Fall back to the space with the lowest latency.
	best := spaces[0]
	for _, s := range spaces[1:] {
		if s.Delay < best.Delay {
			best = s
		}
	}
	return best
}
// BeginCreateLoad creates a new package named pkgName in bucket bktID and
// starts a create-and-load session that uploads into every space listed in
// loadTo (each paired with the root path at the same index of loadToPath).
// All target shards are locked as busy for the duration of the session; the
// returned CreateLoadUploader owns the lock.
func (u *Uploader) BeginCreateLoad(bktID clitypes.BucketID, pkgName string, loadTo []clitypes.UserSpaceID, loadToPath []string) (*CreateLoadUploader, error) {
	// loadTo and loadToPath are parallel slices; validate before doing work.
	if len(loadTo) != len(loadToPath) {
		return nil, fmt.Errorf("loadTo and loadToPath length mismatch: %d != %d", len(loadTo), len(loadToPath))
	}

	getSpaces := u.spaceMeta.GetMany(loadTo)
	spacesStgs := make([]clitypes.UserSpaceDetail, len(loadTo))
	for i, stg := range getSpaces {
		if stg == nil {
			return nil, fmt.Errorf("storage %v not found", loadTo[i])
		}
		spacesStgs[i] = *stg
	}

	pkg, err := db.DoTx01(u.db, func(tx db.SQLContext) (clitypes.Package, error) {
		// Verify the bucket exists before creating the package in it.
		_, err := u.db.Bucket().GetByID(tx, bktID)
		if err != nil {
			return clitypes.Package{}, err
		}
		// Bug fix: the create must run on the transaction context tx.
		// Previously u.db.DefCtx() was passed, which executed the insert
		// outside the transaction that checked the bucket.
		return u.db.Package().Create(tx, bktID, pkgName, time.Now())
	})
	if err != nil {
		return nil, fmt.Errorf("create package: %w", err)
	}

	// Mark every destination shard busy so the loaded data is not cleaned
	// up mid-session.
	reqBld := reqbuilder.NewBuilder()
	for _, stg := range spacesStgs {
		reqBld.Shard().Buzy(stg.UserSpace.UserSpaceID)
	}
	lock, err := reqBld.MutexLock(u.pubLock)
	if err != nil {
		// NOTE(review): the package created above is left behind if the
		// lock cannot be acquired — confirm whether it should be deleted here.
		return nil, fmt.Errorf("acquire lock: %w", err)
	}

	return &CreateLoadUploader{
		pkg:          pkg,
		targetSpaces: spacesStgs,
		loadRoots:    loadToPath,
		uploader:     u,
		pubLock:      lock,
	}, nil
}
// UploadPart uploads one part (at the given part index) of a multipart
// object from stream. The part is stored in the space already holding the
// object's existing blocks; for the first part a space is chosen the same
// way BeginUpdate chooses its upload target. The resulting block record is
// appended to the object inside a DB transaction.
func (u *Uploader) UploadPart(objID clitypes.ObjectID, index int, stream io.Reader) error {
	detail, err := u.db.Object().GetDetail(u.db.DefCtx(), objID)
	if err != nil {
		return fmt.Errorf("getting object detail: %w", err)
	}

	// Only objects created as multipart uploads may receive parts.
	if _, ok := detail.Object.Redundancy.(*clitypes.MultipartUploadRedundancy); !ok {
		return fmt.Errorf("object %v is not a multipart upload", objID)
	}

	var space clitypes.UserSpaceDetail
	if len(detail.Blocks) > 0 {
		// Subsequent parts go to the space that holds the first block.
		cstg := u.spaceMeta.Get(detail.Blocks[0].UserSpaceID)
		if cstg == nil {
			return fmt.Errorf("space %v not found", detail.Blocks[0].UserSpaceID)
		}
		space = *cstg
	} else {
		// First part: rank all usable spaces by latency, mirroring the
		// selection logic in BeginUpdate.
		spaceIDs, err := u.db.UserSpace().GetAllIDs(u.db.DefCtx())
		if err != nil {
			return fmt.Errorf("getting user space ids: %w", err)
		}

		spaces := u.spaceMeta.GetMany(spaceIDs)
		spaces = lo2.RemoveAllDefault(spaces)

		cons := u.connectivity.GetAll()
		var userStgs []UploadSpaceInfo
		// Loop variable renamed from "space" so it no longer shadows the
		// outer target variable of the same name.
		for _, sd := range spaces {
			if sd.MasterHub == nil {
				continue
			}

			// Unknown latency sorts last.
			delay := time.Duration(math.MaxInt64)
			con, ok := cons[sd.MasterHub.HubID]
			if ok && con.Latency != nil {
				delay = *con.Latency
			}

			userStgs = append(userStgs, UploadSpaceInfo{
				Space:          *sd,
				Delay:          delay,
				IsSameLocation: sd.MasterHub.LocationID == stgglb.Local.LocationID,
			})
		}
		if len(userStgs) == 0 {
			return fmt.Errorf("user no available storages")
		}

		space = u.chooseUploadStorage(userStgs, 0).Space
	}

	// Keep the shard busy while the part is being written.
	lock, err := reqbuilder.NewBuilder().Shard().Buzy(space.UserSpace.UserSpaceID).MutexLock(u.pubLock)
	if err != nil {
		return fmt.Errorf("acquire lock: %w", err)
	}
	defer lock.Unlock()

	// Build a driver->shard-store plan whose result is keyed "shard".
	ft := ioswitch2.NewFromTo()
	fromDrv, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream())
	ft.AddFrom(fromDrv).
		AddTo(ioswitch2.NewToShardStore(*space.MasterHub, space, ioswitch2.RawStream(), "shard"))

	plans := exec.NewPlanBuilder()
	if err := parser.Parse(ft, plans); err != nil {
		return fmt.Errorf("parse fromto: %w", err)
	}

	exeCtx := exec.NewExecContext()
	exec.SetValueByType(exeCtx, u.stgPool)
	// Named "drv" instead of "exec": the old name shadowed the exec package.
	drv := plans.Execute(exeCtx)
	drv.BeginWrite(io.NopCloser(stream), hd)
	ret, err := drv.Wait(context.TODO())
	if err != nil {
		return fmt.Errorf("executing plan: %w", err)
	}

	// Comma-ok instead of the previous bare assertion, which panicked on a
	// missing or mistyped plan result.
	shardInfo, ok := ret["shard"].(*ops2.ShardInfoValue)
	if !ok {
		return fmt.Errorf("plan result %q has unexpected type %T", "shard", ret["shard"])
	}

	return u.db.DoTx(func(tx db.SQLContext) error {
		return u.db.Object().AppendPart(tx, clitypes.ObjectBlock{
			ObjectID:    objID,
			Index:       index,
			UserSpaceID: space.UserSpace.UserSpaceID,
			FileHash:    shardInfo.Hash,
			Size:        shardInfo.Size,
		})
	})
}