This commit is contained in:
songjc 2025-07-03 11:06:04 +08:00
commit 853000bdc6
4 changed files with 92 additions and 59 deletions

View File

@ -3,8 +3,11 @@ package services
import (
"context"
"fmt"
"time"
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/logger"
clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
"gorm.io/gorm"
@ -196,48 +199,6 @@ func (svc *UserSpaceService) DownloadPackage(packageID clitypes.PackageID, users
return err
}
rootJPath := clitypes.PathFromJcsPathString(rootPath)
var pinned []clitypes.ObjectID
plans := exec.NewPlanBuilder()
for _, obj := range details {
strg, err := svc.StrategySelector.Select(strategy.Request{
Detail: obj,
DestLocation: destStg.UserSpace.Storage.GetLocation(),
})
if err != nil {
return fmt.Errorf("select download strategy: %w", err)
}
ft := ioswitch2.NewFromTo()
switch strg := strg.(type) {
case *strategy.DirectStrategy:
ft.AddFrom(ioswitch2.NewFromShardstore(strg.Detail.Object.FileHash, strg.UserSpace, ioswitch2.RawStream()))
case *strategy.ECReconstructStrategy:
for i, b := range strg.Blocks {
ft.AddFrom(ioswitch2.NewFromShardstore(b.FileHash, strg.UserSpaces[i], ioswitch2.ECStream(b.Index)))
ft.ECParam = &strg.Redundancy
}
default:
return fmt.Errorf("unsupported download strategy: %T", strg)
}
objPath := clitypes.PathFromJcsPathString(obj.Object.Path)
dstPath := rootJPath.ConcatNew(objPath)
ft.AddTo(ioswitch2.NewToBaseStore(*destStg, dstPath))
// 顺便保存到同存储服务的分片存储中
if destStg.UserSpace.ShardStore != nil {
ft.AddTo(ioswitch2.NewToShardStore(*destStg, ioswitch2.RawStream(), ""))
pinned = append(pinned, obj.Object.ObjectID)
}
err = parser.Parse(ft, plans)
if err != nil {
return fmt.Errorf("parse plan: %w", err)
}
}
mutex, err := reqbuilder.NewBuilder().
UserSpace().Buzy(userspaceID).
MutexLock(svc.PubLock)
@ -246,16 +207,84 @@ func (svc *UserSpaceService) DownloadPackage(packageID clitypes.PackageID, users
}
defer mutex.Unlock()
// 记录访问统计
for _, obj := range details {
svc.AccessStat.AddAccessCounter(obj.Object.ObjectID, packageID, userspaceID, 1)
}
exeCtx := exec.NewExecContext()
exec.SetValueByType(exeCtx, svc.StgPool)
drv := plans.Execute(exeCtx)
_, err = drv.Wait(context.Background())
if err != nil {
return err
rootJPath := clitypes.PathFromJcsPathString(rootPath)
dIndex := 0
var pinned []clitypes.PinnedObject
for dIndex < len(details) {
plans := exec.NewPlanBuilder()
for i := 0; i < 10 && dIndex < len(details); i++ {
strg, err := svc.StrategySelector.Select(strategy.Request{
Detail: details[dIndex],
DestLocation: destStg.UserSpace.Storage.GetLocation(),
})
if err != nil {
return fmt.Errorf("select download strategy: %w", err)
}
ft := ioswitch2.NewFromTo()
switch strg := strg.(type) {
case *strategy.DirectStrategy:
ft.AddFrom(ioswitch2.NewFromShardstore(strg.Detail.Object.FileHash, strg.UserSpace, ioswitch2.RawStream()))
case *strategy.ECReconstructStrategy:
for i, b := range strg.Blocks {
ft.AddFrom(ioswitch2.NewFromShardstore(b.FileHash, strg.UserSpaces[i], ioswitch2.ECStream(b.Index)))
ft.ECParam = &strg.Redundancy
}
default:
return fmt.Errorf("unsupported download strategy: %T", strg)
}
objPath := clitypes.PathFromJcsPathString(details[dIndex].Object.Path)
dstPath := rootJPath.ConcatNew(objPath)
ft.AddTo(ioswitch2.NewToBaseStore(*destStg, dstPath))
// 顺便保存到同存储服务的分片存储中
if destStg.UserSpace.ShardStore != nil {
ft.AddTo(ioswitch2.NewToShardStore(*destStg, ioswitch2.RawStream(), ""))
pinned = append(pinned, clitypes.PinnedObject{
ObjectID: details[dIndex].Object.ObjectID,
UserSpaceID: destStg.UserSpace.UserSpaceID,
CreateTime: time.Now(),
})
}
err = parser.Parse(ft, plans)
if err != nil {
return fmt.Errorf("parse plan: %w", err)
}
dIndex++
}
// 记录访问统计
for _, obj := range details {
svc.AccessStat.AddAccessCounter(obj.Object.ObjectID, packageID, userspaceID, 1)
}
exeCtx := exec.NewExecContext()
exec.SetValueByType(exeCtx, svc.StgPool)
drv := plans.Execute(exeCtx)
_, err = drv.Wait(context.Background())
if err != nil {
return err
}
err = svc.DB.DoTx(func(tx db.SQLContext) error {
objIDs := make([]clitypes.ObjectID, len(pinned))
for i, obj := range pinned {
objIDs[i] = obj.ObjectID
}
avaiIDs, err := svc.DB.Object().BatchTestObjectID(tx, objIDs)
if err != nil {
return err
}
pinned = lo.Filter(pinned, func(p clitypes.PinnedObject, idx int) bool { return avaiIDs[p.ObjectID] })
return svc.DB.PinnedObject().BatchTryCreate(svc.DB.DefCtx(), pinned)
})
if err != nil {
logger.Warnf("create pinned objects: %v", err)
}
}
return nil

View File

@ -89,7 +89,7 @@ func (s *BaseStore) Write(pat clitypes.JPath, stream io.Reader, opt types.WriteO
_, err := s.cli.PutObject(context.TODO(), &s3.PutObjectInput{
Bucket: aws.String(s.Bucket),
Key: aws.String(key.String()),
Body: counter,
Body: hashStr,
Metadata: meta,
})
if err != nil {

View File

@ -143,20 +143,20 @@ func getp(c *cobra.Command, ctx *cmd.CommandContext, opt option, args []string)
dir := filepath.Dir(localPath)
err = os.MkdirAll(dir, 0755)
if err != nil {
fmt.Printf("\tx")
fmt.Printf("\tx\n")
return err
}
fileStartTime := time.Now()
file, err := os.OpenFile(localPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
fmt.Printf("\tx")
fmt.Printf("\tx\n")
return err
}
_, err = io.Copy(file, tr)
if err != nil {
fmt.Printf("\tx")
fmt.Printf("\tx\n")
return err
}

View File

@ -33,13 +33,15 @@ func init() {
c.Flags().BoolVar(&opt.UseID, "id", false, "treat the second argument as package id")
c.Flags().StringVar(&opt.Prefix, "prefix", "", "add prefix to every uploaded file")
c.Flags().BoolVar(&opt.Create, "create", false, "create package if not exists")
c.Flags().Int64Var(&opt.Affinity, "affinity", 0, "affinity user space of the package")
cmd.RootCmd.AddCommand(c)
}
type option struct {
UseID bool
Prefix string
Create bool
UseID bool
Prefix string
Create bool
Affinity int64
}
func putp(c *cobra.Command, ctx *cmd.CommandContext, opt option, args []string) error {
@ -132,6 +134,7 @@ func putp(c *cobra.Command, ctx *cmd.CommandContext, opt option, args []string)
_, err = ctx.Client.Object().Upload(cliapi.ObjectUpload{
Info: cliapi.ObjectUploadInfo{
PackageID: pkgID,
Affinity: clitypes.UserSpaceID(opt.Affinity),
},
Files: iterator.Array(&cliapi.UploadingObject{
Path: pat,
@ -156,6 +159,7 @@ func putp(c *cobra.Command, ctx *cmd.CommandContext, opt option, args []string)
_, err = ctx.Client.Object().Upload(cliapi.ObjectUpload{
Info: cliapi.ObjectUploadInfo{
PackageID: pkgID,
Affinity: clitypes.UserSpaceID(opt.Affinity),
},
Files: iter,
})